| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236 |
- package prom
- import (
- "fmt"
- "math"
- "strconv"
- "strings"
- "github.com/opencost/opencost/core/pkg/log"
- "github.com/opencost/opencost/core/pkg/source"
- "github.com/opencost/opencost/core/pkg/util"
- )
- var (
- // Static Warnings for data point parsing
- InfWarning warning = newWarning("Found Inf value parsing vector data point for metric")
- NaNWarning warning = newWarning("Found NaN value parsing vector data point for metric")
- )
- func DataFieldFormatErr(query string, promResponse interface{}) error {
- return fmt.Errorf("Error parsing Prometheus response: 'data' field improperly formatted. Query: '%s'. Response: '%+v'", query, promResponse)
- }
- func DataPointFormatErr(query string, promResponse interface{}) error {
- return fmt.Errorf("Error parsing Prometheus response: improperly formatted datapoint. Query: '%s'. Response: '%+v'", query, promResponse)
- }
- func MetricFieldDoesNotExistErr(query string, promResponse interface{}) error {
- return fmt.Errorf("Error parsing Prometheus response: 'metric' field does not exist in data result vector. Query: '%s'. Response: '%+v'", query, promResponse)
- }
- func MetricFieldFormatErr(query string, promResponse interface{}) error {
- return fmt.Errorf("Error parsing Prometheus response: 'metric' field improperly formatted. Query: '%s'. Response: '%+v'", query, promResponse)
- }
- func NoDataErr(query string) error {
- return source.NewNoDataError(query)
- }
- func PromUnexpectedResponseErr(query string, promResponse interface{}) error {
- return fmt.Errorf("Error parsing Prometheus response: unexpected response. Query: '%s'. Response: '%+v'", query, promResponse)
- }
- func QueryResultNilErr(query string) error {
- return source.NewCommError(query)
- }
- func ResultFieldDoesNotExistErr(query string, promResponse interface{}) error {
- return fmt.Errorf("Error parsing Prometheus response: 'result' field does not exist. Query: '%s'. Response: '%+v'", query, promResponse)
- }
- func ResultFieldFormatErr(query string, promResponse interface{}) error {
- return fmt.Errorf("Error parsing Prometheus response: 'result' field improperly formatted. Query: '%s'. Response: '%+v'", query, promResponse)
- }
- func ResultFormatErr(query string, promResponse interface{}) error {
- return fmt.Errorf("Error parsing Prometheus response: 'result' field improperly formatted. Query: '%s'. Response: '%+v'", query, promResponse)
- }
- func ValueFieldDoesNotExistErr(query string, promResponse interface{}) error {
- return fmt.Errorf("Error parsing Prometheus response: 'value' field does not exist in data result vector. Query: '%s'. Response: '%+v'", query, promResponse)
- }
- // NewQueryResultError returns a QueryResults object with an error set and does not parse a result.
- func NewQueryResultError(query string, err error) *source.QueryResults {
- qrs := source.NewQueryResults(query)
- qrs.Error = err
- return qrs
- }
- // NewQueryResults accepts the raw prometheus query result and returns an array of
- // QueryResult objects
- func NewQueryResults(query string, queryResult interface{}, resultKeys *source.ResultKeys) *source.QueryResults {
- qrs := source.NewQueryResults(query)
- if queryResult == nil {
- qrs.Error = QueryResultNilErr(query)
- return qrs
- }
- data, ok := queryResult.(map[string]interface{})["data"]
- if !ok {
- e, err := wrapPrometheusError(query, queryResult)
- if err != nil {
- qrs.Error = err
- return qrs
- }
- qrs.Error = fmt.Errorf("%s", e)
- return qrs
- }
- // Deep Check for proper formatting
- d, ok := data.(map[string]interface{})
- if !ok {
- qrs.Error = DataFieldFormatErr(query, data)
- return qrs
- }
- resultData, ok := d["result"]
- if !ok {
- qrs.Error = ResultFieldDoesNotExistErr(query, d)
- return qrs
- }
- resultsData, ok := resultData.([]interface{})
- if !ok {
- qrs.Error = ResultFieldFormatErr(query, resultData)
- return qrs
- }
- // Result vectors from the query
- var results []*source.QueryResult
- // Parse raw results and into QueryResults
- for _, val := range resultsData {
- resultInterface, ok := val.(map[string]interface{})
- if !ok {
- qrs.Error = ResultFormatErr(query, val)
- return qrs
- }
- metricInterface, ok := resultInterface["metric"]
- if !ok {
- qrs.Error = MetricFieldDoesNotExistErr(query, resultInterface)
- return qrs
- }
- metricMap, ok := metricInterface.(map[string]interface{})
- if !ok {
- qrs.Error = MetricFieldFormatErr(query, metricInterface)
- return qrs
- }
- // Define label string for values to ensure that we only run labelsForMetric once
- // if we receive multiple warnings.
- var labelString string = ""
- // Determine if the result is a ranged data set or single value
- _, isRange := resultInterface["values"]
- var vectors []*util.Vector
- if !isRange {
- dataPoint, ok := resultInterface["value"]
- if !ok {
- qrs.Error = ValueFieldDoesNotExistErr(query, resultInterface)
- return qrs
- }
- // Append new data point, log warnings
- v, warn, err := parseDataPoint(query, dataPoint)
- if err != nil {
- qrs.Error = err
- return qrs
- }
- if warn != nil {
- log.DedupedWarningf(5, "%s\nQuery: %s\nLabels: %s", warn.Message(), query, labelsForMetric(metricMap))
- }
- vectors = append(vectors, v)
- } else {
- values, ok := resultInterface["values"].([]interface{})
- if !ok {
- qrs.Error = fmt.Errorf("Values field is improperly formatted")
- return qrs
- }
- // Append new data points, log warnings
- for _, value := range values {
- v, warn, err := parseDataPoint(query, value)
- if err != nil {
- qrs.Error = err
- return qrs
- }
- if warn != nil {
- if labelString == "" {
- labelString = labelsForMetric(metricMap)
- }
- log.DedupedWarningf(5, "%s\nQuery: %s\nLabels: %s", warn.Message(), query, labelString)
- }
- vectors = append(vectors, v)
- }
- }
- results = append(results, source.NewQueryResult(metricMap, vectors, resultKeys))
- }
- qrs.Results = results
- return qrs
- }
- // parseDataPoint parses a data point from raw prometheus query results and returns
- // a new Vector instance containing the parsed data along with any warnings or errors.
- func parseDataPoint(query string, dataPoint interface{}) (*util.Vector, warning, error) {
- var w warning = nil
- value, ok := dataPoint.([]interface{})
- if !ok || len(value) != 2 {
- return nil, w, DataPointFormatErr(query, dataPoint)
- }
- strVal := value[1].(string)
- v, err := strconv.ParseFloat(strVal, 64)
- if err != nil {
- return nil, w, err
- }
- // Test for +Inf and -Inf (sign: 0), Test for NaN
- if math.IsInf(v, 0) {
- w = InfWarning
- v = 0.0
- } else if math.IsNaN(v) {
- w = NaNWarning
- v = 0.0
- }
- return &util.Vector{
- Timestamp: math.Round(value[0].(float64)/10) * 10,
- Value: v,
- }, w, nil
- }
- func labelsForMetric(metricMap map[string]interface{}) string {
- var pairs []string
- for k, v := range metricMap {
- pairs = append(pairs, fmt.Sprintf("%s: %+v", k, v))
- }
- return fmt.Sprintf("{%s}", strings.Join(pairs, ", "))
- }
- func wrapPrometheusError(query string, qr interface{}) (string, error) {
- e, ok := qr.(map[string]interface{})["error"]
- if !ok {
- return "", PromUnexpectedResponseErr(query, qr)
- }
- eStr, ok := e.(string)
- return fmt.Sprintf("'%s' parsing query '%s'", eStr, query), nil
- }
|