فهرست منبع

Updates to promless collector API

Matt Bolt 1 سال پیش
والد
کامیت
02952c8dc5

+ 32 - 0
modules/collector-source/pkg/collector/avgovertime.go

@@ -0,0 +1,32 @@
+package collector
+
+type AverageOverTimeTransformer struct {
+	name        string
+	labelValues []string
+	total       float64
+	count       int
+}
+
+func AverageOverTime(name string, labelValues []string) MetricAggregator {
+	return &AverageOverTimeTransformer{
+		name:        name,
+		labelValues: labelValues,
+	}
+}
+
+func (m *AverageOverTimeTransformer) Name() string {
+	return m.name
+}
+
+func (m *AverageOverTimeTransformer) LabelValues() []string {
+	return m.labelValues
+}
+
+func (m *AverageOverTimeTransformer) Update(value float64) {
+	m.total += value
+	m.count++
+}
+
+func (m *AverageOverTimeTransformer) Value() float64 {
+	return m.total / float64(m.count)
+}

+ 114 - 0
modules/collector-source/pkg/collector/collector.go

@@ -0,0 +1,114 @@
+package collector
+
+import (
+	"fmt"
+	"slices"
+	"sync"
+	"time"
+)
+
+// Metric names
+const (
+	ContainerMemoryWorkingSetBytes string = "container_memory_working_set_bytes"
+)
+
+// MetricCollectorID is a unique identifier for a specific metric collector instance. We
+// use this identifier to register and unregister metric instances from the metrics collector
+// instead of the metric name and aggregation type to allow selectable cardinality (via labels)
+// across multiple instances of the same aggregation type and metric name.
+type MetricCollectorID string
+
+const (
+	RAMUsageAverageID MetricCollectorID = "RAMUsageAverage"
+	// etc ...
+)
+
+// MetricsCollector is an interface that defines an implementation capable of managing a collection
+// of metric instances, and exposes helper methods for routing metric updates and queries to the
+// proper collector instances.
+type MetricsCollector interface {
+	// Register accepts a `MetricCollector` instance and registers it for routing updates and querying.
+	Register(collector *MetricCollector) error
+
+	// Unregister accepts a `MetricCollectorID` and unregisters the metric collector instance from receiving metrics
+	// updates and query availability.
+	Unregister(collectorID MetricCollectorID) bool
+
+	// Query accepts a `MetricCollectorID` and returns a slice of `MetricResult` instances for that collector.
+	Query(collectorID MetricCollectorID) ([]*MetricResult, error)
+
+	// Update accepts the name of a metric, the label set and values to update the metric, the updated value, and a timestamp.
+	// This method does not accept a `MetricCollectorID` because it provides updates across many potential metric collector instances
+	// which utilize the same metric.
+	Update(metricName string, labels map[string]string, value float64, timestamp *time.Time)
+}
+
+// InMemoryMetricsCollector is a thread-safe implementation of the `MetricsCollector` interface that stores metric instances
+// in memory.
+type InMemoryMetricsCollector struct {
+	lock          sync.Mutex
+	byMetricName  map[string][]*MetricCollector
+	byCollectorID map[MetricCollectorID]*MetricCollector
+}
+
+func NewInMemoryMetricsCollector() MetricsCollector {
+	return &InMemoryMetricsCollector{
+		byMetricName:  make(map[string][]*MetricCollector),
+		byCollectorID: make(map[MetricCollectorID]*MetricCollector),
+	}
+}
+
+func (immc *InMemoryMetricsCollector) Register(collector *MetricCollector) error {
+	immc.lock.Lock()
+	defer immc.lock.Unlock()
+
+	if _, ok := immc.byCollectorID[collector.id]; ok {
+		return fmt.Errorf("collector with ID: %s already exists", collector.id)
+	}
+
+	immc.byCollectorID[collector.id] = collector
+	immc.byMetricName[collector.metricName] = append(immc.byMetricName[collector.metricName], collector)
+	return nil
+}
+
+func (immc *InMemoryMetricsCollector) Unregister(collectorID MetricCollectorID) bool {
+	immc.lock.Lock()
+	defer immc.lock.Unlock()
+
+	if _, ok := immc.byCollectorID[collectorID]; !ok {
+		return false
+	}
+
+	inst := immc.byCollectorID[collectorID]
+	immc.byMetricName[inst.metricName] = slices.DeleteFunc(immc.byMetricName[inst.metricName], func(mc *MetricCollector) bool {
+		return mc == nil || mc.id == collectorID
+	})
+
+	delete(immc.byCollectorID, collectorID)
+	return true
+}
+
+func (immc *InMemoryMetricsCollector) Query(collectorID MetricCollectorID) ([]*MetricResult, error) {
+	immc.lock.Lock()
+	defer immc.lock.Unlock()
+
+	if _, ok := immc.byCollectorID[collectorID]; !ok {
+		return nil, fmt.Errorf("collector with ID: %s does not exist", collectorID)
+	}
+
+	return immc.byCollectorID[collectorID].Get(), nil
+}
+
+func (immc *InMemoryMetricsCollector) Update(metricName string, labels map[string]string, value float64, timestamp *time.Time) {
+	immc.lock.Lock()
+	defer immc.lock.Unlock()
+
+	for _, collector := range immc.byMetricName[metricName] {
+		labelValues := make([]string, 0, len(collector.labels))
+		for _, label := range collector.labels {
+			labelValues = append(labelValues, labels[label])
+		}
+
+		collector.Update(labelValues, value, timestamp)
+	}
+}

