Przeglądaj źródła

WIP: custom field mappings via env vars

Signed-off-by: Matt Bolt <mbolt35@gmail.com>
Matt Bolt 9 miesięcy temu
rodzic
commit
ecd5028bb2

+ 1 - 0
core/pkg/source/decoders.go

@@ -11,6 +11,7 @@ const (
 	InstanceLabel        = "instance"
 	InstanceTypeLabel    = "instance_type"
 	ContainerLabel       = "container"
+	ContainerNameLabel   = "container_name"
 	PodLabel             = "pod"
 	PodNameLabel         = "pod_name"
 	ProviderIDLabel      = "provider_id"

+ 152 - 0
core/pkg/source/fieldmapper.go

@@ -0,0 +1,152 @@
+package source
+
+import (
+	"fmt"
+	"maps"
+
+	"github.com/opencost/opencost/core/pkg/util/sliceutil"
+)
+
+// A FieldMapper maps the source name for the field to a corresponding set of
+// potential labels representing that source field. It also maintains a reverse mapping
+// which allows finding a source field by a label.
+//
+// By design, there are NO overlapping labels permitted between source fields and labels. That is,
+// labels for a specific field cannot overlap with labels for another field.
+type FieldMapper interface {
+	// LabelsFor returns all of the labels associated with that specific field.
+	LabelsFor(field string) []string
+
+	// FieldFor returns the field the provided label is associated with
+	FieldFor(label string) string
+
+	// Resolve attempts to find all labels associated with a given value. If the value
+	// is a field, then all of the labels associated with that field are returned. If
+	// the value is a label, then all the labels (including the provided value) are
+	// returned. Lastly, if neither of the previous conditions are true, an error is
+	// returned.
+	Resolve(value string) ([]string, error)
+}
+
+// NoOpFieldMapper is a no-op implementation of the FieldMapper interface, which provides a direct
+// passthrough of lookups. Useful when all field names are deterministic!
+type NoOpFieldMapper struct{}
+
+// NewNoOpFieldMapper creates a new NoOpFieldMapper instance for use with query results where the
+// field names are deterministic.
+func NewNoOpFieldMapper() *NoOpFieldMapper {
+	return new(NoOpFieldMapper)
+}
+
+// LabelsFor returns all of the labels associated with that specific field.
+func (nofm *NoOpFieldMapper) LabelsFor(field string) []string {
+	return []string{field}
+}
+
+// FieldFor returns the field the provided label is associated with
+func (nofm *NoOpFieldMapper) FieldFor(label string) string {
+	return label
+}
+
+// Resolve returns the value passed into the function
+func (nofm *NoOpFieldMapper) Resolve(value string) ([]string, error) {
+	return []string{value}, nil
+}
+
+// ReverseFieldMapper maps the source name for the field to a corresponding set of
+// potential labels representing that source field. It also maintains a reverse mapping
+// which allows finding a source field by a label.
+//
+// By design, there are NO overlapping labels permitted between source fields and labels. That is,
+// labels for a specific field cannot overlap with labels for another field.
+type ReverseFieldMapper struct {
+	fieldToLabel map[string][]string
+	labelToField map[string]string
+}
+
+// NewReverseFieldMapper creates a new ReverseFieldMapper instance.
+func NewReverseFieldMapper() *ReverseFieldMapper {
+	return &ReverseFieldMapper{
+		fieldToLabel: make(map[string][]string),
+		labelToField: make(map[string]string),
+	}
+}
+
+// LabelsFor returns all of the labels associated with that specific field.
+func (rfm *ReverseFieldMapper) LabelsFor(field string) []string {
+	return rfm.fieldToLabel[field]
+}
+
+// FieldFor returns the field the provided label is associated with
+func (rfm *ReverseFieldMapper) FieldFor(label string) string {
+	return rfm.labelToField[label]
+}
+
+// Resolve attempts to find all labels associated with a given value. If the value
+// is a field, then all of the labels associated with that field are returned. If
+// the value is a label, then all the labels (including the provided value) are
+// returned. Lastly, if neither of the previous conditions are true, an error is
+// returned.
+func (rfm *ReverseFieldMapper) Resolve(value string) ([]string, error) {
+	labels := rfm.LabelsFor(value)
+	if len(labels) > 0 {
+		return labels, nil
+	}
+
+	field := rfm.FieldFor(value)
+	if field != "" {
+		return rfm.LabelsFor(field), nil
+	}
+
+	return nil, fmt.Errorf("no labels found for value %s", value)
+}
+
+// Set appends a field -> labels mapping and returns an error if any keys or fields
+// overlap
+func (rfm *ReverseFieldMapper) Set(source string, labels ...string) error {
+	if rfm.fieldToLabel == nil {
+		rfm.fieldToLabel = make(map[string][]string)
+	}
+
+	if _, ok := rfm.fieldToLabel[source]; ok {
+		return fmt.Errorf("source: %s is already mapped to a set of labels", source)
+	}
+
+	// ensure all labels are unique, and ensure non-zero length
+	labels = toUnique(source, labels)
+	rfm.fieldToLabel[source] = labels
+
+	if rfm.labelToField == nil {
+		rfm.labelToField = make(map[string]string)
+	}
+	for _, label := range labels {
+		// overlap check -- clear out previously written mappings, return an error with conflict
+		if l, ok := rfm.labelToField[label]; ok {
+			for _, ll := range labels {
+				delete(rfm.labelToField, ll)
+			}
+
+			return fmt.Errorf("label %s is already mapped to source %s", label, l)
+		}
+
+		rfm.labelToField[label] = source
+	}
+	return nil
+}
+
+// toUnique ensures that all labels are unique within the provided slices, and will add the source
+// string to the list if the provided labels are empty
+func toUnique(source string, labels []string) []string {
+	set := make(map[string]struct{}, len(labels))
+	for _, label := range labels {
+		if label != "" {
+			set[label] = struct{}{}
+		}
+	}
+	result := sliceutil.SeqToSlice(maps.Keys(set))
+	if len(result) == 0 {
+		result = append(result, source)
+	}
+
+	return result
+}

+ 53 - 89
core/pkg/source/queryresult.go

@@ -24,54 +24,6 @@ func (qrc QueryResultsChan) Await() ([]*QueryResult, error) {
 	return results.Results, nil
 }
 
-// ResultKeys is a "configuration" struct that contains the keys/labels used to resolve labeled query
-// results. ResultKeys can be defined with every QueryResults instance if necessary, and alter the keys
-// used to fetch results when calling the following methods on QueryResults:
-//
-//	GetCluster()
-//	GetNamespace()
-//	GetNode()
-//	GetInstance()
-//	GetInstanceType()
-//	GetContainer()
-//	GetPod()
-//	GetProviderID()
-//	GetDevice()
-type ResultKeys struct {
-	ClusterKey      string
-	NamespaceKey    string
-	NodeKey         string
-	InstanceKey     string
-	InstanceTypeKey string
-	ContainerKey    string
-	PodKey          string
-	ProviderIDKey   string
-	DeviceKey       string
-}
-
-// DefaultResultKeys returns a new ResultKeys instance with typical default values.
-func DefaultResultKeys() *ResultKeys {
-	return &ResultKeys{
-		ClusterKey:      ClusterIDLabel,
-		NamespaceKey:    NamespaceLabel,
-		NodeKey:         NodeLabel,
-		InstanceKey:     InstanceLabel,
-		InstanceTypeKey: InstanceTypeLabel,
-		ContainerKey:    ContainerLabel,
-		PodKey:          PodLabel,
-		ProviderIDKey:   ProviderIDLabel,
-		DeviceKey:       DeviceLabel,
-	}
-}
-
-// ClusterKeyWithDefaults returns a new ResultKeys instance with the provided cluster key and the
-// rest of the keys set to their default values.
-func ClusterKeyWithDefaults(clusterKey string) *ResultKeys {
-	keys := DefaultResultKeys()
-	keys.ClusterKey = clusterKey
-	return keys
-}
-
 // QueryResults contains all of the query results and the source query string.
 type QueryResults struct {
 	Query   string
@@ -91,75 +43,80 @@ type QueryResult struct {
 	Metric map[string]interface{} `json:"metric"`
 	Values []*util.Vector         `json:"values"`
 
-	keys *ResultKeys
+	// field mapper resolves the lookup keys for a specific field
+	fieldMapper FieldMapper
 }
 
-func NewQueryResult(metrics map[string]any, values []*util.Vector, keys *ResultKeys) *QueryResult {
+func NewQueryResult(metrics map[string]any, values []*util.Vector, keys FieldMapper) *QueryResult {
 	if keys == nil {
-		keys = DefaultResultKeys()
+		keys = NewNoOpFieldMapper()
 	}
 
 	return &QueryResult{
-		Metric: metrics,
-		Values: values,
-		keys:   keys,
+		Metric:      metrics,
+		Values:      values,
+		fieldMapper: keys,
 	}
 }
 
 func (qr *QueryResult) GetCluster() (string, error) {
-	return qr.GetString(qr.keys.ClusterKey)
+	labels := qr.fieldMapper.LabelsFor(ClusterIDLabel)
+	return qr.firstOf(labels...)
 }
 
 func (qr *QueryResult) GetNamespace() (string, error) {
-	return qr.GetString(qr.keys.NamespaceKey)
+	labels := qr.fieldMapper.LabelsFor(NamespaceLabel)
+	return qr.firstOf(labels...)
 }
 
 func (qr *QueryResult) GetNode() (string, error) {
-	return qr.GetString(qr.keys.NodeKey)
+	labels := qr.fieldMapper.LabelsFor(NodeLabel)
+	return qr.firstOf(labels...)
 }
 
 func (qr *QueryResult) GetInstance() (string, error) {
-	return qr.GetString(qr.keys.InstanceKey)
+	labels := qr.fieldMapper.LabelsFor(InstanceLabel)
+	return qr.firstOf(labels...)
 }
 
 func (qr *QueryResult) GetInstanceType() (string, error) {
-	return qr.GetString(qr.keys.InstanceTypeKey)
+	labels := qr.fieldMapper.LabelsFor(InstanceTypeLabel)
+	return qr.firstOf(labels...)
 }
 
 func (qr *QueryResult) GetContainer() (string, error) {
-	value, err := qr.GetString(qr.keys.ContainerKey)
-	if value == "" || err != nil {
-		alternate, e := qr.GetString(qr.keys.ContainerKey + "_name")
-		if alternate == "" || e != nil {
-			return "", fmt.Errorf("'%s' and '%s' fields do not exist in data result vector", qr.keys.ContainerKey, qr.keys.ContainerKey+"_name")
-		}
-		return alternate, nil
-	}
-	return value, nil
+	labels := qr.fieldMapper.LabelsFor(ContainerLabel)
+	return qr.firstOf(labels...)
 }
 
 func (qr *QueryResult) GetPod() (string, error) {
-	value, err := qr.GetString(qr.keys.PodKey)
-	if value == "" || err != nil {
-		alternate, e := qr.GetString(qr.keys.PodKey + "_name")
-		if alternate == "" || e != nil {
-			return "", fmt.Errorf("'%s' and '%s' fields do not exist in data result vector", qr.keys.PodKey, qr.keys.PodKey+"_name")
-		}
-		return alternate, nil
-	}
-	return value, nil
+	labels := qr.fieldMapper.LabelsFor(PodLabel)
+	return qr.firstOf(labels...)
 }
 
 func (qr *QueryResult) GetProviderID() (string, error) {
-	return qr.GetString(qr.keys.ProviderIDKey)
+	labels := qr.fieldMapper.LabelsFor(ProviderIDLabel)
+	return qr.firstOf(labels...)
 }
 
 func (qr *QueryResult) GetDevice() (string, error) {
-	return qr.GetString(qr.keys.DeviceKey)
+	labels := qr.fieldMapper.LabelsFor(DeviceLabel)
+	return qr.firstOf(labels...)
 }
 
-// GetString returns the requested field, or an error if it does not exist
-func (qr *QueryResult) GetString(field string) (string, error) {
+// firstOf returns the first non-empty getResolvedString result from the provided list of fields
+func (qr *QueryResult) firstOf(fields ...string) (string, error) {
+	for _, field := range fields {
+		value, err := qr.getResolvedString(field)
+		if value != "" && err == nil {
+			return value, nil
+		}
+	}
+	return "", fmt.Errorf("none of the fields %v exist in data result vector", fields)
+}
+
+// getResolvedString returns the requested field, or an error if it does not exist
+func (qr *QueryResult) getResolvedString(field string) (string, error) {
 	f, ok := qr.Metric[field]
 	if !ok {
 		return "", fmt.Errorf("'%s' field does not exist in data result vector", field)
@@ -173,19 +130,26 @@ func (qr *QueryResult) GetString(field string) (string, error) {
 	return strField, nil
 }
 
+// GetString returns the requested field, or an error if it does not exist
+func (qr *QueryResult) GetString(field string) (string, error) {
+	// attempt to resolve the provided field. if we fail, assume the field is resolved!
+	fields, err := qr.fieldMapper.Resolve(field)
+	if err != nil {
+		return qr.getResolvedString(field)
+	}
+
+	// otherwise, return the first resolved field
+	return qr.firstOf(fields...)
+}
+
 // GetStrings returns the requested fields, or an error if it does not exist
 func (qr *QueryResult) GetStrings(fields ...string) (map[string]string, error) {
 	values := map[string]string{}
 
 	for _, field := range fields {
-		f, ok := qr.Metric[field]
-		if !ok {
-			return nil, fmt.Errorf("'%s' field does not exist in data result vector", field)
-		}
-
-		value, ok := f.(string)
-		if !ok {
-			return nil, fmt.Errorf("'%s' field is improperly formatted and cannot be converted to string", field)
+		value, err := qr.GetString(field)
+		if err != nil {
+			return nil, err
 		}
 
 		values[field] = value

+ 251 - 20
modules/prometheus-source/pkg/env/promenv.go

@@ -6,11 +6,45 @@ import (
 	"time"
 
 	"github.com/opencost/opencost/core/pkg/env"
+	"github.com/opencost/opencost/core/pkg/source"
 )
 
 const (
 	PrometheusServerEndpointEnvVar = "PROMETHEUS_SERVER_ENDPOINT"
 
+	PromClusterIDLabelEnvVar       = "PROM_CLUSTER_ID_LABEL"
+	PromNamespaceLabelEnvVar       = "PROM_NAMESPACE_LABEL"
+	PromNodeLabelEnvVar            = "PROM_NODE_LABEL"
+	PromInstanceLabelEnvVar        = "PROM_INSTANCE_LABEL"
+	PromInstanceTypeLabelEnvVar    = "PROM_INSTANCE_TYPE_LABEL"
+	PromContainerLabelEnvVar       = "PROM_CONTAINER_LABEL"
+	PromPodLabelEnvVar             = "PROM_POD_LABEL"
+	PromProviderIDLabelEnvVar      = "PROM_PROVIDER_ID_LABEL"
+	PromDeviceLabelEnvVar          = "PROM_DEVICE_LABEL"
+	PromPVCLabelEnvVar             = "PROM_PVC_LABEL"
+	PromPVLabelEnvVar              = "PROM_PV_LABEL"
+	PromStorageClassLabelEnvVar    = "PROM_STORAGE_CLASS_LABEL"
+	PromVolumeNameLabelEnvVar      = "PROM_VOLUME_NAME_LABEL"
+	PromServiceLabelEnvVar         = "PROM_SERVICE_LABEL"
+	PromServiceNameLabelEnvVar     = "PROM_SERVICE_NAME_LABEL"
+	PromIngressIPLabelEnvVar       = "PROM_INGRESS_IP_LABEL"
+	PromProvisionerNameLabelEnvVar = "PROM_PROVISIONER_NAME_LABEL"
+	PromUIDLabelEnvVar             = "PROM_UID_LABEL"
+	PromKubernetesNodeLabelEnvVar  = "PROM_KUBERNETES_NODE_LABEL"
+	PromModeLabelEnvVar            = "PROM_MODE_LABEL"
+	PromModelNameLabelEnvVar       = "PROM_MODEL_NAME_LABEL"
+	PromUUIDLabelEnvVar            = "PROM_UUID_LABEL"
+	PromResourceLabelEnvVar        = "PROM_RESOURCE_LABEL"
+	PromDeploymentLabelEnvVar      = "PROM_DEPLOYMENT_LABEL"
+	PromStatefulSetLabelEnvVar     = "PROM_STATEFUL_SET_LABEL"
+	PromReplicaSetLabelEnvVar      = "PROM_REPLICA_SET_LABEL"
+	PromOwnerNameLabelEnvVar       = "PROM_OWNER_NAME_LABEL"
+	PromOwnerKindLabelEnvVar       = "PROM_OWNER_KIND_LABEL"
+	PromUnitLabelEnvVar            = "PROM_UNIT_LABEL"
+	PromInternetLabelEnvVar        = "PROM_INTERNET_LABEL"
+	PromSameZoneLabelEnvVar        = "PROM_SAME_ZONE_LABEL"
+	PromSameRegionLabelEnvVar      = "PROM_SAME_REGION_LABEL"
+
 	PrometheusRetryOnRateLimitResponseEnvVar    = "PROMETHEUS_RETRY_ON_RATE_LIMIT"
 	PrometheusRetryOnRateLimitMaxRetriesEnvVar  = "PROMETHEUS_RETRY_ON_RATE_LIMIT_MAX_RETRIES"
 	PrometheusRetryOnRateLimitDefaultWaitEnvVar = "PROMETHEUS_RETRY_ON_RATE_LIMIT_DEFAULT_WAIT"
@@ -18,13 +52,13 @@ const (
 	PrometheusQueryTimeoutEnvVar        = "PROMETHEUS_QUERY_TIMEOUT"
 	PrometheusKeepAliveEnvVar           = "PROMETHEUS_KEEP_ALIVE"
 	PrometheusTLSHandshakeTimeoutEnvVar = "PROMETHEUS_TLS_HANDSHAKE_TIMEOUT"
-	ScrapeIntervalEnvVar                = "KUBECOST_SCRAPE_INTERVAL"
+	PrometheusScrapeJobNameEnvVar       = "PROMETHEUS_SCRAPE_JOB_NAME"
+	PrometheusScrapeIntervalEnvVar      = "PROMETHEUS_SCRAPE_INTERVAL"
 
 	PrometheusMaxQueryDurationMinutesEnvVar = "PROMETHEUS_MAX_QUERY_DURATION_MINUTES"
 	PrometheusQueryResolutionSecondsEnvVar  = "PROMETHEUS_QUERY_RESOLUTION_SECONDS"
 
 	MaxQueryConcurrencyEnvVar = "MAX_QUERY_CONCURRENCY"
-	PromClusterIDLabelEnvVar  = "PROM_CLUSTER_ID_LABEL"
 
 	PrometheusHeaderXScopeOrgIdEnvVar = "PROMETHEUS_HEADER_X_SCOPE_ORGID"
 	InsecureSkipVerifyEnvVar          = "INSECURE_SKIP_VERIFY"
@@ -36,11 +70,19 @@ const (
 
 	CurrentClusterIdFilterEnabledVar = "CURRENT_CLUSTER_ID_FILTER_ENABLED"
 
-	KubecostJobNameEnvVar = "KUBECOST_JOB_NAME"
+	// Deprecated env vars that we can use to fallback on temporarily
+	DeprecatedScrapeIntervalEnvVar = "KUBECOST_SCRAPE_INTERVAL"
+	DeprecatedJobNameEnvVar        = "KUBECOST_JOB_NAME"
 )
 
+// GetPrometheusServerEndpoint returns the environment variable value for PrometheusServerEndpointEnvVar which
+// represents the prometheus server endpoint used to execute prometheus queries.
+//
 // In sharded Prometheus setups, PROMETHEUS_SERVER_ENDPOINT should point to a global query endpoint (e.g., Thanos Query, Cortex, or Mimir)
 // to ensure OpenCost receives complete data. Pointing to a single Prometheus pod may result in incomplete or intermittent export results.
+func GetPrometheusServerEndpoint() string {
+	return env.Get(PrometheusServerEndpointEnvVar, "")
+}
 
 // IsPrometheusRetryOnRateLimitResponse will attempt to retry if a 429 response is received OR a 400 with a body containing
 // ThrottleException (common in AWS services like AMP)
@@ -69,14 +111,20 @@ func GetPrometheusHeaderXScopeOrgId() string {
 	return env.Get(PrometheusHeaderXScopeOrgIdEnvVar, "")
 }
 
-// GetPrometheusServerEndpoint returns the environment variable value for PrometheusServerEndpointEnvVar which
-// represents the prometheus server endpoint used to execute prometheus queries.
-func GetPrometheusServerEndpoint() string {
-	return env.Get(PrometheusServerEndpointEnvVar, "")
+// GetScrapeInterval returns the environment variable for PrometheusScrapeIntervalEnvVar, specifying the scrape interval for Prometheus,
+// should opencost not be able to deduce the configuration automatically.
+func GetScrapeInterval() time.Duration {
+	// return the current scrape interval env var, fallback on deprecated env var, default to 60s
+	// (as per the helm installation defaults)
+	return env.GetDuration(PrometheusScrapeIntervalEnvVar, env.GetDuration(DeprecatedScrapeIntervalEnvVar, 60*time.Second))
 }
 
-func GetScrapeInterval() time.Duration {
-	return env.GetDuration(ScrapeIntervalEnvVar, 0)
+// GetJobName returns the environment variable value for PrometheusScrapeJobNameEnvVar, specifying which job name
+// is used for prometheus to scrape the provided metrics.
+func GetJobName() string {
+	// return the current job name env var, fallback on deprecated env var, default to "opencost"
+	// (as per the helm installation defaults)
+	return env.Get(PrometheusScrapeJobNameEnvVar, env.Get(DeprecatedJobNameEnvVar, "opencost"))
 }
 
 func GetPrometheusQueryTimeout() time.Duration {
@@ -91,12 +139,6 @@ func GetPrometheusTLSHandshakeTimeout() time.Duration {
 	return env.GetDuration(PrometheusTLSHandshakeTimeoutEnvVar, 10*time.Second)
 }
 
-// GetJobName returns the environment variable value for JobNameEnvVar, specifying which job name
-// is used for prometheus to scrape the provided metrics.
-func GetJobName() string {
-	return env.Get(KubecostJobNameEnvVar, "kubecost")
-}
-
 func IsInsecureSkipVerify() bool {
 	return env.GetBool(InsecureSkipVerifyEnvVar, false)
 }
@@ -142,11 +184,6 @@ func GetPrometheusMaxQueryDuration() time.Duration {
 	return mins * time.Minute
 }
 
-// GetPromClusterLabel returns the environment variable value for PromClusterIDLabel
-func GetPromClusterLabel() string {
-	return env.Get(PromClusterIDLabelEnvVar, "cluster_id")
-}
-
 // GetPromClusterFilter returns environment variable value CurrentClusterIdFilterEnabledVar which
 // represents additional prometheus filter for all metrics for current cluster id
 func GetPromClusterFilter() string {
@@ -155,3 +192,197 @@ func GetPromClusterFilter() string {
 	}
 	return ""
 }
+
+// GetPromClusterLabel returns the environment variable value for PromClusterIDLabel: `PROM_CLUSTER_ID_LABEL`
+// Prometheus query formatting and results parsers will use this label to determine the ClusterID
+func GetPromClusterLabel() string {
+	return env.Get(PromClusterIDLabelEnvVar, source.ClusterIDLabel)
+}
+
+// GetPromNamespaceLabel returns the value set by PromNamespaceLabelEnvVar: `PROM_NAMESPACE_LABEL`
+// Prometheus query formatting and results parsers will use this label to determine the Namespace.
+func GetPromNamespaceLabel() []string {
+	return GetListWithDefaults(PromNamespaceLabelEnvVar, source.NamespaceLabel)
+}
+
+// GetPromNodeLabel returns the value set by PromNodeLabelEnvVar: `PROM_NODE_LABEL`
+// Prometheus query formatting and results parsers will use this label to determine the Node.
+func GetPromNodeLabel() []string {
+	return GetListWithDefaults(PromNodeLabelEnvVar, source.NodeLabel)
+}
+
+// GetPromInstanceLabel returns the value set by PromInstanceLabelEnvVar: `PROM_INSTANCE_LABEL`
+// Prometheus query formatting and results parsers will use this label to determine the Instance.
+func GetPromInstanceLabel() []string {
+	return GetListWithDefaults(PromInstanceLabelEnvVar, source.InstanceLabel)
+}
+
+// GetPromInstanceTypeLabel returns the value set by PromInstanceTypeLabelEnvVar: `PROM_INSTANCE_TYPE_LABEL`
+// Prometheus query formatting and results parsers will use this label to determine the InstanceType.
+func GetPromInstanceTypeLabel() []string {
+	return GetListWithDefaults(PromInstanceTypeLabelEnvVar, source.InstanceTypeLabel)
+}
+
+// GetPromContainerLabel returns the value set by PromContainerLabelEnvVar: `PROM_CONTAINER_LABEL`
+// Prometheus query formatting and results parsers will use this label to determine the Container.
+func GetPromContainerLabel() []string {
+	return GetListWithDefaults(PromContainerLabelEnvVar, source.ContainerLabel, source.ContainerNameLabel)
+}
+
+// GetPromPodLabel returns the value set by PromPodLabelEnvVar: `PROM_POD_LABEL`
+// Prometheus query formatting and results parsers will use this label to determine the Pod.
+func GetPromPodLabel() []string {
+	return GetListWithDefaults(PromPodLabelEnvVar, source.PodLabel, source.PodNameLabel)
+}
+
+// GetPromProviderIDLabel returns the value set by PromProviderIDLabelEnvVar: `PROM_PROVIDER_ID_LABEL`
+// Prometheus query formatting and results parsers will use this label to determine the ProviderID.
+func GetPromProviderIDLabel() []string {
+	return GetListWithDefaults(PromProviderIDLabelEnvVar, source.ProviderIDLabel)
+}
+
+// GetPromDeviceLabel returns the value set by PromDeviceLabelEnvVar: `PROM_DEVICE_LABEL`
+// Prometheus query formatting and results parsers will use this label to determine the Device.
+func GetPromDeviceLabel() []string {
+	return GetListWithDefaults(PromDeviceLabelEnvVar, source.DeviceLabel)
+}
+
+// GetPromPVCLabel returns the value set by PromPVCLabelEnvVar: `PROM_PVC_LABEL`
+// Prometheus query formatting and results parsers will use this label to determine the PVC.
+func GetPromPVCLabel() []string {
+	return GetListWithDefaults(PromPVCLabelEnvVar, source.PVCLabel)
+}
+
+// GetPromPVLabel returns the value set by PromPVLabelEnvVar: `PROM_PV_LABEL`
+// Prometheus query formatting and results parsers will use this label to determine the PV.
+func GetPromPVLabel() []string {
+	return GetListWithDefaults(PromPVLabelEnvVar, source.PVLabel)
+}
+
+// GetPromStorageClassLabel returns the value set by PromStorageClassLabelEnvVar: `PROM_STORAGE_CLASS_LABEL`
+// Prometheus query formatting and results parsers will use this label to determine the StorageClass.
+func GetPromStorageClassLabel() []string {
+	return GetListWithDefaults(PromStorageClassLabelEnvVar, source.StorageClassLabel)
+}
+
+// GetPromVolumeNameLabel returns the value set by PromVolumeNameLabelEnvVar: `PROM_VOLUME_NAME_LABEL`
+// Prometheus query formatting and results parsers will use this label to determine the VolumeName.
+func GetPromVolumeNameLabel() []string {
+	return GetListWithDefaults(PromVolumeNameLabelEnvVar, source.VolumeNameLabel)
+}
+
+// GetPromServiceLabel returns the value set by PromServiceLabelEnvVar: `PROM_SERVICE_LABEL`
+// Prometheus query formatting and results parsers will use this label to determine the Service.
+func GetPromServiceLabel() []string {
+	return GetListWithDefaults(PromServiceLabelEnvVar, source.ServiceLabel, source.ServiceNameLabel)
+}
+
+// GetPromIngressIPLabel returns the value set by PromIngressIPLabelEnvVar: `PROM_INGRESS_IP_LABEL`
+// Prometheus query formatting and results parsers will use this label to determine the IngressIP.
+func GetPromIngressIPLabel() []string {
+	return GetListWithDefaults(PromIngressIPLabelEnvVar, source.IngressIPLabel)
+}
+
+// GetPromProvisionerNameLabel returns the value set by PromProvisionerNameLabelEnvVar: `PROM_PROVISIONER_NAME_LABEL`
+// Prometheus query formatting and results parsers will use this label to determine the ProvisionerName.
+func GetPromProvisionerNameLabel() []string {
+	return GetListWithDefaults(PromProvisionerNameLabelEnvVar, source.ProvisionerNameLabel)
+}
+
+// GetPromUIDLabel returns the value set by PromUIDLabelEnvVar: `PROM_UID_LABEL`
+// Prometheus query formatting and results parsers will use this label to determine the UID.
+func GetPromUIDLabel() []string {
+	return GetListWithDefaults(PromUIDLabelEnvVar, source.UIDLabel)
+}
+
+// GetPromKubernetesNodeLabel returns the value set by PromKubernetesNodeLabelEnvVar: `PROM_KUBERNETES_NODE_LABEL`
+// Prometheus query formatting and results parsers will use this label to determine the KubernetesNode.
+func GetPromKubernetesNodeLabel() []string {
+	return GetListWithDefaults(PromKubernetesNodeLabelEnvVar, source.KubernetesNodeLabel)
+}
+
+// GetPromModeLabel returns the value set by PromModeLabelEnvVar: `PROM_MODE_LABEL`
+// Prometheus query formatting and results parsers will use this label to determine the Mode.
+func GetPromModeLabel() []string {
+	return GetListWithDefaults(PromModeLabelEnvVar, source.ModeLabel)
+}
+
+// GetPromModelNameLabel returns the value set by PromModelNameLabelEnvVar: `PROM_MODEL_NAME_LABEL`
+// Prometheus query formatting and results parsers will use this label to determine the ModelName.
+func GetPromModelNameLabel() []string {
+	return GetListWithDefaults(PromModelNameLabelEnvVar, source.ModelNameLabel)
+}
+
+// GetPromUUIDLabel returns the value set by PromUUIDLabelEnvVar: `PROM_UUID_LABEL`
+// Prometheus query formatting and results parsers will use this label to determine the UUID.
+func GetPromUUIDLabel() []string {
+	return GetListWithDefaults(PromUUIDLabelEnvVar, source.UUIDLabel)
+}
+
+// GetPromResourceLabel returns the value set by PromResourceLabelEnvVar: `PROM_RESOURCE_LABEL`
+// Prometheus query formatting and results parsers will use this label to determine the Resource.
+func GetPromResourceLabel() []string {
+	return GetListWithDefaults(PromResourceLabelEnvVar, source.ResourceLabel)
+}
+
+// GetPromDeploymentLabel returns the value set by PromDeploymentLabelEnvVar: `PROM_DEPLOYMENT_LABEL`
+// Prometheus query formatting and results parsers will use this label to determine the Deployment.
+func GetPromDeploymentLabel() []string {
+	return GetListWithDefaults(PromDeploymentLabelEnvVar, source.DeploymentLabel)
+}
+
+// GetPromStatefulSetLabel returns the value set by PromStatefulSetLabelEnvVar: `PROM_STATEFUL_SET_LABEL`
+// Prometheus query formatting and results parsers will use this label to determine the StatefulSet.
+func GetPromStatefulSetLabel() []string {
+	return GetListWithDefaults(PromStatefulSetLabelEnvVar, source.StatefulSetLabel)
+}
+
+// GetPromReplicaSetLabel returns the value set by PromReplicaSetLabelEnvVar: `PROM_REPLICA_SET_LABEL`
+// Prometheus query formatting and results parsers will use this label to determine the ReplicaSet.
+func GetPromReplicaSetLabel() []string {
+	return GetListWithDefaults(PromReplicaSetLabelEnvVar, source.ReplicaSetLabel)
+}
+
+// GetPromOwnerNameLabel returns the value set by PromOwnerNameLabelEnvVar: `PROM_OWNER_NAME_LABEL`
+// Prometheus query formatting and results parsers will use this label to determine the OwnerName.
+func GetPromOwnerNameLabel() []string {
+	return GetListWithDefaults(PromOwnerNameLabelEnvVar, source.OwnerNameLabel)
+}
+
+// GetPromOwnerKindLabel returns the value set by PromOwnerKindLabelEnvVar: `PROM_OWNER_KIND_LABEL`
+// Prometheus query formatting and results parsers will use this label to determine the OwnerKind.
+func GetPromOwnerKindLabel() []string {
+	return GetListWithDefaults(PromOwnerKindLabelEnvVar, source.OwnerKindLabel)
+}
+
+// GetPromUnitLabel returns the value set by PromUnitLabelEnvVar: `PROM_UNIT_LABEL`
+// Prometheus query formatting and results parsers will use this label to determine the Unit.
+func GetPromUnitLabel() []string {
+	return GetListWithDefaults(PromUnitLabelEnvVar, source.UnitLabel)
+}
+
+// GetPromInternetLabel returns the value set by PromInternetLabelEnvVar: `PROM_INTERNET_LABEL`
+// Prometheus query formatting and results parsers will use this label to determine the Internet.
+func GetPromInternetLabel() []string {
+	return GetListWithDefaults(PromInternetLabelEnvVar, source.InternetLabel)
+}
+
+// GetPromSameZoneLabel returns the value set by PromSameZoneLabelEnvVar: `PROM_SAME_ZONE_LABEL`
+// Prometheus query formatting and results parsers will use this label to determine the SameZone.
+func GetPromSameZoneLabel() []string {
+	return GetListWithDefaults(PromSameZoneLabelEnvVar, source.SameZoneLabel)
+}
+
+// GetPromSameRegionLabel returns the value set by PromSameRegionLabelEnvVar: `PROM_SAME_REGION_LABEL`
+// Prometheus query formatting and results parsers will use this label to determine the SameRegion.
+func GetPromSameRegionLabel() []string {
+	return GetListWithDefaults(PromSameRegionLabelEnvVar, source.SameRegionLabel)
+}
+
+func GetListWithDefaults(key string, defaultValues ...string) []string {
+	list := env.GetList(key, ",")
+	if len(list) == 0 {
+		return defaultValues
+	}
+	return list
+}

+ 48 - 1
modules/prometheus-source/pkg/prom/config.go

@@ -7,6 +7,7 @@ import (
 
 	coreenv "github.com/opencost/opencost/core/pkg/env"
 	"github.com/opencost/opencost/core/pkg/log"
+	"github.com/opencost/opencost/core/pkg/source"
 	"github.com/opencost/opencost/modules/prometheus-source/pkg/env"
 
 	restclient "k8s.io/client-go/rest"
@@ -17,6 +18,48 @@ const (
 	ServiceCA = `/var/run/secrets/kubernetes.io/serviceaccount/service-ca.crt`
 )
 
+func NewPrometheusLabelMappingFromEnv() source.FieldMapper {
+	check := func(err error) {
+		if err != nil {
+			panic(fmt.Sprintf("Failed to create PrometheusLabelMapping from environment: %s", err))
+		}
+	}
+
+	rfm := source.NewReverseFieldMapper()
+	check(rfm.Set(source.ClusterIDLabel, env.GetPromClusterLabel()))
+	check(rfm.Set(source.NamespaceLabel, env.GetPromNamespaceLabel()...))
+	check(rfm.Set(source.NodeLabel, env.GetPromNodeLabel()...))
+	check(rfm.Set(source.InstanceLabel, env.GetPromInstanceLabel()...))
+	check(rfm.Set(source.InstanceTypeLabel, env.GetPromInstanceTypeLabel()...))
+	check(rfm.Set(source.ContainerLabel, env.GetPromContainerLabel()...))
+	check(rfm.Set(source.PodLabel, env.GetPromPodLabel()...))
+	check(rfm.Set(source.ProviderIDLabel, env.GetPromProviderIDLabel()...))
+	check(rfm.Set(source.DeviceLabel, env.GetPromDeviceLabel()...))
+	check(rfm.Set(source.PVCLabel, env.GetPromPVCLabel()...))
+	check(rfm.Set(source.PVLabel, env.GetPromPVLabel()...))
+	check(rfm.Set(source.StorageClassLabel, env.GetPromStorageClassLabel()...))
+	check(rfm.Set(source.VolumeNameLabel, env.GetPromVolumeNameLabel()...))
+	check(rfm.Set(source.ServiceLabel, env.GetPromServiceLabel()...))
+	check(rfm.Set(source.IngressIPLabel, env.GetPromIngressIPLabel()...))
+	check(rfm.Set(source.ProvisionerNameLabel, env.GetPromProvisionerNameLabel()...))
+	check(rfm.Set(source.UIDLabel, env.GetPromUIDLabel()...))
+	check(rfm.Set(source.KubernetesNodeLabel, env.GetPromKubernetesNodeLabel()...))
+	check(rfm.Set(source.ModeLabel, env.GetPromModeLabel()...))
+	check(rfm.Set(source.ModelNameLabel, env.GetPromModelNameLabel()...))
+	check(rfm.Set(source.UUIDLabel, env.GetPromUUIDLabel()...))
+	check(rfm.Set(source.ResourceLabel, env.GetPromResourceLabel()...))
+	check(rfm.Set(source.DeploymentLabel, env.GetPromDeploymentLabel()...))
+	check(rfm.Set(source.StatefulSetLabel, env.GetPromStatefulSetLabel()...))
+	check(rfm.Set(source.ReplicaSetLabel, env.GetPromReplicaSetLabel()...))
+	check(rfm.Set(source.OwnerNameLabel, env.GetPromOwnerNameLabel()...))
+	check(rfm.Set(source.OwnerKindLabel, env.GetPromOwnerKindLabel()...))
+	check(rfm.Set(source.UnitLabel, env.GetPromUnitLabel()...))
+	check(rfm.Set(source.InternetLabel, env.GetPromInternetLabel()...))
+	check(rfm.Set(source.SameZoneLabel, env.GetPromSameZoneLabel()...))
+	check(rfm.Set(source.SameRegionLabel, env.GetPromSameRegionLabel()...))
+	return rfm
+}
+
 type OpenCostPrometheusConfig struct {
 	ServerEndpoint        string
 	Version               string
@@ -32,6 +75,7 @@ type OpenCostPrometheusConfig struct {
 	ClusterFilter         string
 	DataResolution        time.Duration
 	DataResolutionMinutes int
+	LabelMapping          source.FieldMapper
 }
 
 func (ocpc *OpenCostPrometheusConfig) IsRateLimitRetryEnabled() bool {
@@ -46,7 +90,7 @@ func NewOpenCostPrometheusConfigFromEnv() (*OpenCostPrometheusConfig, error) {
 	}
 
 	queryConcurrency := env.GetMaxQueryConcurrency()
-	log.Infof("Prometheus Client Max Concurrency set to %d", queryConcurrency)
+	log.Debugf("[Prometheus]: Client Max Concurrency set to: %d", queryConcurrency)
 
 	timeout := env.GetPrometheusQueryTimeout()
 	keepAlive := env.GetPrometheusKeepAlive()
@@ -98,6 +142,8 @@ func NewOpenCostPrometheusConfigFromEnv() (*OpenCostPrometheusConfig, error) {
 		resolutionMinutes = 1
 	}
 
+	labelMapping := NewPrometheusLabelMappingFromEnv()
+
 	clientConfig := &PrometheusClientConfig{
 		Timeout:               timeout,
 		KeepAlive:             keepAlive,
@@ -126,5 +172,6 @@ func NewOpenCostPrometheusConfigFromEnv() (*OpenCostPrometheusConfig, error) {
 		ClusterFilter:         clusterFilter,
 		DataResolution:        dataResolution,
 		DataResolutionMinutes: resolutionMinutes,
+		LabelMapping:          labelMapping,
 	}, nil
 }

+ 1 - 1
modules/prometheus-source/pkg/prom/datasource.go

@@ -149,7 +149,7 @@ func NewPrometheusDataSource(infoProvider clusters.ClusterInfoProvider, promConf
 		}
 	}
 
-	log.Infof("Using scrape interval of %f", promConfig.ScrapeInterval.Seconds())
+	log.Debugf("[Prometheus]: Resolved Scrape Interval to: %fs", promConfig.ScrapeInterval.Seconds())
 
 	promContexts := NewContextFactory(promClient, promConfig)
 

+ 4 - 3
modules/prometheus-source/pkg/prom/helpers.go

@@ -3,6 +3,7 @@ package prom
 import (
 	"context"
 	"fmt"
+	"strings"
 	"time"
 
 	prometheus "github.com/prometheus/client_golang/api"
@@ -45,15 +46,15 @@ func ScrapeIntervalFor(client prometheus.Client, jobName string) (time.Duration,
 	}
 
 	for _, sc := range cfg.ScrapeConfigs {
-		if sc.JobName == jobName {
+		if strings.EqualFold(sc.JobName, jobName) {
 			if sc.ScrapeInterval != "" {
 				si := sc.ScrapeInterval
 				sid, err := time.ParseDuration(si)
 				if err != nil {
 					return 0, fmt.Errorf("Error parsing scrape config for %s", sc.JobName)
-				} else {
-					return sid, nil
 				}
+
+				return sid, nil
 			}
 		}
 	}

+ 4 - 13
modules/prometheus-source/pkg/prom/query.go

@@ -174,10 +174,7 @@ func (ctx *Context) QuerySync(query string) ([]*source.QueryResult, v1.Warnings,
 		return nil, warnings, err
 	}
 
-	// create result keys from custom cluster label
-	resultKeys := source.ClusterKeyWithDefaults(ctx.config.ClusterLabel)
-
-	results := NewQueryResults(query, raw, resultKeys)
+	results := NewQueryResults(query, raw, ctx.config.LabelMapping)
 	if results.Error != nil {
 		return nil, warnings, results.Error
 	}
@@ -204,9 +201,7 @@ func runQuery(query string, ctx *Context, resCh source.QueryResultsChan, t time.
 	if requestError != nil {
 		results = NewQueryResultError(query, requestError)
 	} else {
-		// create result keys from custom cluster label
-		resultKeys := source.ClusterKeyWithDefaults(ctx.config.ClusterLabel)
-		results = NewQueryResults(query, raw, resultKeys)
+		results = NewQueryResults(query, raw, ctx.config.LabelMapping)
 
 		parseError = results.Error
 	}
@@ -329,9 +324,7 @@ func (ctx *Context) QueryRangeSync(query string, start, end time.Time, step time
 		return nil, warnings, err
 	}
 
-	// create result keys from custom cluster label
-	resultKeys := source.ClusterKeyWithDefaults(ctx.config.ClusterLabel)
-	results := NewQueryResults(query, raw, resultKeys)
+	results := NewQueryResults(query, raw, ctx.config.LabelMapping)
 	if results.Error != nil {
 		return nil, warnings, results.Error
 	}
@@ -358,9 +351,7 @@ func runQueryRange(query string, start, end time.Time, step time.Duration, ctx *
 	if requestError != nil {
 		results = NewQueryResultError(query, requestError)
 	} else {
-		// create result keys from custom cluster label
-		resultKeys := source.ClusterKeyWithDefaults(ctx.config.ClusterLabel)
-		results = NewQueryResults(query, raw, resultKeys)
+		results = NewQueryResults(query, raw, ctx.config.LabelMapping)
 
 		parseError = results.Error
 	}

+ 13 - 0
modules/prometheus-source/pkg/prom/queryformatter.go

@@ -0,0 +1,13 @@
+package prom
+
+import "github.com/opencost/opencost/core/pkg/source"
+
+type PrometheusQueryFormatter struct {
+	fieldMapper source.FieldMapper
+}
+
+func NewPrometheusQueryFormatter(fieldMapper source.FieldMapper) *PrometheusQueryFormatter {
+	return &PrometheusQueryFormatter{
+		fieldMapper: fieldMapper,
+	}
+}

+ 2 - 2
modules/prometheus-source/pkg/prom/result.go

@@ -70,7 +70,7 @@ func NewQueryResultError(query string, err error) *source.QueryResults {
 
 // NewQueryResults accepts the raw prometheus query result and returns an array of
 // QueryResult objects
-func NewQueryResults(query string, queryResult interface{}, resultKeys *source.ResultKeys) *source.QueryResults {
+func NewQueryResults(query string, queryResult interface{}, fieldMapper source.FieldMapper) *source.QueryResults {
 	qrs := source.NewQueryResults(query)
 
 	if queryResult == nil {
@@ -179,7 +179,7 @@ func NewQueryResults(query string, queryResult interface{}, resultKeys *source.R
 			}
 		}
 
-		results = append(results, source.NewQueryResult(metricMap, vectors, resultKeys))
+		results = append(results, source.NewQueryResult(metricMap, vectors, fieldMapper))
 	}
 
 	qrs.Results = results

+ 6 - 6
modules/prometheus-source/pkg/prom/validate.go

@@ -12,7 +12,7 @@ const UpQuery = "up"
 // opencost.
 type PrometheusMetadata struct {
 	Running            bool `json:"running"`
-	KubecostDataExists bool `json:"kubecostDataExists"`
+	OpencostDataExists bool `json:"kubecostDataExists"`
 }
 
 // Validate tells the model what data prometheus has on it.
@@ -36,14 +36,14 @@ func validate(cli prometheus.Client, q string, config *OpenCostPrometheusConfig)
 	if err != nil {
 		return &PrometheusMetadata{
 			Running:            false,
-			KubecostDataExists: false,
+			OpencostDataExists: false,
 		}, err
 	}
 
 	if len(resUp) == 0 {
 		return &PrometheusMetadata{
 			Running:            false,
-			KubecostDataExists: false,
+			OpencostDataExists: false,
 		}, fmt.Errorf("no running jobs on Prometheus at %s", ctx.QueryURL().Path)
 	}
 
@@ -63,7 +63,7 @@ func validate(cli prometheus.Client, q string, config *OpenCostPrometheusConfig)
 		if job == config.JobName {
 			return &PrometheusMetadata{
 				Running:            true,
-				KubecostDataExists: true,
+				OpencostDataExists: true,
 			}, err
 		}
 	}
@@ -71,12 +71,12 @@ func validate(cli prometheus.Client, q string, config *OpenCostPrometheusConfig)
 	if !running {
 		return &PrometheusMetadata{
 			Running:            false,
-			KubecostDataExists: false,
+			OpencostDataExists: false,
 		}, fmt.Errorf("up query does not have job names")
 	}
 
 	return &PrometheusMetadata{
 		Running:            true,
-		KubecostDataExists: false,
+		OpencostDataExists: false,
 	}, nil
 }

+ 8 - 8
pkg/costmodel/allocation_helpers_test.go

@@ -230,7 +230,7 @@ func TestBuildPVMap(t *testing.T) {
 							Value: 0.05,
 						},
 					},
-					source.DefaultResultKeys(),
+					source.NewNoOpFieldMapper(),
 				),
 				source.NewQueryResult(
 					map[string]interface{}{
@@ -242,7 +242,7 @@ func TestBuildPVMap(t *testing.T) {
 							Value: 0.05,
 						},
 					},
-					source.DefaultResultKeys(),
+					source.NewNoOpFieldMapper(),
 				),
 				source.NewQueryResult(
 					map[string]interface{}{
@@ -254,7 +254,7 @@ func TestBuildPVMap(t *testing.T) {
 							Value: 0.03,
 						},
 					},
-					source.DefaultResultKeys(),
+					source.NewNoOpFieldMapper(),
 				),
 				source.NewQueryResult(
 					map[string]interface{}{
@@ -266,7 +266,7 @@ func TestBuildPVMap(t *testing.T) {
 							Value: 0.05,
 						},
 					},
-					source.DefaultResultKeys(),
+					source.NewNoOpFieldMapper(),
 				),
 			},
 			resultsActiveMinutes: []*source.QueryResult{
@@ -289,7 +289,7 @@ func TestBuildPVMap(t *testing.T) {
 							Timestamp: startFloat + (hour * 18),
 						},
 					},
-					source.DefaultResultKeys(),
+					source.NewNoOpFieldMapper(),
 				),
 				source.NewQueryResult(
 					map[string]interface{}{
@@ -313,7 +313,7 @@ func TestBuildPVMap(t *testing.T) {
 							Timestamp: startFloat + (hour * 24),
 						},
 					},
-					source.DefaultResultKeys(),
+					source.NewNoOpFieldMapper(),
 				),
 				source.NewQueryResult(
 					map[string]interface{}{
@@ -331,7 +331,7 @@ func TestBuildPVMap(t *testing.T) {
 							Timestamp: startFloat + (hour * 18),
 						},
 					},
-					source.DefaultResultKeys(),
+					source.NewNoOpFieldMapper(),
 				),
 				source.NewQueryResult(
 					map[string]interface{}{
@@ -352,7 +352,7 @@ func TestBuildPVMap(t *testing.T) {
 							Timestamp: startFloat + (hour * 18),
 						},
 					},
-					source.DefaultResultKeys(),
+					source.NewNoOpFieldMapper(),
 				),
 			},
 			expected: pvMap1NoBytes,

+ 13 - 13
pkg/costmodel/cluster_helpers_test.go

@@ -750,7 +750,7 @@ func TestBuildGPUCostMap(t *testing.T) {
 							Value:     0,
 						},
 					},
-					source.DefaultResultKeys(),
+					source.NewNoOpFieldMapper(),
 				),
 			},
 			countMap: map[NodeIdentifier]float64{
@@ -784,7 +784,7 @@ func TestBuildGPUCostMap(t *testing.T) {
 							Value:     2,
 						},
 					},
-					source.DefaultResultKeys(),
+					source.NewNoOpFieldMapper(),
 				),
 			},
 			countMap: map[NodeIdentifier]float64{
@@ -818,7 +818,7 @@ func TestBuildGPUCostMap(t *testing.T) {
 							Value:     2,
 						},
 					},
-					source.DefaultResultKeys(),
+					source.NewNoOpFieldMapper(),
 				),
 			},
 			countMap: map[NodeIdentifier]float64{},
@@ -858,7 +858,7 @@ func TestBuildGPUCostMap(t *testing.T) {
 							Value:     2,
 						},
 					},
-					source.DefaultResultKeys(),
+					source.NewNoOpFieldMapper(),
 				),
 			},
 			countMap: map[NodeIdentifier]float64{
@@ -915,7 +915,7 @@ func TestAssetCustompricing(t *testing.T) {
 					Value:     0.5,
 				},
 			},
-			source.DefaultResultKeys(),
+			source.NewNoOpFieldMapper(),
 		),
 	}
 
@@ -932,7 +932,7 @@ func TestAssetCustompricing(t *testing.T) {
 					Value:     1.0,
 				},
 			},
-			source.DefaultResultKeys(),
+			source.NewNoOpFieldMapper(),
 		),
 	}
 
