metricsynthesizer.go 2.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869
  1. package synthetic
  2. import (
  3. "time"
  4. "github.com/opencost/opencost/modules/collector-source/pkg/metric"
  5. )
  6. // InstantMetric is a metric update that happened at a specific timestamp.
  7. type InstantMetric struct {
  8. timestamp time.Time
  9. update *metric.Update
  10. }
  11. // MetricSynthesizer is an implementation prototype for an object capable of processing
  12. // a stream of metric updates, and then synthesizing new metric updates based on the processed
  13. // data.
  14. type MetricSynthesizer interface {
  15. // Process accepts individual Updates from an UpdateSet for processing. Once all Updates
  16. // have been processed, call Synthesize() to generate any additional updates.
  17. Process(t time.Time, update *metric.Update)
  18. // Synthesize will generate all synthetic Update instances after processing all existing updates
  19. // in a set.
  20. Synthesize() []metric.Update
  21. // Clear resets or cycles the current state of the processed metrics to prepare for the next scrape.
  22. Clear()
  23. }
  24. // MetricSynthesizers implements the `metric.Updater` interface, to accept a `metric.UpdateSet` of metric updates,
  25. // pipes each `metric.Update` into the registered MetricSynthesizer instances for processing, and then synthesizes
  26. // new metric updates to append.
  27. type MetricSynthesizers struct {
  28. synthesizers []MetricSynthesizer
  29. next metric.Updater
  30. }
  31. // NewMetricSynthesizers creates a new set of metric synthesizers, which acts as an updater decorator to append
  32. // all newly synthesized metrics onto the existing update set before passing it along to the next updater.
  33. func NewMetricSynthesizers(next metric.Updater, synthesizers ...MetricSynthesizer) *MetricSynthesizers {
  34. return &MetricSynthesizers{
  35. synthesizers: synthesizers,
  36. next: next,
  37. }
  38. }
  39. func (ms *MetricSynthesizers) Update(set *metric.UpdateSet) {
  40. ts := set.Timestamp
  41. // first pass is to have all synthesizers process all updates
  42. for _, synthesizer := range ms.synthesizers {
  43. for i := range len(set.Updates) {
  44. update := set.Updates[i]
  45. synthesizer.Process(ts, &update)
  46. }
  47. }
  48. // second pass is to have the synthesizers generate all synthetic updates
  49. for _, synthesizer := range ms.synthesizers {
  50. updates := synthesizer.Synthesize()
  51. if len(updates) != 0 {
  52. set.Updates = append(set.Updates, updates...)
  53. }
  54. synthesizer.Clear()
  55. }
  56. ms.next.Update(set)
  57. }