Просмотр исходного кода

General Metrics Scraping API, improvements to parser tests.

Matt Bolt 1 год назад
Родитель
Сommit
07786736e4

+ 12 - 1
modules/collector-source/pkg/metrics/parser/lexer.go

@@ -162,7 +162,8 @@ func (l *lexer) float() string {
 			return sb.String()
 		}
 
-		if r == 'N' || r == 'a' || r == 'I' || r == 'n' || r == 'f' || r == '+' || r == '-' || r == '.' || r == 'e' || r == 'E' || r == '_' || unicode.IsDigit(r) {
+		if isOneOf(r, "NaInf+-._eE") || unicode.IsDigit(r) {
+			//if r == 'N' || r == 'a' || r == 'I' || r == 'n' || r == 'f' || r == '+' || r == '-' || r == '.' || r == 'e' || r == 'E' || r == '_' || unicode.IsDigit(r) {
 			sb.WriteRune(r)
 		} else {
 			return sb.String()
@@ -170,6 +171,16 @@ func (l *lexer) float() string {
 	}
 }
 
+func isOneOf(ch rune, chars string) bool {
+	for _, c := range chars {
+		if c == ch {
+			return true
+		}
+	}
+
+	return false
+}
+
 func isAlphaNumeric(ch rune) bool {
 	return unicode.IsLetter(ch) || unicode.IsDigit(ch)
 }

+ 7 - 0
modules/collector-source/pkg/metrics/parser/parser.go

@@ -16,6 +16,13 @@ type MetricRecord struct {
 	Timestamp *time.Time
 }
 
+// Parse reads the input reader containing the raw metric format, and returns a slice of MetricRecord instances
+// containing the data parsed from the input.
+func Parse(reader io.Reader) ([]*MetricRecord, error) {
+	p := newParser(reader)
+	return p.parse()
+}
+
 // Parses Metrics from raw metric format.
 //
 // metric_name ["{" label_name "=" `"` label_value `"` { "," label_name"=" `"` label_value `"` } [ "," ] "}"] value [ timestamp ]

+ 57 - 0
modules/collector-source/pkg/metrics/parser/parser_test.go

@@ -2,9 +2,38 @@ package parser
 
 import (
 	"os"
+	"strings"
 	"testing"
 )
 
+const interestingFloatCases = `
+# HELP random comment
+test_metric{label1="value1", label2="value2"} .0123 1708014188740
+test_metric{label1="value1", label2="value2"} 1.23e-2 1708014188740
+test_metric{label1="value1", label2="value2"} 1.23e2 1708014188740
+test_metric{label1="value1", label2="value2"} 1.23e+2 1708014188740
+test_metric{label1="value1", label2="value2"} 0.23E-1 1708014188740
+test_metric{label1="value1", label2="value2"} 0.23E1 1708014188740
+test_metric{label1="value1", label2="value2"} 0.23E+1 1708014188740
+test_metric{label1="value1", label2="value2"} 1_000_000.0 1708014188740
+test_metric{label1="value1", label2="value2"} ___123 1708014188740
+`
+
+const cases = `
+# HELP random comment 
+test_metric{  , label1="value1"   , label2="value2" ,} 123 1708014188740
+a_metric{} 0
+another_metric{__foo="bar", } 15.2 1708014188740
+spaced_metric
+{
+   label1="value1",
+   label2="value2"
+   
+}
+   123.52
+   1708014188740
+`
+
 func TestParser(t *testing.T) {
 	f, err := os.Open("scrape.txt")
 	if err != nil {
@@ -23,3 +52,31 @@ func TestParser(t *testing.T) {
 		t.Logf("Metric: %v", m)
 	}
 }
+
+func TestInterestingFloatParsing(t *testing.T) {
+	f := strings.NewReader(interestingFloatCases)
+	p := newParser(f)
+
+	metrics, err := p.parse()
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	for _, m := range metrics {
+		t.Logf("Metric: %v", m)
+	}
+}
+
+func TestMetricFormatResilience(t *testing.T) {
+	f := strings.NewReader(cases)
+	p := newParser(f)
+
+	metrics, err := p.parse()
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	for _, m := range metrics {
+		t.Logf("Metric: %v", m)
+	}
+}

+ 31 - 0
modules/collector-source/pkg/metrics/scraper.go

@@ -0,0 +1,31 @@
+package metrics
+
+import (
+	"github.com/opencost/opencost/modules/collector-source/pkg/metrics/parser"
+	"github.com/opencost/opencost/modules/collector-source/pkg/metrics/target"
+)
+
+// MetricScraper is a struct that is used to scrape and parse a raw metrics `ScrapeTarget.`
+type MetricScraper struct {
+	scrapeTarget target.ScrapeTarget
+}
+
+func NewMetricScraper(scrapeTarget target.ScrapeTarget) *MetricScraper {
+	return &MetricScraper{
+		scrapeTarget: scrapeTarget,
+	}
+}
+
+func (s *MetricScraper) Scrape() ([]*parser.MetricRecord, error) {
+	reader, err := s.scrapeTarget.Load()
+	if err != nil {
+		return nil, err
+	}
+
+	metrics, err := parser.Parse(reader)
+	if err != nil {
+		return nil, err
+	}
+
+	return metrics, nil
+}

+ 20 - 0
modules/collector-source/pkg/metrics/target/filetarget.go

@@ -0,0 +1,20 @@
+package target
+
+import (
+	"io"
+	"os"
+)
+
+type FileTarget struct {
+	path string
+}
+
+func NewFileTarget(path string) *FileTarget {
+	return &FileTarget{
+		path: path,
+	}
+}
+
+func (t *FileTarget) Load() (io.Reader, error) {
+	return os.Open(t.path)
+}

+ 9 - 0
modules/collector-source/pkg/metrics/target/target.go

@@ -0,0 +1,9 @@
+package target
+
+import "io"
+
+// ScrapeTarget is an interface representing an object that is capable of loading/refreshing it's
+// target data.
+type ScrapeTarget interface {
+	Load() (io.Reader, error)
+}

+ 26 - 0
modules/collector-source/pkg/metrics/target/urltarget.go

@@ -0,0 +1,26 @@
+package target
+
+import (
+	"fmt"
+	"io"
+	"net/http"
+)
+
+type UrlTarget struct {
+	url string
+}
+
+func NewUrlTarget(url string) *UrlTarget {
+	return &UrlTarget{
+		url: url,
+	}
+}
+
+func (t *UrlTarget) Load() (io.Reader, error) {
+	resp, err := http.Get(t.url)
+	if err != nil {
+		return nil, fmt.Errorf("failed to fetch URL: %w", err)
+	}
+
+	return resp.Body, nil
+}

+ 4 - 4
modules/prometheus-source/pkg/prom/metricsquerier.go

@@ -573,7 +573,7 @@ func (pds *PrometheusMetricsQuerier) QueryRAMUsageAvg(start, end time.Time) *sou
 }
 
 func (pds *PrometheusMetricsQuerier) QueryRAMUsageMax(start, end time.Time) *source.Future[source.RAMUsageMaxResult] {
-	const queryFmtRAMUsageMax = `max(max_over_time(container_memory_working_set_bytes{container!="", container_name!="POD", container!="POD", %s}[%s])) by (container_name, container, pod_name, pod, namespace, instance, %s)`
+	const queryFmtRAMUsageMax = `max(max_over_time(container_memory_working_set_bytes{container!="", container_name!="POD", container!="POD", %s}[%s])) by (container_name, container, pod_name, pod, namespace, node, instance, %s)`
 	// env.GetPromClusterFilter(), durStr, env.GetPromClusterLabel()
 
 	cfg := pds.promConfig
@@ -621,7 +621,7 @@ func (pds *PrometheusMetricsQuerier) QueryCPURequests(start, end time.Time) *sou
 }
 
 func (pds *PrometheusMetricsQuerier) QueryCPUUsageAvg(start, end time.Time) *source.Future[source.CPUUsageAvgResult] {
-	const queryFmtCPUUsageAvg = `avg(rate(container_cpu_usage_seconds_total{container!="", container_name!="POD", container!="POD", %s}[%s])) by (container_name, container, pod_name, pod, namespace, instance, %s)`
+	const queryFmtCPUUsageAvg = `avg(rate(container_cpu_usage_seconds_total{container!="", container_name!="POD", container!="POD", %s}[%s])) by (container_name, container, pod_name, pod, namespace, node, instance, %s)`
 	// env.GetPromClusterFilter(), durStr, env.GetPromClusterLabel()
 
 	cfg := pds.promConfig
@@ -650,7 +650,7 @@ func (pds *PrometheusMetricsQuerier) QueryCPUUsageMax(start, end time.Time) *sou
 	//
 	// If changing the name of the recording rule, make sure to update the
 	// corresponding diagnostic query to avoid confusion.
-	const queryFmtCPUUsageMaxRecordingRule = `max(max_over_time(kubecost_container_cpu_usage_irate{%s}[%s])) by (container_name, container, pod_name, pod, namespace, instance, %s)`
+	const queryFmtCPUUsageMaxRecordingRule = `max(max_over_time(kubecost_container_cpu_usage_irate{%s}[%s])) by (container_name, container, pod_name, pod, namespace, node, instance, %s)`
 	// env.GetPromClusterFilter(), durStr, env.GetPromClusterLabel()
 
 	// This is the subquery equivalent of the above recording rule query. It is
@@ -663,7 +663,7 @@ func (pds *PrometheusMetricsQuerier) QueryCPUUsageMax(start, end time.Time) *sou
 	// the resolution, to make sure the irate always has two points to query
 	// in case the Prom scrape duration has been reduced to be equal to the
 	// ETL resolution.
-	const queryFmtCPUUsageMaxSubquery = `max(max_over_time(irate(container_cpu_usage_seconds_total{container!="POD", container!="", %s}[%s])[%s:%s])) by (container, pod_name, pod, namespace, instance, %s)`
+	const queryFmtCPUUsageMaxSubquery = `max(max_over_time(irate(container_cpu_usage_seconds_total{container!="POD", container!="", %s}[%s])[%s:%s])) by (container, pod_name, pod, namespace, node, instance, %s)`
 	// env.GetPromClusterFilter(), doubleResStr, durStr, resStr, env.GetPromClusterLabel()
 
 	cfg := pds.promConfig