@@ -949,7 +949,7 @@ func TestAssetCustompricing(t *testing.T) {
 					Value:     1073741824.0,
 				},
 			},
-			source.DefaultResultKeys(),
+			source.NewNoOpFieldMapper(),
 		),
 	}
 
@@ -970,7 +970,7 @@ func TestAssetCustompricing(t *testing.T) {
 					Value:     1.0,
 				},
 			},
-			source.DefaultResultKeys(),
+			source.NewNoOpFieldMapper(),
 		),
 	}
 
@@ -991,7 +991,7 @@ func TestAssetCustompricing(t *testing.T) {
 					Value:     1.0,
 				},
 			},
-			source.DefaultResultKeys(),
+			source.NewNoOpFieldMapper(),
 		),
 	}
 
@@ -1012,7 +1012,7 @@ func TestAssetCustompricing(t *testing.T) {
 					Value:     1.0,
 				},
 			},
-			source.DefaultResultKeys(),
+			source.NewNoOpFieldMapper(),
 		),
 	}
 
@@ -1030,7 +1030,7 @@ func TestAssetCustompricing(t *testing.T) {
 					Value:     1.0,
 				},
 			},
-			source.DefaultResultKeys(),
+			source.NewNoOpFieldMapper(),
 		),
 	}
 
@@ -1158,7 +1158,7 @@ func TestBuildLabelsMap(t *testing.T) {
 					Value:     0.5,
 				},
 			},
-			source.DefaultResultKeys(),
+			source.NewNoOpFieldMapper(),
 		),
 		source.NewQueryResult(
 			map[string]interface{}{
@@ -1175,7 +1175,7 @@ func TestBuildLabelsMap(t *testing.T) {
 					Value:     0.5,
 				},
 			},
-			source.DefaultResultKeys(),
+			source.NewNoOpFieldMapper(),
 		),
 	}