Просмотр исходного кода

add cluster complete metric

Signed-off-by: Sean Holcomb <seanholcomb@gmail.com>
Sean Holcomb 17 часов назад
Родитель
Сommit
75d89324c9

+ 4 - 3
core/pkg/env/core.go

@@ -23,6 +23,7 @@ const (
 
 	ExportLegacyDataModelEnvVar = "EXPORT_LEGACY_DATA_MODEL"
 	ExportKubeModelEnvVar       = "EXPORT_KUBEMODEL"
+	ExportKubeModelV2EnvVar
 )
 
 // GetAPIPort returns the environment variable value for APIPortEnvVar which
@@ -65,10 +66,10 @@ func GetInstallNamespace(def string) string {
 	return Get(InstallNamespaceEnvVar, def)
 }
 
-func GetExportLegacyDataModel() bool {
+func IsLegacyDataModelExported() bool {
 	return GetBool(ExportLegacyDataModelEnvVar, true)
 }
 
-func GetExportKubeModel() bool {
-	return GetBool(ExportKubeModelEnvVar, true)
+func IsKubeModelExported() bool {
+	return GetBool(ExportKubeModelEnvVar, false)
 }

+ 2 - 0
core/pkg/source/datasource.go

@@ -41,6 +41,7 @@ const (
 
 	// Cluster Management
 	QueryClusterInfo                 = "QueryClusterInfo"
+	QueryClusterCompleteKubeModel    = "QueryClusterCompleteKubeModel"
 	QueryClusterUptime               = "QueryClusterUptime"
 	QueryClusterManagementDuration   = "QueryClusterManagementDuration"
 	QueryClusterManagementPricePerHr = "QueryClusterManagementPricePerHr"
@@ -253,6 +254,7 @@ type MetricsQuerier interface {
 
 	// Cluster Management
 	QueryClusterInfo(start, end time.Time) *Future[ClusterInfoResult]
+	QueryClusterCompleteKubeModel(start, end time.Time) *Future[ClusterCompleteKubeModelResult]
 	QueryClusterUptime(start, end time.Time) *Future[UptimeResult]
 	QueryClusterManagementDuration(start, end time.Time) *Future[ClusterManagementDurationResult]
 	QueryClusterManagementPricePerHr(start, end time.Time) *Future[ClusterManagementPricePerHrResult]

+ 18 - 0
core/pkg/source/decoders.go

@@ -1,6 +1,7 @@
 package source
 
 import (
+	"strconv"
 	"time"
 
 	"github.com/opencost/opencost/core/pkg/log"
@@ -62,6 +63,7 @@ const (
 	SameZoneLabel        = "same_zone"
 	SameRegionLabel      = "same_region"
 	NatGatewayLabel      = "nat_gateway"
+	KubeModelV2Label     = "complete_kubemodel"
 )
 
 const (
@@ -671,6 +673,22 @@ func DecodeClusterInfoResult(result *QueryResult) *ClusterInfoResult {
 	}
 }
 
+type ClusterCompleteKubeModelResult struct {
+	UID      string
+	Complete bool
+}
+
+func DecodeClusterCompleteKubeModelResult(result *QueryResult) *ClusterCompleteKubeModelResult {
+	uid, _ := result.GetString(UIDLabel)
+	completeStr, _ := result.GetString(KubeModelV2Label)
+	complete, _ := strconv.ParseBool(completeStr)
+
+	return &ClusterCompleteKubeModelResult{
+		UID:      uid,
+		Complete: complete,
+	}
+}
+
 type ClusterManagementDurationResult struct {
 	UID         string
 	Cluster     string

+ 6 - 0
core/pkg/source/mock.go

@@ -202,6 +202,12 @@ func (m *MockMetricsQuerier) QueryClusterInfo(start, end time.Time) *Future[Clus
 	})
 }
 
+func (m *MockMetricsQuerier) QueryClusterCompleteKubeModel(start, end time.Time) *Future[ClusterCompleteKubeModelResult] {
+	return getFutureFromOverride(m.overrides, QueryClusterCompleteKubeModel, func() *Future[ClusterCompleteKubeModelResult] {
+		return m.noop.QueryClusterCompleteKubeModel(start, end)
+	})
+}
+
 func (m *MockMetricsQuerier) QueryClusterUptime(start, end time.Time) *Future[UptimeResult] {
 	return getFutureFromOverride(m.overrides, QueryClusterUptime, func() *Future[UptimeResult] {
 		return m.noop.QueryClusterUptime(start, end)

+ 4 - 0
core/pkg/source/noop.go

@@ -121,6 +121,10 @@ func (m *NoOpMetricsQuerier) QueryClusterInfo(start, end time.Time) *Future[Clus
 	return newEmptyResult(DecodeClusterInfoResult)
 }
 
+func (m *NoOpMetricsQuerier) QueryClusterCompleteKubeModel(start, end time.Time) *Future[ClusterCompleteKubeModelResult] {
+	return newEmptyResult(DecodeClusterCompleteKubeModelResult)
+}
+
 func (m *NoOpMetricsQuerier) QueryClusterUptime(start, end time.Time) *Future[UptimeResult] {
 	return newEmptyResult(DecodeUptimeResult)
 }

+ 5 - 0
core/pkg/source/record.go

@@ -154,6 +154,11 @@ func (m *RecordMetricsQuerier) QueryClusterInfo(start, end time.Time) *Future[Cl
 	return m.Querier.QueryClusterInfo(start, end)
 }
 
+func (m *RecordMetricsQuerier) QueryClusterCompleteKubeModel(start, end time.Time) *Future[ClusterCompleteKubeModelResult] {
+	m.recordCall(QueryClusterCompleteKubeModel)
+	return m.Querier.QueryClusterCompleteKubeModel(start, end)
+}
+
 func (m *RecordMetricsQuerier) QueryClusterUptime(start, end time.Time) *Future[UptimeResult] {
 	m.recordCall(QueryClusterUptime)
 	return m.Querier.QueryClusterUptime(start, end)

+ 20 - 0
modules/collector-source/pkg/collector/collector.go

@@ -50,6 +50,7 @@ func NewOpenCostMetricStore() metric.MetricStore {
 	memStore.Register(NewLBPricePerHourMetricCollector())
 	memStore.Register(NewLBActiveMinutesMetricCollector())
 	memStore.Register(NewClusterInfoMetricCollector())
+	memStore.Register(NewClusterCompleteKubeModelMetricCollector())
 	memStore.Register(NewClusterUptimeMetricCollector())
 	memStore.Register(NewClusterManagementDurationMetricCollector())
 	memStore.Register(NewClusterManagementPricePerHourMetricCollector())
@@ -840,6 +841,25 @@ func NewClusterInfoMetricCollector() *metric.MetricCollector {
 	)
 }
 
+//	avg(
+//		cluster_info{
+//			<some_custom_filter>
+//		}
+//	) by (uid, complete_kubemodel)[%s:%dm]
+
+func NewClusterCompleteKubeModelMetricCollector() *metric.MetricCollector {
+	return metric.NewMetricCollector(
+		metric.ClusterCompleteKubeModelID,
+		metric.ClusterInfo,
+		[]string{
+			source.UIDLabel,
+			source.KubeModelV2Label,
+		},
+		aggregator.Info,
+		nil,
+	)
+}
+
 //	avg(
 //		cluster_info{
 //			<some_custom_filter>

+ 4 - 0
modules/collector-source/pkg/collector/metricsquerier.go

@@ -223,6 +223,10 @@ func (c *collectorMetricsQuerier) QueryClusterInfo(start, end time.Time) *source
 	return queryCollector(c, start, end, metric.ClusterInfoID, source.DecodeClusterInfoResult)
 }
 
+func (c *collectorMetricsQuerier) QueryClusterCompleteKubeModel(start, end time.Time) *source.Future[source.ClusterCompleteKubeModelResult] {
+	return queryCollector(c, start, end, metric.ClusterCompleteKubeModelID, source.DecodeClusterCompleteKubeModelResult)
+}
+
 func (c *collectorMetricsQuerier) QueryClusterUptime(start, end time.Time) *source.Future[source.UptimeResult] {
 	return queryCollector(c, start, end, metric.ClusterUptimeID, source.DecodeUptimeResult)
 }

+ 1 - 0
modules/collector-source/pkg/metric/collector.go

@@ -48,6 +48,7 @@ const (
 	LBPricePerHourID                           MetricCollectorID = "LBPricePerHour"
 	LBActiveMinutesID                          MetricCollectorID = "LBActiveMinutes"
 	ClusterInfoID                              MetricCollectorID = "ClusterInfo"
+	ClusterCompleteKubeModelID                 MetricCollectorID = "ClusterCompleteKubeModel"
 	ClusterUptimeID                            MetricCollectorID = "ClusterUptime"
 	ClusterManagementDurationID                MetricCollectorID = "ClusterManagementDuration"
 	ClusterManagementPricePerHourID            MetricCollectorID = "ClusterManagementPricePerHour"

+ 1 - 0
modules/collector-source/pkg/scrape/clusterinfo.go

@@ -43,6 +43,7 @@ func (cis *ClusterInfoScrapper) Scrape() []metric.Update {
 		source.AccountIDLabel:       accountID,
 		source.ProvisionerNameLabel: provisioner,
 		source.RegionLabel:          region,
+		source.KubeModelV2Label:     "true",
 	}
 
 	scrapeResults = append(scrapeResults, metric.Update{

+ 18 - 0
modules/prometheus-source/pkg/prom/metricsquerier.go

@@ -661,6 +661,24 @@ func (pds *PrometheusMetricsQuerier) QueryClusterInfo(start, end time.Time) *sou
 	return source.NewFuture(source.DecodeClusterInfoResult, ctx.QueryAtTime(queryClusterInfo, end))
 }
 
+func (pds *PrometheusMetricsQuerier) QueryClusterCompleteKubeModel(start, end time.Time) *source.Future[source.ClusterCompleteKubeModelResult] {
+	const queryName = "QueryClusterCompleteKubeModel"
+	const queryFmtClusterCompleteKubeModel = `avg(avg_over_time(cluster_info{%s}[%s])) by (%s, uid, complete_kubemodel)`
+
+	cfg := pds.promConfig
+
+	durStr := timeutil.DurationString(end.Sub(start))
+	if durStr == "" {
+		panic(fmt.Sprintf("failed to parse duration string passed to %s", queryName))
+	}
+
+	queryClusterCompleteKubeModel := fmt.Sprintf(queryFmtClusterCompleteKubeModel, cfg.ClusterFilter, durStr, cfg.ClusterLabel)
+	log.Debugf(PrometheusMetricsQueryLogFormat, queryName, end.Unix(), queryClusterCompleteKubeModel)
+
+	ctx := pds.promContexts.NewNamedContext(ClusterContextName)
+	return source.NewFuture(source.DecodeClusterCompleteKubeModelResult, ctx.QueryAtTime(queryClusterCompleteKubeModel, end))
+}
+
 func (pds *PrometheusMetricsQuerier) QueryClusterUptime(start, end time.Time) *source.Future[source.UptimeResult] {
 	const queryName = "QueryClusterUptime"
 	const queryFmtClusterUptime = `avg(cluster_info{%s}) by (%s, uid)[%s:%dm]`

+ 6 - 5
pkg/costmodel/metrics.go

@@ -9,6 +9,7 @@ import (
 
 	"github.com/opencost/opencost/core/pkg/clustercache"
 	"github.com/opencost/opencost/core/pkg/clusters"
+	coreenv "github.com/opencost/opencost/core/pkg/env"
 	"github.com/opencost/opencost/core/pkg/errors"
 	"github.com/opencost/opencost/core/pkg/log"
 	"github.com/opencost/opencost/core/pkg/source"
@@ -361,11 +362,11 @@ func NewCostModelMetricsEmitter(clusterCache clustercache.ClusterCache, provider
 
 	metrics.InitKubeMetrics(clusterInfo, clusterCache, metricsConfig, &metrics.KubeMetricsOpts{
 		EmitKubecostControllerMetrics: true,
-		EmitNamespaceAnnotations:      env.IsEmitNamespaceAnnotationsMetric(),
-		EmitPodAnnotations:            env.IsEmitPodAnnotationsMetric(),
-		EmitKubeStateMetrics:          env.IsEmitKsmV1Metrics(),
-		EmitKubeStateMetricsV1Only:    env.IsEmitKsmV1MetricsOnly(),
-		EmitDeprecatedMetrics:         env.IsEmitDeprecatedMetrics(),
+		EmitNamespaceAnnotations:      coreenv.IsEmitNamespaceAnnotationsMetric(),
+		EmitPodAnnotations:            coreenv.IsEmitPodAnnotationsMetric(),
+		EmitKubeStateMetrics:          coreenv.IsEmitKsmV1Metrics(),
+		EmitKubeStateMetricsV1Only:    coreenv.IsEmitKsmV1MetricsOnly(),
+		EmitDeprecatedMetrics:         coreenv.IsEmitDeprecatedMetrics(),
 	})
 
 	metrics.InitOpencostTelemetry(metricsConfig)

+ 1 - 1
pkg/costmodel/router.go

@@ -533,7 +533,7 @@ func Initialize(router *httprouter.Router, additionalConfigWatchers ...*watcher.
 
 	var kubeModelPipeline *km.Pipeline
 	var kubeModelQuerier km.Querier
-	if sysenv.GetExportKubeModel() {
+	if sysenv.IsKubeModelExported() {
 		appName := sysenv.GetAppName()
 
 		if p, err := km.NewPipeline(appName, clusterUID, store, costModel); err != nil {

+ 1 - 1
pkg/kubemodel/pipeline.go

@@ -30,7 +30,7 @@ func NewPipeline(appName, clusterUID string, store storage.Storage, cm ocexporte
 		return nil, fmt.Errorf("NewPipeline: clusterUID cannot be empty")
 	}
 
-	config := ocexporter.NewPipelinesExportConfig(appName, clusterUID, "", false, env.GetExportKubeModel())
+	config := ocexporter.NewPipelinesExportConfig(appName, clusterUID, "", false, env.IsKubeModelExported())
 
 	controllers := ocexporter.NewPipelineExportControllers(store, cm, config)
 

+ 7 - 5
pkg/metrics/kubemodel.go

@@ -6,6 +6,7 @@ import (
 	"github.com/opencost/opencost/core/pkg/clustercache"
 	"github.com/opencost/opencost/core/pkg/clusters"
 	"github.com/opencost/opencost/core/pkg/log"
+	"github.com/opencost/opencost/core/pkg/source"
 	coreutil "github.com/opencost/opencost/core/pkg/util"
 	"github.com/opencost/opencost/core/pkg/util/promutil"
 	"github.com/prometheus/client_golang/prometheus"
@@ -184,11 +185,12 @@ func (c KubeModelCollector) scrapeClusterInfo(disabled map[string]struct{}) []ku
 	}
 	info := c.ClusterInfo.GetClusterInfo()
 	labels := map[string]string{
-		"uid":              info[clusters.ClusterInfoIdKey],
-		"provider":         info[clusters.ClusterInfoProviderKey],
-		"account_id":       info[clusters.ClusterInfoAccountKey],
-		"provisioner_name": info[clusters.ClusterInfoProvisionerKey],
-		"region":           info[clusters.ClusterInfoRegionKey],
+		"uid":                   info[clusters.ClusterInfoIdKey],
+		"provider":              info[clusters.ClusterInfoProviderKey],
+		"account_id":            info[clusters.ClusterInfoAccountKey],
+		"provisioner_name":      info[clusters.ClusterInfoProvisionerKey],
+		"region":                info[clusters.ClusterInfoRegionKey],
+		source.KubeModelV2Label: "true",
 	}
 	// GCP uses "project" instead of "account"
 	if labels["account_id"] == "" {