+ 55 - 0
modules/collector-source/pkg/collector/collector_test.go

@@ -0,0 +1,55 @@
+package collector
+
+import "testing"
+
+func TestBasicCollectorFunctionality(t *testing.T) {
+	// avg of 55 (sum of [1,10]) / data points (10) = 5.5
+	const expected = 55.0 / 10.0
+
+	labelsA := map[string]string{
+		"container": "container-a",
+		"uid":       "uid-a",
+		"pod":       "pod-a",
+		"namespace": "namespace-a",
+		"instance":  "instance-a",
+		"node":      "node-a",
+		"cluster":   "cluster-a",
+	}
+
+	labelsB := map[string]string{
+		"container": "container-b",
+		"uid":       "uid-b",
+		"pod":       "pod-b",
+		"namespace": "namespace-b",
+		"instance":  "instance-b",
+		"node":      "node-b",
+		"cluster":   "cluster-a",
+	}
+
+	collector := NewOpenCostMetricCollector()
+
+	for i := 1; i <= 10; i++ {
+		collector.Update(ContainerMemoryWorkingSetBytes, labelsA, float64(i), nil)
+		collector.Update(ContainerMemoryWorkingSetBytes, labelsB, float64(i), nil)
+	}
+
+	results, err := collector.Query(RAMUsageAverageID)
+	if err != nil {
+		t.Fatalf("error: %v", err)
+	}
+
+	if len(results) != 2 {
+		t.Fatalf("expected 2 results, got %d", len(results))
+	}
+
+	for _, result := range results {
+		if result.Values[0].Value != expected {
+			t.Fatalf("expected %f, got %f", expected, result.Values[0].Value)
+		}
+
+		t.Logf("+-- Result -------------------------------")
+		t.Logf("| Labels: %v", result.MetricLabels)
+		t.Logf("| Value: %v", result.Values[0].Value)
+		t.Logf("+----------------------------------------")
+	}
+}

+ 45 - 0
modules/collector-source/pkg/collector/helper.go

@@ -0,0 +1,45 @@
+package collector
+
+import (
+	"hash/fnv"
+	"strings"
+)
+
+func hash(s []string) uint64 {
+	h := fnv.New64a()
+	for _, v := range s {
+		h.Write([]byte(v))
+	}
+	return h.Sum64()
+}
+
+func metricNameFor(metric string, labels []string, values []string) string {
+	var sb strings.Builder
+	sb.WriteString(metric)
+	sb.WriteRune('{')
+	for i := 0; i < len(labels); i++ {
+		sb.WriteRune('"')
+		sb.WriteString(labels[i])
+		sb.WriteString(`"="`)
+		sb.WriteString(values[i])
+		sb.WriteRune('"')
+		if i < len(labels)-1 {
+			sb.WriteRune(',')
+		}
+	}
+	sb.WriteRune('}')
+	return sb.String()
+}
+
+func toMap(labels []string, values []string) map[string]string {
+	min := len(labels)
+	if len(values) < min {
+		min = len(values)
+	}
+
+	m := make(map[string]string, min)
+	for i := 0; i < min; i++ {
+		m[labels[i]] = values[i]
+	}
+	return m
+}

