Просмотр исходного кода

Merge branch 'bolt/annotations-error-collection' into niko/aggapi

Matt Bolt 5 лет назад
Родитель
Commit
10bb23ca30

+ 22 - 18
pkg/costmodel/cluster.go

@@ -117,7 +117,7 @@ type Disk struct {
 	Breakdown  *ClusterCostsBreakdown
 }
 
-func ClusterDisks(client prometheus.Client, provider cloud.Provider, duration, offset time.Duration) (map[string]*Disk, []error) {
+func ClusterDisks(client prometheus.Client, provider cloud.Provider, duration, offset time.Duration) (map[string]*Disk, error) {
 	durationStr := fmt.Sprintf("%dm", int64(duration.Minutes()))
 	offsetStr := fmt.Sprintf(" offset %dm", int64(offset.Minutes()))
 	if offset < time.Minute {
@@ -163,8 +163,8 @@ func ClusterDisks(client prometheus.Client, provider cloud.Provider, duration, o
 	resLocalStorageUsedCost, _ := resChLocalStorageUsedCost.Await()
 	resLocalStorageBytes, _ := resChLocalStorageBytes.Await()
 	resLocalActiveMins, _ := resChLocalActiveMins.Await()
-	if ctx.ErrorCollector.IsError() {
-		return nil, ctx.Errors()
+	if ctx.HasErrors() {
+		return nil, ctx.ErrorCollection()
 	}
 
 	diskMap := map[string]*Disk{}
@@ -399,7 +399,7 @@ var partialCPUMap = map[string]float64{
 	"e2-medium": 1.0,
 }
 
-func ClusterNodes(cp cloud.Provider, client prometheus.Client, duration, offset time.Duration) (map[string]*Node, []error) {
+func ClusterNodes(cp cloud.Provider, client prometheus.Client, duration, offset time.Duration) (map[string]*Node, error) {
 	durationStr := fmt.Sprintf("%dm", int64(duration.Minutes()))
 	offsetStr := fmt.Sprintf(" offset %dm", int64(offset.Minutes()))
 	if offset < time.Minute {
@@ -456,16 +456,17 @@ func ClusterNodes(cp cloud.Provider, client prometheus.Client, duration, offset
 	resNodeRAMUserPct, _ := resChNodeRAMUserPct.Await()
 	resActiveMins, _ := resChActiveMins.Await()
 
-	if optionalCtx.ErrorCollector.IsError() {
+	if optionalCtx.HasErrors() {
 		for _, err := range optionalCtx.Errors() {
 			log.Warningf("ClusterNodes: %s", err)
 		}
 	}
-	if requiredCtx.ErrorCollector.IsError() {
+	if requiredCtx.HasErrors() {
 		for _, err := range requiredCtx.Errors() {
 			log.Errorf("ClusterNodes: %s", err)
 		}
-		return nil, requiredCtx.Errors()
+
+		return nil, requiredCtx.ErrorCollection()
 	}
 
 	nodeMap := map[string]*Node{}
@@ -808,17 +809,17 @@ func ClusterNodes(cp cloud.Provider, client prometheus.Client, duration, offset
 
 	c, err := cp.GetConfig()
 	if err != nil {
-		return nil, []error{err}
+		return nil, err
 	}
 
 	discount, err := ParsePercentString(c.Discount)
 	if err != nil {
-		return nil, []error{err}
+		return nil, err
 	}
 
 	negotiatedDiscount, err := ParsePercentString(c.NegotiatedDiscount)
 	if err != nil {
-		return nil, []error{err}
+		return nil, err
 	}
 
 	for _, node := range nodeMap {
@@ -842,7 +843,7 @@ type LoadBalancer struct {
 	Minutes    float64
 }
 
-func ClusterLoadBalancers(cp cloud.Provider, client prometheus.Client, duration, offset time.Duration) (map[string]*LoadBalancer, []error) {
+func ClusterLoadBalancers(cp cloud.Provider, client prometheus.Client, duration, offset time.Duration) (map[string]*LoadBalancer, error) {
 	durationStr := fmt.Sprintf("%dm", int64(duration.Minutes()))
 	offsetStr := fmt.Sprintf(" offset %dm", int64(offset.Minutes()))
 	if offset < time.Minute {
@@ -869,8 +870,8 @@ func ClusterLoadBalancers(cp cloud.Provider, client prometheus.Client, duration,
 	resLBCost, _ := resChLBCost.Await()
 	resActiveMins, _ := resChActiveMins.Await()
 
-	if ctx.ErrorCollector.IsError() {
-		return nil, ctx.Errors()
+	if ctx.HasErrors() {
+		return nil, ctx.ErrorCollection()
 	}
 
 	loadBalancerMap := map[string]*LoadBalancer{}
@@ -1072,7 +1073,7 @@ func (a *Accesses) ComputeClusterCosts(client prometheus.Client, provider cloud.
 	resTotalRAM, _ := resChs[3].Await()
 	resTotalStorage, _ := resChs[4].Await()
 	if ctx.HasErrors() {
-		return nil, ctx.Errors()[0]
+		return nil, ctx.ErrorCollection()
 	}
 
 	defaultClusterID := env.GetClusterID()
@@ -1148,7 +1149,7 @@ func (a *Accesses) ComputeClusterCosts(client prometheus.Client, provider cloud.
 		resRAMSystemPct, _ := resChs[7].Await()
 		resRAMUserPct, _ := resChs[8].Await()
 		if ctx.HasErrors() {
-			return nil, ctx.Errors()[0]
+			return nil, ctx.ErrorCollection()
 		}
 
 		for _, result := range resCPUModePct {
@@ -1224,11 +1225,11 @@ func (a *Accesses) ComputeClusterCosts(client prometheus.Client, provider cloud.
 		}
 	}
 
-	if ctx.ErrorCollector.IsError() {
+	if ctx.HasErrors() {
 		for _, err := range ctx.Errors() {
 			log.Errorf("ComputeClusterCosts: %s", err)
 		}
-		return nil, ctx.Errors()[0]
+		return nil, ctx.ErrorCollection()
 	}
 
 	// Convert intermediate structure to Costs instances
@@ -1374,7 +1375,10 @@ func ClusterCostsOverTime(cli prometheus.Client, provider cloud.Provider, startS
 		// If that fails, return an error because something is actually wrong.
 		qNodes := fmt.Sprintf(queryNodes, localStorageQuery)
 
-		resultNodes, err := ctx.QueryRangeSync(qNodes, start, end, window)
+		resultNodes, warnings, err := ctx.QueryRangeSync(qNodes, start, end, window)
+		for _, warning := range warnings {
+			log.Warningf(warning)
+		}
 		if err != nil {
 			return nil, err
 		}

+ 2 - 2
pkg/costmodel/clusters/clustermap.go

@@ -120,7 +120,7 @@ func (pcm *PrometheusClusterMap) loadClusters() (map[string]*ClusterInfo, error)
 	}
 
 	// Execute Query
-	tryQuery := func() ([]*prom.QueryResult, error) {
+	tryQuery := func() ([]*prom.QueryResult, prometheus.Warnings, error) {
 		ctx := prom.NewContext(pcm.client)
 		return ctx.QuerySync(clusterInfoQuery(offset))
 	}
@@ -131,7 +131,7 @@ func (pcm *PrometheusClusterMap) loadClusters() (map[string]*ClusterInfo, error)
 	// Retry on failure
 	delay := LoadRetryDelay
 	for r := LoadRetries; r > 0; r-- {
-		qr, err = tryQuery()
+		qr, _, err = tryQuery()
 
 		// non-error breaks out of loop
 		if err == nil {

+ 27 - 8
pkg/costmodel/costmodel.go

@@ -280,13 +280,22 @@ func (cm *CostModel) ComputeCostData(cli prometheusClient.Client, clientset kube
 	resNetInternetRequests, _ := resChNetInternetRequests.Await()
 	resNormalization, _ := resChNormalization.Await()
 
+	// NOTE: The way we currently handle errors and warnings only early returns if there is an error. Warnings
+	// NOTE: will not propagate unless coupled with errors.
 	if ctx.HasErrors() {
+		// To keep the context of where the errors occurred, we log them here and pass the error
+		// back to the caller. The caller should handle the specific case where the error is an ErrorCollection.
 		for _, promErr := range ctx.Errors() {
-			log.Errorf("ComputeCostData: Prometheus error: %s", promErr.Error())
+			if promErr.Error != nil {
+				log.Errorf("ComputeCostData: Request Error: %s", promErr.Error)
+			}
+			if promErr.ParseError != nil {
+				log.Errorf("ComputeCostData: Parsing Error: %s", promErr.ParseError)
+			}
 		}
 
-		// TODO: Categorize fatal prometheus query failures
-		// return nil, fmt.Errorf("Error querying prometheus: %s", promErr.Error())
+		// ErrorCollection is a collection of errors wrapped in a single error implementation
+		return nil, ctx.ErrorCollection()
 	}
 
 	defer measureTime(time.Now(), profileThreshold, "ComputeCostData: Processing Query Data")
@@ -671,10 +680,11 @@ func findDeletedPodInfo(cli prometheusClient.Client, missingContainers map[strin
 	if len(missingContainers) > 0 {
 		queryHistoricalPodLabels := fmt.Sprintf(`kube_pod_labels{}[%s]`, window)
 
-		podLabelsResult, err := prom.NewContext(cli).QuerySync(queryHistoricalPodLabels)
+		podLabelsResult, _, err := prom.NewContext(cli).QuerySync(queryHistoricalPodLabels)
 		if err != nil {
 			log.Errorf("failed to parse historical pod labels: %s", err.Error())
 		}
+
 		podLabels := make(map[string]map[string]string)
 		if podLabelsResult != nil {
 			podLabels, err = parsePodLabels(podLabelsResult)
@@ -720,7 +730,7 @@ func findDeletedNodeInfo(cli prometheusClient.Client, missingNodes map[string]*c
 		ramCostRes, _ := ramCostResCh.Await()
 		gpuCostRes, _ := gpuCostResCh.Await()
 		if ctx.HasErrors() {
-			return ctx.Errors()[0]
+			return ctx.ErrorCollection()
 		}
 
 		cpuCosts, err := getCost(cpuCostRes)
@@ -1600,13 +1610,22 @@ func (cm *CostModel) costDataRange(cli prometheusClient.Client, clientset kubern
 	measureTime(queryProfileStart, profileThreshold, fmt.Sprintf("costDataRange(%fh): Prom/k8s Queries", durHrs))
 	defer measureTime(time.Now(), profileThreshold, fmt.Sprintf("costDataRange(%fh): Processing Query Data", durHrs))
 
+	// NOTE: The way we currently handle errors and warnings only early returns if there is an error. Warnings
+	// NOTE: will not propagate unless coupled with errors.
 	if ctx.HasErrors() {
+		// To keep the context of where the errors occurred, we log them here and pass the error
+		// back to the caller. The caller should handle the specific case where the error is an ErrorCollection.
 		for _, promErr := range ctx.Errors() {
-			log.Errorf("CostDataRange: Prometheus error: %s", promErr.Error())
+			if promErr.Error != nil {
+				log.Errorf("CostDataRange: Request Error: %s", promErr.Error)
+			}
+			if promErr.ParseError != nil {
+				log.Errorf("CostDataRange: Parsing Error: %s", promErr.ParseError)
+			}
 		}
 
-		// TODO: Categorize fatal prometheus query failures
-		// return nil, fmt.Errorf("Error querying prometheus: %s", promErr.Error())
+		// ErrorCollection is a collection of errors wrapped in a single error implementation
+		return nil, ctx.ErrorCollection()
 	}
 
 	profileStart := time.Now()

+ 203 - 11
pkg/costmodel/metrics.go

@@ -10,13 +10,14 @@ import (
 	"time"
 
 	costAnalyzerCloud "github.com/kubecost/cost-model/pkg/cloud"
+	"github.com/kubecost/cost-model/pkg/clustercache"
 	"github.com/kubecost/cost-model/pkg/errors"
 	"github.com/kubecost/cost-model/pkg/log"
 	"github.com/kubecost/cost-model/pkg/prom"
+
 	"github.com/prometheus/client_golang/prometheus"
 	dto "github.com/prometheus/client_model/go"
 	v1 "k8s.io/api/core/v1"
-	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/client-go/kubernetes"
 
 	"k8s.io/klog"
@@ -32,7 +33,7 @@ var (
 
 // StatefulsetCollector is a prometheus collector that generates StatefulsetMetrics
 type StatefulsetCollector struct {
-	KubeClientSet kubernetes.Interface
+	KubeClusterCache clustercache.ClusterCache
 }
 
 // Describe sends the super-set of all possible descriptors of metrics
@@ -43,8 +44,8 @@ func (sc StatefulsetCollector) Describe(ch chan<- *prometheus.Desc) {
 
 // Collect is called by the Prometheus registry when collecting metrics.
 func (sc StatefulsetCollector) Collect(ch chan<- prometheus.Metric) {
-	ds, _ := sc.KubeClientSet.AppsV1().StatefulSets("").List(metav1.ListOptions{})
-	for _, statefulset := range ds.Items {
+	ds := sc.KubeClusterCache.GetAllStatefulSets()
+	for _, statefulset := range ds {
 		labels, values := kubeLabelsToPrometheusLabels(statefulset.Spec.Selector.MatchLabels)
 		m := newStatefulsetMetric(statefulset.GetName(), statefulset.GetNamespace(), "statefulSet_match_labels", labels, values)
 		ch <- m
@@ -118,7 +119,7 @@ func (s StatefulsetMetric) Write(m *dto.Metric) error {
 
 // DeploymentCollector is a prometheus collector that generates DeploymentMetrics
 type DeploymentCollector struct {
-	KubeClientSet kubernetes.Interface
+	KubeClusterCache clustercache.ClusterCache
 }
 
 // Describe sends the super-set of all possible descriptors of metrics
@@ -129,8 +130,8 @@ func (sc DeploymentCollector) Describe(ch chan<- *prometheus.Desc) {
 
 // Collect is called by the Prometheus registry when collecting metrics.
 func (sc DeploymentCollector) Collect(ch chan<- prometheus.Metric) {
-	ds, _ := sc.KubeClientSet.AppsV1().Deployments("").List(metav1.ListOptions{})
-	for _, deployment := range ds.Items {
+	ds := sc.KubeClusterCache.GetAllDeployments()
+	for _, deployment := range ds {
 		labels, values := kubeLabelsToPrometheusLabels(deployment.Spec.Selector.MatchLabels)
 		m := newDeploymentMetric(deployment.GetName(), deployment.GetNamespace(), "deployment_match_labels", labels, values)
 		ch <- m
@@ -204,7 +205,7 @@ func (s DeploymentMetric) Write(m *dto.Metric) error {
 
 // ServiceCollector is a prometheus collector that generates ServiceMetrics
 type ServiceCollector struct {
-	KubeClientSet kubernetes.Interface
+	KubeClusterCache clustercache.ClusterCache
 }
 
 // Describe sends the super-set of all possible descriptors of metrics
@@ -215,8 +216,8 @@ func (sc ServiceCollector) Describe(ch chan<- *prometheus.Desc) {
 
 // Collect is called by the Prometheus registry when collecting metrics.
 func (sc ServiceCollector) Collect(ch chan<- prometheus.Metric) {
-	svcs, _ := sc.KubeClientSet.CoreV1().Services("").List(metav1.ListOptions{})
-	for _, svc := range svcs.Items {
+	svcs := sc.KubeClusterCache.GetAllServices()
+	for _, svc := range svcs {
 		labels, values := kubeLabelsToPrometheusLabels(svc.Spec.Selector)
 		m := newServiceMetric(svc.GetName(), svc.GetNamespace(), "service_selector_labels", labels, values)
 		ch <- m
@@ -284,6 +285,172 @@ func (s ServiceMetric) Write(m *dto.Metric) error {
 	return nil
 }
 
+//--------------------------------------------------------------------------
+//  NamespaceAnnotationCollector
+//--------------------------------------------------------------------------
+
+// NamespaceAnnotationCollector is a prometheus collector that generates NamespaceAnnotationMetrics
+type NamespaceAnnotationCollector struct {
+	KubeClusterCache clustercache.ClusterCache
+}
+
+// Describe sends the super-set of all possible descriptors of metrics
+// collected by this Collector.
+func (nsac NamespaceAnnotationCollector) Describe(ch chan<- *prometheus.Desc) {
+	ch <- prometheus.NewDesc("kube_namespace_annotations", "namespace annotations", []string{}, nil)
+}
+
+// Collect is called by the Prometheus registry when collecting metrics.
+func (nsac NamespaceAnnotationCollector) Collect(ch chan<- prometheus.Metric) {
+	namespaces := nsac.KubeClusterCache.GetAllNamespaces()
+	for _, namespace := range namespaces {
+		labels, values := kubeAnnotationstoPrometheusLabels(namespace.Annotations)
+		m := newNamespaceAnnotationsMetric(namespace.GetName(), "kube_namespace_annotations", labels, values)
+		ch <- m
+	}
+}
+
+//--------------------------------------------------------------------------
+//  NamespaceAnnotationsMetric
+//--------------------------------------------------------------------------
+
+// NamespaceAnnotationsMetric is a prometheus.Metric used to encode namespace annotations
+type NamespaceAnnotationsMetric struct {
+	fqName      string
+	help        string
+	labelNames  []string
+	labelValues []string
+	namespace   string
+}
+
+// newNamespaceAnnotationsMetric creates a new NamespaceAnnotationsMetric, an implementation of prometheus.Metric.
+func newNamespaceAnnotationsMetric(namespace, fqname string, labelNames []string, labelValues []string) NamespaceAnnotationsMetric {
+	return NamespaceAnnotationsMetric{
+		namespace:   namespace,
+		fqName:      fqname,
+		labelNames:  labelNames,
+		labelValues: labelValues,
+		help:        "kube_namespace_annotations Namespace Annotations",
+	}
+}
+
+// Desc returns the descriptor for the Metric. This method idempotently
+// returns the same descriptor throughout the lifetime of the Metric.
+func (nam NamespaceAnnotationsMetric) Desc() *prometheus.Desc {
+	l := prometheus.Labels{"namespace": nam.namespace}
+	return prometheus.NewDesc(nam.fqName, nam.help, nam.labelNames, l)
+}
+
+// Write encodes the Metric into a "Metric" Protocol Buffer data
+// transmission object.
+func (nam NamespaceAnnotationsMetric) Write(m *dto.Metric) error {
+	h := float64(1)
+	m.Gauge = &dto.Gauge{
+		Value: &h,
+	}
+
+	var labels []*dto.LabelPair
+	for i := range nam.labelNames {
+		labels = append(labels, &dto.LabelPair{
+			Name:  &nam.labelNames[i],
+			Value: &nam.labelValues[i],
+		})
+	}
+	n := "namespace"
+	labels = append(labels, &dto.LabelPair{
+		Name:  &n,
+		Value: &nam.namespace,
+	})
+	m.Label = labels
+	return nil
+}
+
+//--------------------------------------------------------------------------
+//  PodAnnotationCollector
+//--------------------------------------------------------------------------
+
+// PodAnnotationCollector is a prometheus collector that generates PodAnnotationMetrics
+type PodAnnotationCollector struct {
+	KubeClusterCache clustercache.ClusterCache
+}
+
+// Describe sends the super-set of all possible descriptors of metrics
+// collected by this Collector.
+func (pac PodAnnotationCollector) Describe(ch chan<- *prometheus.Desc) {
+	ch <- prometheus.NewDesc("kube_pod_annotations", "pod annotations", []string{}, nil)
+}
+
+// Collect is called by the Prometheus registry when collecting metrics.
+func (pac PodAnnotationCollector) Collect(ch chan<- prometheus.Metric) {
+	pods := pac.KubeClusterCache.GetAllPods()
+	for _, pod := range pods {
+		labels, values := kubeAnnotationstoPrometheusLabels(pod.Annotations)
+		m := newPodAnnotationMetric(pod.GetNamespace(), pod.GetName(), "kube_pod_annotations", labels, values)
+		ch <- m
+	}
+}
+
+//--------------------------------------------------------------------------
+//  PodAnnotationsMetric
+//--------------------------------------------------------------------------
+
+// PodAnnotationsMetric is a prometheus.Metric used to encode pod annotations
+type PodAnnotationsMetric struct {
+	name        string
+	fqName      string
+	help        string
+	labelNames  []string
+	labelValues []string
+	namespace   string
+}
+
+// newPodAnnotationMetric creates a new PodAnnotationsMetric, an implementation of prometheus.Metric.
+func newPodAnnotationMetric(namespace, name, fqname string, labelNames []string, labelValues []string) PodAnnotationsMetric {
+	return PodAnnotationsMetric{
+		namespace:   namespace,
+		fqName:      fqname,
+		labelNames:  labelNames,
+		labelValues: labelValues,
+		help:        "kube_pod_annotations Pod Annotations",
+	}
+}
+
+// Desc returns the descriptor for the Metric. This method idempotently
+// returns the same descriptor throughout the lifetime of the Metric.
+func (pam PodAnnotationsMetric) Desc() *prometheus.Desc {
+	l := prometheus.Labels{"namespace": pam.namespace, "pod": pam.name}
+	return prometheus.NewDesc(pam.fqName, pam.help, pam.labelNames, l)
+}
+
+// Write encodes the Metric into a "Metric" Protocol Buffer data
+// transmission object.
+func (pam PodAnnotationsMetric) Write(m *dto.Metric) error {
+	h := float64(1)
+	m.Gauge = &dto.Gauge{
+		Value: &h,
+	}
+
+	var labels []*dto.LabelPair
+	for i := range pam.labelNames {
+		labels = append(labels, &dto.LabelPair{
+			Name:  &pam.labelNames[i],
+			Value: &pam.labelValues[i],
+		})
+	}
+	n := "namespace"
+	labels = append(labels, &dto.LabelPair{
+		Name:  &n,
+		Value: &pam.namespace,
+	})
+	r := "pod"
+	labels = append(labels, &dto.LabelPair{
+		Name:  &r,
+		Value: &pam.name,
+	})
+	m.Label = labels
+	return nil
+}
+
 //--------------------------------------------------------------------------
 //  ClusterInfoCollector
 //--------------------------------------------------------------------------
@@ -453,7 +620,16 @@ func StartCostModelMetricRecording(a *Accesses) bool {
 
 			data, err := a.Model.ComputeCostData(a.PrometheusClient, a.KubeClientSet, a.CloudProvider, "2m", "", "")
 			if err != nil {
-				klog.V(1).Info("Error in price recording: " + err.Error())
+				// For an error collection, we'll just log the length of the errors (ComputeCostData already logs the
+				// actual errors)
+				if prom.IsErrorCollection(err) {
+					if ec, ok := err.(prom.QueryErrorCollection); ok {
+						klog.V(1).Info("Error in price recording: %d errors occurred", len(ec.Errors()))
+					}
+				} else {
+					klog.V(1).Info("Error in price recording: " + err.Error())
+				}
+
 				// zero the for loop so the time.Sleep will still work
 				data = map[string]*CostData{}
 			}
@@ -721,6 +897,22 @@ func kubeLabelsToPrometheusLabels(labels map[string]string) ([]string, []string)
 	return labelKeys, labelValues
 }
 
+// kubeAnnotationstoPrometheusLabels converts kubernetes annotations into prometheus labels.
+func kubeAnnotationstoPrometheusLabels(labels map[string]string) ([]string, []string) {
+	labelKeys := make([]string, 0, len(labels))
+	for k := range labels {
+		labelKeys = append(labelKeys, k)
+	}
+	sort.Strings(labelKeys)
+
+	labelValues := make([]string, 0, len(labels))
+	for i, k := range labelKeys {
+		labelKeys[i] = "annotation_" + SanitizeLabelName(k)
+		labelValues = append(labelValues, labels[k])
+	}
+	return labelKeys, labelValues
+}
+
 // Replaces all illegal prometheus label characters with _
 func SanitizeLabelName(s string) string {
 	return invalidLabelCharRE.ReplaceAllString(s, "_")

+ 3 - 3
pkg/costmodel/router.go

@@ -1058,13 +1058,13 @@ func Initialize(additionalConfigWatchers ...ConfigWatchers) *Accesses {
 	prometheus.MustRegister(ClusterManagementCostRecorder)
 	prometheus.MustRegister(LBCostRecorder)
 	prometheus.MustRegister(ServiceCollector{
-		KubeClientSet: kubeClientset,
+		KubeClusterCache: k8sCache,
 	})
 	prometheus.MustRegister(DeploymentCollector{
-		KubeClientSet: kubeClientset,
+		KubeClusterCache: k8sCache,
 	})
 	prometheus.MustRegister(StatefulsetCollector{
-		KubeClientSet: kubeClientset,
+		KubeClusterCache: k8sCache,
 	})
 	prometheus.MustRegister(ClusterInfoCollector{
 		KubeClientSet: kubeClientset,

+ 15 - 0
pkg/env/costmodelenv.go

@@ -22,6 +22,9 @@ const (
 	ConfigPathEnvVar               = "CONFIG_PATH"
 	CloudProviderAPIKeyEnvVar      = "CLOUD_PROVIDER_API_KEY"
 
+	EmitPodAnnotationsMetricEnvVar       = "EMIT_POD_ANNOTATIONS_METRIC"
+	EmitNamespaceAnnotationsMetricEnvVar = "EMIT_NAMESPACE_ANNOTATIONS_METRIC"
+
 	ThanosEnabledEnvVar      = "THANOS_ENABLED"
 	ThanosQueryUrlEnvVar     = "THANOS_QUERY_URL"
 	ThanosOffsetEnvVar       = "THANOS_QUERY_OFFSET"
@@ -51,6 +54,18 @@ func GetAppVersion() string {
 	return Get(AppVersionEnvVar, "1.70.0")
 }
 
+// IsEmitNamespaceAnnotationsMetric returns true if cost-model is configured to emit the kube_namespace_annotations metric
+// containing the namespace annotations
+func IsEmitNamespaceAnnotationsMetric() bool {
+	return GetBool(EmitNamespaceAnnotationsMetricEnvVar, false)
+}
+
+// IsEmitPodAnnotationsMetric returns true if cost-model is configured to emit the kube_pod_annotations metric containing
+// pod annotations.
+func IsEmitPodAnnotationsMetric() bool {
+	return GetBool(EmitPodAnnotationsMetricEnvVar, false)
+}
+
 // GetAWSAccessKeyID returns the environment variable value for AWSAccessKeyIDEnvVar which represents
 // the AWS access key for authentication
 func GetAWSAccessKeyID() string {

+ 0 - 39
pkg/errors/errors.go

@@ -1,39 +0,0 @@
-package errors
-
-import "sync"
-
-// Error collection helper
-type ErrorCollector struct {
-	m      sync.Mutex
-	errors []error
-}
-
-// Reports an error to the collector. Ignores if the error is nil.
-func (ec *ErrorCollector) Report(e error) {
-	if e == nil {
-		return
-	}
-
-	ec.m.Lock()
-	defer ec.m.Unlock()
-
-	ec.errors = append(ec.errors, e)
-}
-
-// Whether or not the collector caught errors
-func (ec *ErrorCollector) IsError() bool {
-	ec.m.Lock()
-	defer ec.m.Unlock()
-
-	return len(ec.errors) > 0
-}
-
-// Errors caught by the collector
-func (ec *ErrorCollector) Errors() []error {
-	ec.m.Lock()
-	defer ec.m.Unlock()
-
-	errs := make([]error, len(ec.errors))
-	copy(errs, ec.errors)
-	return errs
-}

+ 199 - 0
pkg/prom/error.go

@@ -2,9 +2,208 @@ package prom
 
 import (
 	"fmt"
+	"reflect"
 	"strings"
+	"sync"
+
+	"github.com/kubecost/cost-model/pkg/log"
 )
 
+// errorType used to check HasError
+var errorType = reflect.TypeOf((*error)(nil)).Elem()
+
+//--------------------------------------------------------------------------
+//  Prometheus Error Collection
+//--------------------------------------------------------------------------
+
+type QueryError struct {
+	Query      string `json:"query"`
+	Error      error  `json:"error"`
+	ParseError error  `json:"parseError"`
+}
+
+// String returns a string representation of the QueryError
+func (qe *QueryError) String() string {
+	var sb strings.Builder
+	sb.WriteString("Errors:\n")
+	if qe.Error != nil {
+		sb.WriteString(fmt.Sprintf("  Request Error: %s\n", qe.Error))
+	}
+	if qe.ParseError != nil {
+		sb.WriteString(fmt.Sprintf("  Parse Error: %s\n", qe.ParseError))
+	}
+	sb.WriteString(fmt.Sprintf("for Query: %s\n", qe.Query))
+	return sb.String()
+}
+
+type QueryWarning struct {
+	Query    string   `json:"query"`
+	Warnings []string `json:"warnings"`
+}
+
+// String returns a string representation of the QueryWarning
+func (qw *QueryWarning) String() string {
+	var sb strings.Builder
+	sb.WriteString("Warnings:\n")
+	for i, w := range qw.Warnings {
+		sb.WriteString(fmt.Sprintf("  %d) %s\n", i+1, w))
+	}
+	sb.WriteString(fmt.Sprintf("for Query: %s\n", qw.Query))
+	return sb.String()
+}
+
+// QueryErrorCollection represents a collection of query errors and warnings made via context.
+type QueryErrorCollection interface {
+	// Warnings and Errors return the warnings and errors collected during query execution.
+	Warnings() []*QueryWarning
+	Errors() []*QueryError
+}
+
+// QueryErrorCollector is used to collect prometheus query errors and warnings, and also implements
+// the error interface so it can be returned as a single aggregated error.
+type QueryErrorCollector struct {
+	m        sync.RWMutex
+	errors   []*QueryError
+	warnings []*QueryWarning
+}
+
+// Report records the request error, parse error, and warnings for a query. It is a
+// no-op when both errors are nil and there are no warnings.
+func (ec *QueryErrorCollector) Report(query string, warnings []string, requestError error, parseError error) {
+	if requestError == nil && parseError == nil && len(warnings) == 0 {
+		return
+	}
+
+	ec.m.Lock()
+	defer ec.m.Unlock()
+
+	if requestError != nil || parseError != nil {
+		ec.errors = append(ec.errors, &QueryError{
+			Query:      query,
+			Error:      requestError,
+			ParseError: parseError,
+		})
+	}
+
+	if len(warnings) > 0 {
+		ec.warnings = append(ec.warnings, &QueryWarning{
+			Query:    query,
+			Warnings: warnings,
+		})
+	}
+}
+
+// Whether or not the collector caught any warnings
+func (ec *QueryErrorCollector) IsWarning() bool {
+	ec.m.RLock()
+	defer ec.m.RUnlock()
+
+	return len(ec.warnings) > 0
+}
+
+// Whether or not the collector caught errors
+func (ec *QueryErrorCollector) IsError() bool {
+	ec.m.RLock()
+	defer ec.m.RUnlock()
+
+	return len(ec.errors) > 0
+}
+
+// Warnings caught by the collector
+func (ec *QueryErrorCollector) Warnings() []*QueryWarning {
+	ec.m.RLock()
+	defer ec.m.RUnlock()
+
+	warns := make([]*QueryWarning, len(ec.warnings))
+	copy(warns, ec.warnings)
+	return warns
+}
+
+// Errors caught by the collector
+func (ec *QueryErrorCollector) Errors() []*QueryError {
+	ec.m.RLock()
+	defer ec.m.RUnlock()
+
+	errs := make([]*QueryError, len(ec.errors))
+	copy(errs, ec.errors)
+	return errs
+}
+
+// Implement the error interface to allow returning as an aggregated error
+func (ec *QueryErrorCollector) Error() string {
+	ec.m.RLock()
+	defer ec.m.RUnlock()
+
+	var sb strings.Builder
+	if len(ec.errors) > 0 {
+		sb.WriteString("Error Collection:\n")
+		for i, e := range ec.errors {
+			sb.WriteString(fmt.Sprintf("%d) %s\n", i, e))
+		}
+	}
+	if len(ec.warnings) > 0 {
+		sb.WriteString("Warning Collection:\n")
+		for _, w := range ec.warnings {
+			sb.WriteString(w.String())
+		}
+	}
+
+	return sb.String()
+}
+
+// As is a special method that implicitly works with the `errors.As()` go
+// helper to locate the _first_ instance of the provided target type in the
+// collection.
+func (ec *QueryErrorCollector) As(target interface{}) bool {
+	if target == nil {
+		log.Errorf("ErrorCollection.As() target cannot be nil")
+		return false
+	}
+
+	val := reflect.ValueOf(target)
+	typ := val.Type()
+	if typ.Kind() != reflect.Ptr || val.IsNil() {
+		log.Errorf("ErrorCollection.As() target must be a non-nil pointer")
+		return false
+	}
+	if e := typ.Elem(); e.Kind() != reflect.Interface && !e.Implements(errorType) {
+		log.Errorf("ErrorCollection.As() *target must be interface or implement error")
+		return false
+	}
+
+	targetType := typ.Elem()
+	for _, err := range AllErrorsFor(ec) {
+		if reflect.TypeOf(err).AssignableTo(targetType) {
+			val.Elem().Set(reflect.ValueOf(err))
+			return true
+		}
+		if x, ok := err.(interface{ As(interface{}) bool }); ok && x.As(target) {
+			return true
+		}
+	}
+
+	return false
+}
+
+// IsErrorCollection returns true if the provided error is a QueryErrorCollection
+func IsErrorCollection(err error) bool {
+	_, ok := err.(QueryErrorCollection)
+	return ok
+}
+
+func AllErrorsFor(collection QueryErrorCollection) []error {
+	var errs []error
+	for _, qe := range collection.Errors() {
+		if qe.Error != nil {
+			errs = append(errs, qe.Error)
+		}
+		if qe.ParseError != nil {
+			errs = append(errs, qe.ParseError)
+		}
+	}
+	return errs
+}
+
 // WrapError wraps the given error with the given message, usually for adding
 // context, but persists the existing type of error.
 func WrapError(err error, msg string) error {

+ 59 - 42
pkg/prom/query.go

@@ -25,27 +25,48 @@ const (
 // parsing query responses and errors.
 type Context struct {
 	Client         prometheus.Client
-	ErrorCollector *errors.ErrorCollector
+	errorCollector *QueryErrorCollector
 }
 
 // NewContext creates a new Promethues querying context from the given client
 func NewContext(client prometheus.Client) *Context {
-	var ec errors.ErrorCollector
+	var ec QueryErrorCollector
 
 	return &Context{
 		Client:         client,
-		ErrorCollector: &ec,
+		errorCollector: &ec,
 	}
 }
 
-// Errors returns the errors collected from the Context's ErrorCollector
-func (ctx *Context) Errors() []error {
-	return ctx.ErrorCollector.Errors()
+// Warnings returns the warnings collected from the Context's ErrorCollector
+func (ctx *Context) Warnings() []*QueryWarning {
+	return ctx.errorCollector.Warnings()
+}
+
+// HasWarnings returns true if the ErrorCollector has warnings.
+func (ctx *Context) HasWarnings() bool {
+	return ctx.errorCollector.IsWarning()
+}
+
+// Errors returns the errors collected from the Context's ErrorCollector.
+func (ctx *Context) Errors() []*QueryError {
+	return ctx.errorCollector.Errors()
 }
 
 // HasErrors returns true if the ErrorCollector has errors
 func (ctx *Context) HasErrors() bool {
-	return ctx.ErrorCollector.IsError()
+	return ctx.errorCollector.IsError()
+}
+
+// ErrorCollection returns the aggregated errors as a single error if any errors were
+// collected; otherwise, nil is returned.
+func (ctx *Context) ErrorCollection() error {
+	if ctx.errorCollector.IsError() {
+		// errorCollector implements the error interface
+		return ctx.errorCollector
+	}
+
+	return nil
 }
 
 // Query returns a QueryResultsChan, then runs the given query and sends the
@@ -98,18 +119,18 @@ func (ctx *Context) ProfileQueryAll(queries ...string) []QueryResultsChan {
 	return resChs
 }
 
-func (ctx *Context) QuerySync(query string) ([]*QueryResult, error) {
-	raw, err := ctx.query(query)
+func (ctx *Context) QuerySync(query string) ([]*QueryResult, prometheus.Warnings, error) {
+	raw, warnings, err := ctx.query(query)
 	if err != nil {
-		return nil, err
+		return nil, warnings, err
 	}
 
 	results := NewQueryResults(query, raw)
 	if results.Error != nil {
-		return nil, results.Error
+		return nil, warnings, results.Error
 	}
 
-	return results.Results, nil
+	return results.Results, warnings, nil
 }
 
 // QueryURL returns the URL used to query Prometheus
@@ -123,13 +144,11 @@ func runQuery(query string, ctx *Context, resCh QueryResultsChan, profileLabel s
 	defer errors.HandlePanic()
 	startQuery := time.Now()
 
-	raw, promErr := ctx.query(query)
-	ctx.ErrorCollector.Report(promErr)
-
+	raw, warnings, requestError := ctx.query(query)
 	results := NewQueryResults(query, raw)
-	if results.Error != nil {
-		ctx.ErrorCollector.Report(results.Error)
-	}
+
+	// report all warnings, request, and parse errors (nils will be ignored)
+	ctx.errorCollector.Report(query, warnings, requestError, results.Error)
 
 	if profileLabel != "" {
 		log.Profile(startQuery, profileLabel)
@@ -138,7 +157,7 @@ func runQuery(query string, ctx *Context, resCh QueryResultsChan, profileLabel s
 	resCh <- results
 }
 
-func (ctx *Context) query(query string) (interface{}, error) {
+func (ctx *Context) query(query string) (interface{}, prometheus.Warnings, error) {
 	u := ctx.Client.URL(epQuery, nil)
 	q := u.Query()
 	q.Set("query", query)
@@ -146,7 +165,7 @@ func (ctx *Context) query(query string) (interface{}, error) {
 
 	req, err := http.NewRequest(http.MethodPost, u.String(), nil)
 	if err != nil {
-		return nil, err
+		return nil, nil, err
 	}
 
 	resp, body, warnings, err := ctx.Client.Do(context.Background(), req)
@@ -155,19 +174,19 @@ func (ctx *Context) query(query string) (interface{}, error) {
 	}
 	if err != nil {
 		if resp == nil {
-			return nil, fmt.Errorf("query error: '%s' fetching query '%s'", err.Error(), query)
+			return nil, warnings, fmt.Errorf("query error: '%s' fetching query '%s'", err.Error(), query)
 		}
 
-		return nil, fmt.Errorf("query error %d: '%s' fetching query '%s'", resp.StatusCode, err.Error(), query)
+		return nil, warnings, fmt.Errorf("query error %d: '%s' fetching query '%s'", resp.StatusCode, err.Error(), query)
 	}
 
 	var toReturn interface{}
 	err = json.Unmarshal(body, &toReturn)
 	if err != nil {
-		return nil, fmt.Errorf("query error: '%s' fetching query '%s'", err.Error(), query)
+		return nil, warnings, fmt.Errorf("query error: '%s' fetching query '%s'", err.Error(), query)
 	}
 
-	return toReturn, nil
+	return toReturn, warnings, nil
 }
 
 func (ctx *Context) QueryRange(query string, start, end time.Time, step time.Duration) QueryResultsChan {
@@ -186,18 +205,18 @@ func (ctx *Context) ProfileQueryRange(query string, start, end time.Time, step t
 	return resCh
 }
 
-func (ctx *Context) QueryRangeSync(query string, start, end time.Time, step time.Duration) ([]*QueryResult, error) {
-	raw, err := ctx.queryRange(query, start, end, step)
+func (ctx *Context) QueryRangeSync(query string, start, end time.Time, step time.Duration) ([]*QueryResult, prometheus.Warnings, error) {
+	raw, warnings, err := ctx.queryRange(query, start, end, step)
 	if err != nil {
-		return nil, err
+		return nil, warnings, err
 	}
 
 	results := NewQueryResults(query, raw)
 	if results.Error != nil {
-		return nil, results.Error
+		return nil, warnings, results.Error
 	}
 
-	return results.Results, nil
+	return results.Results, warnings, nil
 }
 
 // QueryRangeURL returns the URL used to query_range Prometheus
@@ -211,13 +230,11 @@ func runQueryRange(query string, start, end time.Time, step time.Duration, ctx *
 	defer errors.HandlePanic()
 	startQuery := time.Now()
 
-	raw, promErr := ctx.queryRange(query, start, end, step)
-	ctx.ErrorCollector.Report(promErr)
-
+	raw, warnings, requestError := ctx.queryRange(query, start, end, step)
 	results := NewQueryResults(query, raw)
-	if results.Error != nil {
-		ctx.ErrorCollector.Report(results.Error)
-	}
+
+	// report all warnings, request errors, and parse errors (nil values are ignored)
+	ctx.errorCollector.Report(query, warnings, requestError, results.Error)
 
 	if profileLabel != "" {
 		log.Profile(startQuery, profileLabel)
@@ -226,7 +243,7 @@ func runQueryRange(query string, start, end time.Time, step time.Duration, ctx *
 	resCh <- results
 }
 
-func (ctx *Context) queryRange(query string, start, end time.Time, step time.Duration) (interface{}, error) {
+func (ctx *Context) queryRange(query string, start, end time.Time, step time.Duration) (interface{}, prometheus.Warnings, error) {
 	u := ctx.Client.URL(epQueryRange, nil)
 	q := u.Query()
 	q.Set("query", query)
@@ -237,7 +254,7 @@ func (ctx *Context) queryRange(query string, start, end time.Time, step time.Dur
 
 	req, err := http.NewRequest(http.MethodPost, u.String(), nil)
 	if err != nil {
-		return nil, err
+		return nil, nil, err
 	}
 
 	resp, body, warnings, err := ctx.Client.Do(context.Background(), req)
@@ -246,24 +263,24 @@ func (ctx *Context) queryRange(query string, start, end time.Time, step time.Dur
 	}
 	if err != nil {
 		if resp == nil {
-			return nil, fmt.Errorf("Error: %s, Body: %s Query: %s", err.Error(), body, query)
+			return nil, warnings, fmt.Errorf("Error: %s, Body: %s Query: %s", err.Error(), body, query)
 		}
 
-		return nil, fmt.Errorf("%d (%s) Headers: %s Error: %s Body: %s Query: %s", resp.StatusCode, http.StatusText(resp.StatusCode), util.HeaderString(resp.Header), body, err.Error(), query)
+		return nil, warnings, fmt.Errorf("%d (%s) Headers: %s Error: %s Body: %s Query: %s", resp.StatusCode, http.StatusText(resp.StatusCode), util.HeaderString(resp.Header), body, err.Error(), query)
 	}
 
 	// Unsuccessful Status Code, log body and status
 	statusCode := resp.StatusCode
 	statusText := http.StatusText(statusCode)
 	if resp.StatusCode < 200 || resp.StatusCode >= 300 {
-		return nil, fmt.Errorf("%d (%s) Headers: %s, Body: %s Query: %s", statusCode, statusText, util.HeaderString(resp.Header), body, query)
+		return nil, warnings, fmt.Errorf("%d (%s) Headers: %s, Body: %s Query: %s", statusCode, statusText, util.HeaderString(resp.Header), body, query)
 	}
 
 	var toReturn interface{}
 	err = json.Unmarshal(body, &toReturn)
 	if err != nil {
-		return nil, fmt.Errorf("%d (%s) Headers: %s Error: %s Body: %s Query: %s", statusCode, statusText, util.HeaderString(resp.Header), err.Error(), body, query)
+		return nil, warnings, fmt.Errorf("%d (%s) Headers: %s Error: %s Body: %s Query: %s", statusCode, statusText, util.HeaderString(resp.Header), err.Error(), body, query)
 	}
 
-	return toReturn, nil
+	return toReturn, warnings, nil
 }

+ 1 - 1
pkg/prom/validate.go

@@ -33,7 +33,7 @@ func Validate(cli prometheus.Client) (*PrometheusMetadata, error) {
 func validate(cli prometheus.Client, q string) (*PrometheusMetadata, error) {
 	ctx := NewContext(cli)
 
-	resUp, err := ctx.QuerySync(q)
+	resUp, _, err := ctx.QuerySync(q)
 	if err != nil {
 		return &PrometheusMetadata{
 			Running:            false,