Browse Source

KubeModel: new KubeModel type for computing KubeModelSets attached to CostModel

Niko Kovacevic 6 months ago
parent
commit
bb0952ba1b

+ 2 - 0
core/pkg/exporter/pathing/kubemodelpath.go

@@ -24,6 +24,8 @@ type KubeModelStoragePathFormatter struct {
 	dir string
 }
 
+// TODO: we need to figure out how to get the proper clusterId here, if we're
+// going to use the kube-system namespace UID as the real cluster ID.
 func NewKubeModelStoragePathFormatter(rootDir, clusterId, resolution string) (StoragePathFormatter[opencost.Window], error) {
 	if clusterId == "" {
 		return nil, fmt.Errorf("cluster id cannot be empty")

+ 2 - 0
core/pkg/model/kubemodel/kubemodel_test.go

@@ -1 +1,3 @@
 package kubemodel
+
+// TODO: what tests, specifically, do we need here? Register funcs? Constructor?

+ 3 - 3
core/pkg/opencost/exporter/controllers.go

@@ -9,7 +9,7 @@ import (
 	"github.com/opencost/opencost/core/pkg/opencost"
 	"github.com/opencost/opencost/core/pkg/opencost/exporter/allocation"
 	"github.com/opencost/opencost/core/pkg/opencost/exporter/asset"
-	exporterkubemodel "github.com/opencost/opencost/core/pkg/opencost/exporter/kubemodel"
+	km "github.com/opencost/opencost/core/pkg/opencost/exporter/kubemodel"
 	"github.com/opencost/opencost/core/pkg/opencost/exporter/networkinsight"
 	"github.com/opencost/opencost/core/pkg/source"
 	"github.com/opencost/opencost/core/pkg/storage"
@@ -23,7 +23,7 @@ type ComputePipelineSource interface {
 	allocation.AllocationSource
 	asset.AssetSource
 	networkinsight.NetworkInsightSource
-	exporterkubemodel.KubeModelSource
+	km.KubeModelSource
 
 	GetDataSource() source.OpenCostDataSource
 }
@@ -137,7 +137,7 @@ func NewPipelineExportControllers(clusterId string, store storage.Storage, cm Co
 	}
 
 	// KubeModel sources and exporters
-	kubeModelSource := exporterkubemodel.NewKubeModelComputeSource(cm)
+	kubeModelSource := km.NewKubeModelComputeSource(cm)
 	kubeModelExportControllers := []*export.ComputeExportController[kubemodel.KubeModelSet]{}
 
 	for _, res := range config.KubeModelPipelineResolutions {

+ 33 - 2
core/pkg/opencost/exporter/exporter_test.go

@@ -61,7 +61,9 @@ func NewMockNetworkInsightSource() exporter.ComputeSource[opencost.NetworkInsigh
 
 func NewMockKubeModelSource() exporter.ComputeSource[kubemodel.KubeModelSet] {
 	return &MockSource[kubemodel.KubeModelSet]{
-		generate: func(start, end time.Time) *kubemodel.KubeModelSet { return opencost.GenerateMockKubeModelSet(start) },
+		generate: func(start, end time.Time) *kubemodel.KubeModelSet {
+			return opencost.GenerateMockKubeModelSet(start, end)
+		},
 	}
 }
 
@@ -124,7 +126,7 @@ func (mpcs *MockPipelineComputeSource) ComputeAssets(start, end time.Time) (*ope
 func (mpcs *MockPipelineComputeSource) ComputeNetworkInsights(start, end time.Time) (*opencost.NetworkInsightSet, error) {
 	return mpcs.netSource.Compute(start, end)
 }
-func (mpcs *MockPipelineComputeSource) ComputeKubeModel(start, end time.Time) (*kubemodel.KubeModelSet, error) {
+func (mpcs *MockPipelineComputeSource) ComputeKubeModelSet(start, end time.Time) (*kubemodel.KubeModelSet, error) {
 	return mpcs.kubeModelSource.Compute(start, end)
 }
 func (mpcs *MockPipelineComputeSource) GetDataSource() source.OpenCostDataSource {
@@ -241,6 +243,35 @@ func TestExporters(t *testing.T) {
 		validateFileCreation[opencost.NetworkInsightSet](t, memStore, p, start, end)
 	})
 
+	t.Run("KubeModel exporter", func(t *testing.T) {
+		kubeModelSource := NewMockKubeModelSource()
+		memStore := storage.NewMemoryStorage()
+		p, err := pathing.NewDefaultStoragePathFormatter(TestClusterId, pipelines.KubeModelPipelineName, ptr(TestResolution))
+		if err != nil {
+			t.Fatalf("failed to create path formatter: %v", err)
+		}
+
+		kubeModelExporter, err := NewComputePipelineExporter[kubemodel.KubeModelSet](TestClusterId, TestResolution, memStore)
+		if err != nil {
+			t.Fatalf("failed to create KubeModel exporter: %v", err)
+		}
+
+		end := time.Now().UTC().Truncate(TestResolution)
+		start := end.Add(-TestResolution)
+
+		data, err := kubeModelSource.Compute(start, end)
+		if err != nil {
+			t.Fatalf("failed to compute KubeModel data: %v", err)
+		}
+
+		err = kubeModelExporter.Export(opencost.NewClosedWindow(start, end), data)
+		if err != nil {
+			t.Fatalf("failed to export KubeModel data: %v", err)
+		}
+
+		validateFileCreation[kubemodel.KubeModelSet](t, memStore, p, start, end)
+	})
+
 	t.Run("unknown exporter", func(t *testing.T) {
 		memStore := storage.NewMemoryStorage()
 

+ 2 - 2
core/pkg/opencost/exporter/kubemodel/source.go

@@ -9,7 +9,7 @@ import (
 )
 
+// KubeModelSource provides KubeModelSets for arbitrary time windows; it is
+// the seam between the exporter pipeline and whatever computes the data
+// (in practice, the CostModel via pkg/kubemodel.KubeModel).
 type KubeModelSource interface {
-	ComputeKubeModel(start, end time.Time) (*kubemodel.KubeModelSet, error)
+	// ComputeKubeModelSet computes a KubeModelSet for the window defined by
+	// the given start and end times.
+	ComputeKubeModelSet(start, end time.Time) (*kubemodel.KubeModelSet, error)
 }
 
 type KubeModelComputeSource struct {
@@ -34,7 +34,7 @@ func (acs *KubeModelComputeSource) CanCompute(start, end time.Time) bool {
 
 // Compute should compute a single T for the given time range.
 func (acs *KubeModelComputeSource) Compute(start, end time.Time) (*kubemodel.KubeModelSet, error) {
-	return acs.src.ComputeKubeModel(start, end)
+	return acs.src.ComputeKubeModelSet(start, end)
 }
 
 // Name returns the name of the ComputeSource

+ 18 - 4
core/pkg/opencost/mock.go

@@ -1016,9 +1016,23 @@ func GenerateMockCloudCostSet(start, end time.Time, provider, integration string
 	return ccs
 }
 
-// GenerateMockKubeModelSet creates generic allocation set without idle allocations
-func GenerateMockKubeModelSet(start time.Time) *kubemodel.KubeModelSet {
-	// TODO: fill out mocks
+// GenerateMockKubeModelSet creates a generic KubeModelSet covering the given
+// start and end times, populated with a mock cluster, two namespaces, and
+// four resource quotas (two registered in each namespace).
+// TODO: does this require actual numeric data?
+func GenerateMockKubeModelSet(start, end time.Time) *kubemodel.KubeModelSet {
+	kms := kubemodel.NewKubeModelSet(start, end)
+
+	kms.Cluster = &kubemodel.Cluster{
+		UID:  "clusterUID",
+		Name: "cluster",
+	}
+
+	kms.RegisterNamespace("namespace-1", "namespace-1")
+	kms.RegisterNamespace("namespace-2", "namespace-2")
+
+	kms.RegisterResourceQuota("resourcequota-1", "resourcequota-1", "namespace-1")
+	kms.RegisterResourceQuota("resourcequota-2", "resourcequota-2", "namespace-1")
+	kms.RegisterResourceQuota("resourcequota-3", "resourcequota-3", "namespace-2")
+	kms.RegisterResourceQuota("resourcequota-4", "resourcequota-4", "namespace-2")
 
-	return &kubemodel.KubeModelSet{}
+	return kms
 }

+ 15 - 0
pkg/costmodel/costmodel.go

@@ -15,11 +15,13 @@ import (
 	coreenv "github.com/opencost/opencost/core/pkg/env"
 	"github.com/opencost/opencost/core/pkg/filter/allocation"
 	"github.com/opencost/opencost/core/pkg/log"
+	"github.com/opencost/opencost/core/pkg/model/kubemodel"
 	"github.com/opencost/opencost/core/pkg/opencost"
 	"github.com/opencost/opencost/core/pkg/source"
 	"github.com/opencost/opencost/core/pkg/util"
 	"github.com/opencost/opencost/core/pkg/util/promutil"
 	costAnalyzerCloud "github.com/opencost/opencost/pkg/cloud/models"
+	km "github.com/opencost/opencost/pkg/kubemodel"
 	v1 "k8s.io/api/core/v1"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/labels"
@@ -47,6 +49,7 @@ type CostModel struct {
 	RequestGroup    *singleflight.Group
 	DataSource      source.OpenCostDataSource
 	Provider        costAnalyzerCloud.Provider
+	KubeModel       *km.KubeModel
 	pricingMetadata *costAnalyzerCloud.PricingMatchMetadata
 }
 
@@ -60,6 +63,13 @@ func NewCostModel(
 	// request grouping to prevent over-requesting the same data prior to caching
 	requestGroup := new(singleflight.Group)
 
+	// TODO: is this too contrived??
+	kubeModel, err := km.NewKubeModel(dataSource)
+	if err != nil {
+		// TODO: what to do here?
+		panic(err)
+	}
+
 	return &CostModel{
 		Cache:         cache,
 		ClusterMap:    clusterMap,
@@ -67,9 +77,14 @@ func NewCostModel(
 		DataSource:    dataSource,
 		Provider:      provider,
 		RequestGroup:  requestGroup,
+		KubeModel:     kubeModel,
 	}
 }
 
+func (cm *CostModel) ComputeKubeModelSet(start, end time.Time) (*kubemodel.KubeModelSet, error) {
+	return cm.KubeModel.ComputeKubeModelSet(start, end)
+}
+
 type CostData struct {
 	Name            string                       `json:"name,omitempty"`
 	PodName         string                       `json:"podName,omitempty"`

+ 77 - 32
pkg/costmodel/kubemodel.go → pkg/kubemodel/kubemodel.go

@@ -1,20 +1,44 @@
-package costmodel
+package kubemodel
 
 import (
+	"errors"
 	"fmt"
 	"time"
 
 	"github.com/opencost/opencost/core/pkg/env"
+	"github.com/opencost/opencost/core/pkg/log"
 	"github.com/opencost/opencost/core/pkg/model/kubemodel"
 	"github.com/opencost/opencost/core/pkg/source"
 )
 
 const logTimeFmt string = "2006-01-02T15:04:05"
 
+// KubeModel computes KubeModelSets from an OpenCost data source. The cluster
+// UID is resolved once at construction time and cached for the lifetime of
+// the instance.
+// NOTE(review): the cached UID is never refreshed — see the existing TODO on
+// computeCluster about periodically re-checking it.
+type KubeModel struct {
+	ds         source.OpenCostDataSource
+	clusterUID string
+}
+
+// NewKubeModel returns a KubeModel backed by dataSource. It eagerly resolves
+// the cluster UID (kube-system namespace UID from the metrics source, falling
+// back to the configured cluster ID) and returns an error if the data source
+// is nil or no UID can be determined.
+func NewKubeModel(dataSource source.OpenCostDataSource) (*KubeModel, error) {
+	if dataSource == nil {
+		return nil, errors.New("OpenCostDataSource cannot be nil")
+	}
+
+	km := &KubeModel{ds: dataSource}
+
+	clusterUID, err := km.computeClusterUID(time.Now().UTC())
+	if err != nil {
+		return nil, fmt.Errorf("error computing cluster UID: %w", err)
+	}
+
+	km.clusterUID = clusterUID
+
+	return km, nil
+}
+
 // ComputeKubeModel uses the CostModel instance to compute an KubeModelSet
 // for the window defined by the given start and end times. The KubeModels
 // returned are unaggregated (i.e. down to the container level).
+// NOTE(review): the comment above is stale — the receiver is now KubeModel,
+// the method is ComputeKubeModelSet, and it computes cluster, namespace, and
+// resource-quota data rather than container-level allocations. A cluster
+// error aborts the computation and is returned to the caller; namespace and
+// quota errors are only recorded in kms.Metadata.Errors and computation
+// continues. The stale comment should be rewritten in this commit.
-func (cm *CostModel) ComputeKubeModel(start, end time.Time) (*kubemodel.KubeModelSet, error) {
+func (km *KubeModel) ComputeKubeModelSet(start, end time.Time) (*kubemodel.KubeModelSet, error) {
 	// 1. Initialize new KubeModelSet for requested Window
 	kms := kubemodel.NewKubeModelSet(start, end)
 
 	var err error
 
 	// 2.1 Compute Cluster
-	err = cm.kmComputeCluster(kms, start, end)
+	err = km.computeCluster(kms)
 	if err != nil {
 		kms.Metadata.Errors = append(kms.Metadata.Errors, err.Error())
 		return kms, fmt.Errorf("error computing kubemodel.Cluster for (%s, %s): %w", start.Format(logTimeFmt), end.Format(logTimeFmt), err)
 	}
 
 	// 2.2 Compute Namespaces
-	err = cm.kmComputeNamespaces(kms, start, end)
+	err = km.computeNamespaces(kms, start, end)
 	if err != nil {
 		kms.Metadata.Errors = append(kms.Metadata.Errors, err.Error())
 	}
 
 	// 2.3 Compute ResourceQuotas
-	err = cm.kmComputeResourceQuotas(kms, start, end)
+	err = km.computeResourceQuotas(kms, start, end)
 	if err != nil {
 		kms.Metadata.Errors = append(kms.Metadata.Errors, err.Error())
 	}
@@ -46,26 +70,47 @@ func (cm *CostModel) ComputeKubeModel(start, end time.Time) (*kubemodel.KubeMode
 	return kms, nil
 }
 
-func (cm *CostModel) kmComputeCluster(kms *kubemodel.KubeModelSet, start, end time.Time) error {
+// TODO: come up with a better way to pull kube-system namespace UID from Metrics()?
+// computeClusterUID resolves the cluster UID for the resolution-aligned
+// window containing the given start time: first from the UID of the
+// kube-system namespace reported by the metrics source, then falling back to
+// the configured cluster ID (env.GetClusterID()). It errors only if both
+// sources come up empty.
+func (km *KubeModel) computeClusterUID(start time.Time) (string, error) {
+	// TODO: what (start, end) here? will this always work? or will it fail,
+	// e.g. right after a clean install?
+	start = start.Truncate(km.ds.Resolution())
+	end := start.Add(km.ds.Resolution())
+
+	// NOTE(review): the query error is discarded here, so a failed query is
+	// indistinguishable from "kube-system not found" and silently falls
+	// through to the env-var fallback below. Consider at least logging it.
+	nsLabelsResult, _ := km.ds.Metrics().QueryNamespaceLabels(start, end).Await()
+	for _, res := range nsLabelsResult {
+		if res.Namespace == "kube-system" {
+			log.Infof("KubeModel: detected cluster UID from kube-system: %s", res.UID)
+			return res.UID, nil
+		}
+	}
+
+	clusterUID := env.GetClusterID()
+	if clusterUID != "" {
+		log.Warnf("KubeModel: failed to infer cluster UID from kube-system: using env var: %s", clusterUID)
+		return clusterUID, nil
+	}
 
-	// TODO: determine where Cluster data comes from
-	//  - Should it come from direct queries?
-	//  - Or should it come from pre-processed data from other objects?
+	return "", errors.New("failed to detect cluster UID")
+}
 
+// TODO: should we periodically check the ClusterUID?
+// TODO: where do we get the additional information? km.ds.ClusterInfo().GetClusterInfo() is a map[string]string...
+// computeCluster populates kms.Cluster using the UID cached at construction
+// time; Name still comes from env.GetClusterID().
+func (km *KubeModel) computeCluster(kms *kubemodel.KubeModelSet) error {
 	kms.Cluster = &kubemodel.Cluster{
-		UID:  env.GetClusterID(), // TODO: should we instead grab these from Metrics()?
+		UID:  km.clusterUID,
 		Name: env.GetClusterID(), // TODO: do we still want to use this env var for Name?
 	}
 
 	return nil
 }
 
-func (cm *CostModel) kmComputeNamespaces(kms *kubemodel.KubeModelSet, start, end time.Time) error {
+func (km *KubeModel) computeNamespaces(kms *kubemodel.KubeModelSet, start, end time.Time) error {
 	grp := source.NewQueryGroup()
-	ds := cm.DataSource.Metrics()
+	metrics := km.ds.Metrics()
 
-	nsLabelsResultFuture := source.WithGroup(grp, ds.QueryNamespaceLabels(start, end))
-	nsAnnosResultFuture := source.WithGroup(grp, ds.QueryNamespaceAnnotations(start, end))
+	nsLabelsResultFuture := source.WithGroup(grp, metrics.QueryNamespaceLabels(start, end))
+	nsAnnosResultFuture := source.WithGroup(grp, metrics.QueryNamespaceAnnotations(start, end))
 
 	nsLabelsResult, _ := nsLabelsResultFuture.Await()
 	nsAnnosResult, _ := nsAnnosResultFuture.Await()
@@ -83,33 +128,33 @@ func (cm *CostModel) kmComputeNamespaces(kms *kubemodel.KubeModelSet, start, end
 	return nil
 }
 
-func (cm *CostModel) kmComputeResourceQuotas(kms *kubemodel.KubeModelSet, start, end time.Time) error {
+func (km *KubeModel) computeResourceQuotas(kms *kubemodel.KubeModelSet, start, end time.Time) error {
 	grp := source.NewQueryGroup()
-	ds := cm.DataSource.Metrics()
+	metrics := km.ds.Metrics()
 
 	// spec.hard.requests
-	rqSpecCPURequestAverageResultFuture := source.WithGroup(grp, ds.QueryResourceQuotaSpecCPURequestAverage(start, end))
-	rqSpecCPURequestMaxResultFuture := source.WithGroup(grp, ds.QueryResourceQuotaSpecCPURequestMax(start, end))
-	rqSpecRAMRequestAverageResultFuture := source.WithGroup(grp, ds.QueryResourceQuotaSpecRAMRequestAverage(start, end))
-	rqSpecRAMRequestMaxResultFuture := source.WithGroup(grp, ds.QueryResourceQuotaSpecRAMRequestMax(start, end))
+	rqSpecCPURequestAverageResultFuture := source.WithGroup(grp, metrics.QueryResourceQuotaSpecCPURequestAverage(start, end))
+	rqSpecCPURequestMaxResultFuture := source.WithGroup(grp, metrics.QueryResourceQuotaSpecCPURequestMax(start, end))
+	rqSpecRAMRequestAverageResultFuture := source.WithGroup(grp, metrics.QueryResourceQuotaSpecRAMRequestAverage(start, end))
+	rqSpecRAMRequestMaxResultFuture := source.WithGroup(grp, metrics.QueryResourceQuotaSpecRAMRequestMax(start, end))
 
 	// spec.hard.limits
-	rqSpecCPULimitAverageResultFuture := source.WithGroup(grp, ds.QueryResourceQuotaSpecCPULimitAverage(start, end))
-	rqSpecCPULimitMaxResultFuture := source.WithGroup(grp, ds.QueryResourceQuotaSpecCPULimitMax(start, end))
-	rqSpecRAMLimitAverageResultFuture := source.WithGroup(grp, ds.QueryResourceQuotaSpecRAMLimitAverage(start, end))
-	rqSpecRAMLimitMaxResultFuture := source.WithGroup(grp, ds.QueryResourceQuotaSpecRAMLimitMax(start, end))
+	rqSpecCPULimitAverageResultFuture := source.WithGroup(grp, metrics.QueryResourceQuotaSpecCPULimitAverage(start, end))
+	rqSpecCPULimitMaxResultFuture := source.WithGroup(grp, metrics.QueryResourceQuotaSpecCPULimitMax(start, end))
+	rqSpecRAMLimitAverageResultFuture := source.WithGroup(grp, metrics.QueryResourceQuotaSpecRAMLimitAverage(start, end))
+	rqSpecRAMLimitMaxResultFuture := source.WithGroup(grp, metrics.QueryResourceQuotaSpecRAMLimitMax(start, end))
 
 	// status.used.requests
-	rqStatusUsedCPURequestAverageResultFuture := source.WithGroup(grp, ds.QueryResourceQuotaStatusUsedCPURequestAverage(start, end))
-	rqStatusUsedCPURequestMaxResultFuture := source.WithGroup(grp, ds.QueryResourceQuotaStatusUsedCPURequestMax(start, end))
-	rqStatusUsedRAMRequestAverageResultFuture := source.WithGroup(grp, ds.QueryResourceQuotaStatusUsedRAMRequestAverage(start, end))
-	rqStatusUsedRAMRequestMaxResultFuture := source.WithGroup(grp, ds.QueryResourceQuotaStatusUsedRAMRequestMax(start, end))
+	rqStatusUsedCPURequestAverageResultFuture := source.WithGroup(grp, metrics.QueryResourceQuotaStatusUsedCPURequestAverage(start, end))
+	rqStatusUsedCPURequestMaxResultFuture := source.WithGroup(grp, metrics.QueryResourceQuotaStatusUsedCPURequestMax(start, end))
+	rqStatusUsedRAMRequestAverageResultFuture := source.WithGroup(grp, metrics.QueryResourceQuotaStatusUsedRAMRequestAverage(start, end))
+	rqStatusUsedRAMRequestMaxResultFuture := source.WithGroup(grp, metrics.QueryResourceQuotaStatusUsedRAMRequestMax(start, end))
 
 	// status.used.limits
-	rqStatusUsedCPULimitAverageResultFuture := source.WithGroup(grp, ds.QueryResourceQuotaStatusUsedCPULimitAverage(start, end))
-	rqStatusUsedCPULimitMaxResultFuture := source.WithGroup(grp, ds.QueryResourceQuotaStatusUsedCPULimitMax(start, end))
-	rqStatusUsedRAMLimitAverageResultFuture := source.WithGroup(grp, ds.QueryResourceQuotaStatusUsedRAMLimitAverage(start, end))
-	rqStatusUsedRAMLimitMaxResultFuture := source.WithGroup(grp, ds.QueryResourceQuotaStatusUsedRAMLimitMax(start, end))
+	rqStatusUsedCPULimitAverageResultFuture := source.WithGroup(grp, metrics.QueryResourceQuotaStatusUsedCPULimitAverage(start, end))
+	rqStatusUsedCPULimitMaxResultFuture := source.WithGroup(grp, metrics.QueryResourceQuotaStatusUsedCPULimitMax(start, end))
+	rqStatusUsedRAMLimitAverageResultFuture := source.WithGroup(grp, metrics.QueryResourceQuotaStatusUsedRAMLimitAverage(start, end))
+	rqStatusUsedRAMLimitMaxResultFuture := source.WithGroup(grp, metrics.QueryResourceQuotaStatusUsedRAMLimitMax(start, end))
 
 	rqSpecCPURequestAverageResult, _ := rqSpecCPURequestAverageResultFuture.Await()
 	for _, res := range rqSpecCPURequestAverageResult {