+ 61 - 86
modules/collector-source/pkg/collector/metric.go

@@ -1,111 +1,86 @@
 package collector
 
 import (
-	"hash/fnv"
-	"strings"
+	"time"
 )
 
-type Metric interface {
-	Name() string
-	Update(value float64)
-	Value() float64
-}
-
-type AverageOverTimeMetric struct {
-	name  string
-	total float64
-	count int
-}
-
-func NewAverageOverTimeMetric(name string) *AverageOverTimeMetric {
-	return &AverageOverTimeMetric{
-		name: name,
-	}
-}
-
-func (m *AverageOverTimeMetric) Name() string {
-	return m.name
+// MetricValue is a resulting data point value with an optional timestamp.
+type MetricValue struct {
+	Value     float64
+	Timestamp *time.Time
 }
 
-func (m *AverageOverTimeMetric) Update(value float64) {
-	m.total += value
-	m.count++
+// MetricResult contains a resulting metric name, the associated labels and label values, and a slice of
+// MetricValues.
+type MetricResult struct {
+	Name         string
+	MetricLabels map[string]string
+	Values       []MetricValue
 }
 
-func (m *AverageOverTimeMetric) Value() float64 {
-	return m.total / float64(m.count)
+// MetricAggregator is an interface that defines the methods for a metric collector aggregation.
+// For example, we have a metric `foo_metric`, and we wish to query and collect the average over time.
+// In this case, the `AverageOverTime` component is the MetricAggregator. It is the component responsible
+// for routing updates to metric values into their proper condensed form.
+type MetricAggregator interface {
+	Name() string
+	Update(value float64)
+	Value() float64
+	LabelValues() []string
 }
 
-// avg(
-// 		avg_over_time(
-// 			container_memory_working_set_bytes{
-// 				container!="",
-// 				container!="POD",
-// 				<some_custom_filter>
-// 			}[1h]
-// 		)
-// ) by (container, pod, namespace, instance, cluster_id)
-
-type ContainerWorkingSetBytes struct {
-	name    string
-	labels  []string
-	metrics map[uint64]*AverageOverTimeMetric
+// MetricAggregatorFactory is a function that accepts a string name and returns a pointer to a MetricAggregator
+// implementation.
+type MetricAggregatorFactory func(name string, labelValues []string) MetricAggregator
+
+// MetricCollector is a data structure that represents a specific metric collector instance that contains it's own breakdown
+// of stored metrics by a specific label set.
+type MetricCollector struct {
+	id                MetricCollectorID // ie: RAMUsageAverage
+	metricName        string            // ie: container_memory_working_set_bytes
+	labels            []string
+	aggregatorFactory MetricAggregatorFactory
+	metrics           map[uint64]MetricAggregator // map[hash(labelValues)] = aggregator
 }
 
-func NewContainerWorkingSetBytes() *ContainerWorkingSetBytes {
-	return &ContainerWorkingSetBytes{
-		name:    "container_memory_working_set_bytes",
-		labels:  []string{"container", "uid", "pod", "namespace", "instance", "node", "cluster"},
-		metrics: make(map[uint64]*AverageOverTimeMetric),
+// NewMetricCollector creates a new MetricCollector instance with a unique identifier. The metric name is the specific
+// name of the collected metric that will be used to query the
+func NewMetricCollector(id MetricCollectorID, metricName string, labels []string, aggregatorFactory MetricAggregatorFactory) *MetricCollector {
+	return &MetricCollector{
+		id:                id,
+		metricName:        metricName,
+		labels:            labels,
+		aggregatorFactory: aggregatorFactory,
+		metrics:           make(map[uint64]MetricAggregator),
 	}
 }
 
-// We could also be way more generic than this and do something like: Update(labelValues []string, value float64)
-func (cwsb *ContainerWorkingSetBytes) Update(container, uid, pod, namespace, instance, node, cluster string, value float64) {
-	// hash key
-	key := hash([]string{container, uid, pod, namespace, instance, node, cluster})
-	if cwsb.metrics[key] == nil {
-		cwsb.metrics[key] = NewAverageOverTimeMetric(metricNameFor(cwsb.name, cwsb.labels, []string{container, uid, pod, namespace, instance, node, cluster}))
+func (mi *MetricCollector) Update(labelValues []string, value float64, timestamp *time.Time) {
+	key := hash(labelValues)
+	if mi.metrics[key] == nil {
+		mi.metrics[key] = mi.aggregatorFactory(metricNameFor(mi.metricName, mi.labels, labelValues), labelValues)
 	}
-	cwsb.metrics[key].Update(value)
-}
-
-type NamedMetric struct {
-	name    string // RAMUsageAvg
-	buckets map[uint64]*ContainerWorkingSetBytes
-}
-
-func (nm *NamedMetric) Update(container, uid, pod, namespace, instance, node, cluster string, value float64) {
-	//bucket := time.Now().Unix()
 
+	mi.metrics[key].Update(value)
 }
 
-func (nm *NamedMetric) Value() float64 {
-	return 0
-}
+func (mi *MetricCollector) Get() []*MetricResult {
+	results := make([]*MetricResult, 0, len(mi.metrics))
+	for _, metric := range mi.metrics {
+		mr := &MetricResult{
+			Name:         metric.Name(),
+			MetricLabels: toMap(mi.labels, metric.LabelValues()),
+			Values: []MetricValue{
+				{Value: metric.Value(), Timestamp: nil},
+			},
+		}
 
-func hash(s []string) uint64 {
-	h := fnv.New64a()
-	for _, v := range s {
-		h.Write([]byte(v))
+		results = append(results, mr)
 	}
-	return h.Sum64()
+
+	return results
 }
 
-func metricNameFor(metric string, labels []string, values []string) string {
-	var sb strings.Builder
-	sb.WriteString(metric)
-	sb.WriteRune('{')
-	for i := 0; i < len(labels); i++ {
-		sb.WriteRune('"')
-		sb.WriteString(labels[i])
-		sb.WriteString(`"="`)
-		sb.WriteString(values[i])
-		sb.WriteRune('"')
-		if i < len(labels)-1 {
-			sb.WriteRune(',')
-		}
-	}
-	sb.WriteRune('}')
-	return sb.String()
+func (mi *MetricCollector) Labels() []string {
+	return mi.labels
 }

+ 20 - 0
modules/collector-source/pkg/collector/metrics.go

@@ -0,0 +1,20 @@
+package collector
+
+// avg(
+// 		avg_over_time(
+// 			container_memory_working_set_bytes{
+// 				container!="",
+// 				container!="POD",
+// 				<some_custom_filter>
+// 			}[1h]
+// 		)
+// ) by (container, pod, namespace, instance, cluster_id)
+
+func NewRAMUsageAverageMetricInstance() *MetricCollector {
+	return NewMetricCollector(
+		RAMUsageAverageID,
+		ContainerMemoryWorkingSetBytes,
+		[]string{"container", "uid", "pod", "namespace", "instance", "node", "cluster"},
+		AverageOverTime,
+	)
+}

+ 19 - 0
modules/collector-source/pkg/collector/opencost.go

@@ -0,0 +1,19 @@
+package collector
+
+func NewOpenCostMetricCollector() MetricsCollector {
+	memCollector := NewInMemoryMetricsCollector()
+
+	// Register all the metrics
+	memCollector.Register(NewRAMUsageAverageMetricInstance())
+	// etc...
+
+	// Use ./modules/prometheus-source/pkg/prom/metricsquerier.go as a good
+	// reference for the Queries we require (and therefore, the metrics we need to register).
+
+	return memCollector
+}
+
+// There are a couple ways we can make "Reporting" of the metrics a bit cleaner:
+// -- we can write thin API friendly wrappers that can be used to funnel value updates into
+//    collector.Update(...) calls [similar to prom]. This is purely convenience and there isn't
+//    really an architecture bearing weight on this decisions. Whatever is easier to use.

+ 1 - 2
modules/collector-source/pkg/metrics/parser/parser.go

@@ -19,8 +19,7 @@ type MetricRecord struct {
 // Parse reads the input reader containing the raw metric format, and returns a slice of MetricRecord instances
 // containing the data parsed from the input.
 func Parse(reader io.Reader) ([]*MetricRecord, error) {
-	p := newParser(reader)
-	return p.parse()
+	return newParser(reader).parse()
 }
 
 // Parses Metrics from raw metric format.