| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103 |
- package metric
- import (
- "fmt"
- "slices"
- "sync"
- "time"
- "github.com/opencost/opencost/modules/collector-source/pkg/metric/aggregator"
- )
- // MetricStore is an interface that defines an implementation capable of managing a collection
- // of metric instances, and exposes helper methods for routing metric updates and queries to the
- // proper metric instances.
- type MetricStore interface {
- // Register accepts a `MetricCollector` instance and registers it for routing updates and querying.
- Register(collector *MetricCollector) error
- // Unregister accepts a `MetricCollectorID` and unregisters the metric metric instance from receiving metrics
- // updates and query availability.
- Unregister(collectorID MetricCollectorID) bool
- // Query accepts a `MetricCollectorID` and returns a slice of `MetricResult` instances for that metric.
- Query(collectorID MetricCollectorID) ([]*aggregator.MetricResult, error)
- // Update accepts the name of a metric, the label set and values to update the metric, the updated Value, and a Timestamp.
- // This method does not accept a `MetricCollectorID` because it provides updates across many potential MetricCollector instances
- // which utilize the same metric.
- Update(metricName string, labels map[string]string, value float64, timestamp time.Time, additionalInformation map[string]string)
- }
- type MetricStoreFactory func() MetricStore
- // InMemoryMetricStore is a thread-safe implementation of the MetricStore interface that stores MetricCollector instances
- // in memory.
- type InMemoryMetricStore struct {
- lock sync.Mutex
- byMetricName map[string][]*MetricCollector
- byCollectorID map[MetricCollectorID]*MetricCollector
- }
- func NewInMemoryMetricStore() MetricStore {
- return &InMemoryMetricStore{
- byMetricName: make(map[string][]*MetricCollector),
- byCollectorID: make(map[MetricCollectorID]*MetricCollector),
- }
- }
- func (m *InMemoryMetricStore) Register(collector *MetricCollector) error {
- m.lock.Lock()
- defer m.lock.Unlock()
- if _, ok := m.byCollectorID[collector.id]; ok {
- return fmt.Errorf("metric with ID: %s already exists", collector.id)
- }
- m.byCollectorID[collector.id] = collector
- m.byMetricName[collector.metricName] = append(m.byMetricName[collector.metricName], collector)
- return nil
- }
- func (m *InMemoryMetricStore) Unregister(collectorID MetricCollectorID) bool {
- m.lock.Lock()
- defer m.lock.Unlock()
- if _, ok := m.byCollectorID[collectorID]; !ok {
- return false
- }
- inst := m.byCollectorID[collectorID]
- m.byMetricName[inst.metricName] = slices.DeleteFunc(m.byMetricName[inst.metricName], func(mc *MetricCollector) bool {
- return mc == nil || mc.id == collectorID
- })
- delete(m.byCollectorID, collectorID)
- return true
- }
- func (m *InMemoryMetricStore) Query(collectorID MetricCollectorID) ([]*aggregator.MetricResult, error) {
- m.lock.Lock()
- defer m.lock.Unlock()
- if _, ok := m.byCollectorID[collectorID]; !ok {
- return nil, fmt.Errorf("metric with ID: %s does not exist", collectorID)
- }
- return m.byCollectorID[collectorID].Get(), nil
- }
- func (m *InMemoryMetricStore) Update(
- metricName string,
- labels map[string]string,
- value float64,
- timestamp time.Time,
- additionalInformation map[string]string,
- ) {
- m.lock.Lock()
- defer m.lock.Unlock()
- for _, collector := range m.byMetricName[metricName] {
- collector.Update(labels, value, timestamp, additionalInformation)
- }
- }
|