ソースを参照

Pathing for KubeModel; revisions to KubeModel structure

Niko Kovacevic 6 ヶ月 前
コミット
883b039c2c

+ 18 - 10
core/pkg/exporter/pathing/bingenpath.go

@@ -7,12 +7,14 @@ import (
 
 	"github.com/opencost/opencost/core/pkg/exporter/pathing/pathutils"
 	"github.com/opencost/opencost/core/pkg/opencost"
+	"github.com/opencost/opencost/core/pkg/pipelines"
 	"github.com/opencost/opencost/core/pkg/util/timeutil"
 )
 
 const (
-	DefaultRootDir string = "federated"
-	BaseStorageDir string = "etl/bingen"
+	DefaultRootDir   string = "federated"
+	BaseStorageDir   string = "etl/bingen"
+	FinOpsAgentAppID string = "finops-agent"
 )
 
 // BingenStoragePathFormatter is an implementation of the StoragePathFormatter interface for
@@ -27,18 +29,24 @@ type BingenStoragePathFormatter struct {
 }
 
 func NewDefaultStoragePathFormatter(clusterId, pipeline string, resolution *time.Duration) (StoragePathFormatter[opencost.Window], error) {
-	return NewBingenStoragePathFormatter(DefaultRootDir, clusterId, pipeline, resolution)
-}
-
-// NewBingenStoragePathFormatter creates a StoragePathFormatter for a cluster separated storage path
-// with the given root directory, cluster id, pipeline, and resolution. To omit the resolution directory
-// structure, provide a `nil` resolution.
-func NewBingenStoragePathFormatter(rootDir, clusterId, pipeline string, resolution *time.Duration) (StoragePathFormatter[opencost.Window], error) {
 	res := "."
 	if resolution != nil {
 		res = timeutil.FormatStoreResolution(*resolution)
 	}
 
+	// KubeModel uses a distinct pathing pattern which breaks with the original
+	// Allocations and Assets bingen pathing.
+	if pipeline == pipelines.KubeModelPipelineName {
+		return NewKubeModelStoragePathFormatter(FinOpsAgentAppID, clusterId, res)
+	}
+
+	return NewBingenStoragePathFormatter(DefaultRootDir, clusterId, pipeline, res)
+}
+
+// NewBingenStoragePathFormatter creates a StoragePathFormatter for a cluster separated storage path
+// with the given root directory, cluster id, pipeline, and resolution. To omit the resolution directory
+// structure, provide a `nil` resolution.
+func NewBingenStoragePathFormatter(rootDir, clusterId, pipeline, resolution string) (StoragePathFormatter[opencost.Window], error) {
 	if clusterId == "" {
 		return nil, fmt.Errorf("cluster id cannot be empty")
 	}
@@ -51,7 +59,7 @@ func NewBingenStoragePathFormatter(rootDir, clusterId, pipeline string, resoluti
 		rootDir:    rootDir,
 		clusterId:  clusterId,
 		pipeline:   pipeline,
-		resolution: res,
+		resolution: resolution,
 	}, nil
 }
 

+ 78 - 0
core/pkg/exporter/pathing/kubemodelpath.go

@@ -0,0 +1,78 @@
+package pathing
+
+import (
+	"fmt"
+	"path"
+	"time"
+
+	"github.com/opencost/opencost/core/pkg/opencost"
+	"github.com/opencost/opencost/core/pkg/pipelines"
+)
+
+const KubeModelStorageTimeFormat = "20060102150405"
+
+// KubeModelStoragePathFormatter is an implementation of the StoragePathFormatter interface for
+// a cluster separated storage path of the format:
+//
+//	<root>/<clusterid>/kubemodel/<resolution>/<YYYYMMDDHHiiSS>
+//
+// where <root> is, e.g., s3://<bucket>/<appid>
+type KubeModelStoragePathFormatter struct {
+	rootDir    string
+	clusterId  string
+	resolution string
+}
+
+func NewKubeModelStoragePathFormatter(rootDir, clusterId, resolution string) (StoragePathFormatter[opencost.Window], error) {
+	if clusterId == "" {
+		return nil, fmt.Errorf("cluster id cannot be empty")
+	}
+
+	return &KubeModelStoragePathFormatter{
+		rootDir:    rootDir,
+		clusterId:  clusterId,
+		resolution: resolution,
+	}, nil
+}
+
+// RootDir returns the root directory of the storage path formatter.
+func (kmspf *KubeModelStoragePathFormatter) RootDir() string {
+	return kmspf.rootDir
+}
+
+// Dir returns the director that files will be placed in
+func (kmspf *KubeModelStoragePathFormatter) Dir() string {
+	return path.Join(
+		kmspf.rootDir,
+		kmspf.clusterId,
+		pipelines.KubeModelPipelineName,
+		kmspf.resolution,
+	)
+}
+
+// ToFullPath returns the full path to a file name within the storage directory using the format:
+//
+//	<root>/<clusterid>/kubemodel/<resolution>/<prefix>.<YYYYMMDDHHiiSS>.<fileExt>
+func (kmspf *KubeModelStoragePathFormatter) ToFullPath(prefix string, window opencost.Window, fileExt string) string {
+	return path.Join(
+		kmspf.rootDir,
+		kmspf.clusterId,
+		pipelines.KubeModelPipelineName,
+		kmspf.resolution,
+		toKubeModelFileName(prefix, window.Start(), fileExt),
+	)
+}
+
+func toKubeModelFileName(prefix string, start *time.Time, fileExt string) string {
+	filename := derefTimeOrZero(start).Format(KubeModelStorageTimeFormat)
+
+	if fileExt != "" {
+		filename = fmt.Sprintf("%s.%s", filename, fileExt)
+	}
+
+	if prefix == "" {
+		return filename
+	}
+
+	return fmt.Sprintf("%s.%s", prefix, filename)
+}

+ 0 - 132
core/pkg/model/kubemodel/example.go

@@ -1,132 +0,0 @@
-package kubemodel
-
-import "github.com/opencost/opencost/core/pkg/stats"
-
-type ContainerCosts struct {
-	Container
-	CPUCost     float64
-	RAMCost     float64
-	StorageCost float64
-}
-
-// ApplyContainerCosts is a simplified example of a consumer of the
-// KubeModelSet data structure, which applies Node and PersistentVolume
-// pricing to container resources to assign container costs.
-func ApplyContainerCosts(kms *KubeModelSet, pm PricingModel, out chan map[string]*ContainerCosts) {
-	// 1. Option that uses the flat structure
-	flatAlgorithm(kms, pm, out)
-
-	// 2. Option that uses the cost-priority hierarchical structure
-	hierarchyAlgorithm(kms, pm, out)
-}
-
-// O(C * avg(PVC))
-// 42 lines
-// 2 loops
-// 3 max indentation
-func flatAlgorithm(kms *KubeModelSet, pm PricingModel, out chan map[string]*ContainerCosts) {
-	// O(1) -- slight advantage in being able to allocate memory for the whole set of Containers
-	ccs := make(map[string]*ContainerCosts, len(kms.Containers))
-
-	// O(C)
-	for _, container := range kms.Containers {
-		// O(1)
-		cc := &ContainerCosts{Container: *container}
-
-		// O(1)
-		pod := kms.Pods[container.PodUID]
-
-		// O(1)
-		node := kms.Nodes[pod.NodeUID]
-
-		// O(1) -- presumably
-		nodePricing := pm.GetNodePricing(node)
-
-		// O(1)
-		cc.CPUCost = float64(container.CPUAllocationMillicoreSeconds) * nodePricing[ResourceCPU].Price
-		cc.RAMCost = float64(container.RAMAllocationByteSeconds) * nodePricing[ResourceMemory].Price
-
-		// O(PVC)
-		for pvcUID, mountedVolume := range container.PVCMounts {
-			// O(1)
-			pvc := kms.PersistentVolumeClaims[pvcUID]
-
-			// O(1)
-			pv := kms.PersistentVolumes[pvc.PersistentVolumeUID]
-
-			// O(1)
-			pvPricing := pm.GetPersistentVolumePricing(pv)
-
-			// O(1)
-			cc.StorageCost += float64(mountedVolume.StorageCapacityBytes) * pvPricing[ResourceStorage].Price
-		}
-
-		// O(1)
-		ccs[container.UID] = cc
-	}
-
-	// O(1)
-	out <- ccs
-}
-
-// O(N * avg(P) * avg(C) * avg(PVC)) == O(C * avg(PVC))
-// 42 lines
-// 4 loops
-// 5 max indentation
-func hierarchyAlgorithm(kms *KubeModelSet, pm PricingModel, out chan map[string]*ContainerCosts) {
-	// O(1) -- total number of containers would have to be indexed / cached
-	ccs := map[string]*ContainerCosts{}
-
-	// O(N)
-	for _, node := range kms.Cluster.Nodes {
-		// O(avg(P))
-		for _, pod := range node.Pods {
-			// O(avg(C))
-			for _, container := range pod.Containers {
-				// O(1)
-				cc := &ContainerCosts{Container: *container}
-
-				// O(1) -- presumably
-				nodePricing := pm.GetNodePricing(node)
-
-				// O(1)
-				cc.CPUCost = container.Resources[ResourceCPU].Values[stats.Val] * nodePricing[ResourceCPU].Price
-				cc.RAMCost = container.Resources[ResourceMemory].Values[stats.Val] * nodePricing[ResourceMemory].Price
-
-				// O(PVC) -- going to need to use the same access pattern as the
-				// flat data structure, anyways, unless somehow PVCs / PVs will
-				// be nested within Containers (e.g. under VolumeMounts?)
-				for pvcUID, storage := range container.VolumeMounts {
-					// O(1)
-					pvc := kms.PersistentVolumeClaims[pvcUID]
-
-					// O(1)
-					pv := kms.PersistentVolumes[pvc.PersistentVolumeUID]
-
-					// O(1)
-					pvPricing := pm.GetPersistentVolumePricing(pv)
-
-					// O(1)
-					cc.StorageCost += storage.Values[stats.Val] * pvPricing[ResourceStorage].Price
-				}
-
-				ccs[container.UID] = cc
-			}
-		}
-	}
-
-	out <- ccs
-}
-
-type PricingModel interface {
-	GetNodePricing(node *Node) Pricing
-	GetPersistentVolumePricing(pv *PersistentVolume) Pricing
-}
-
-type Pricing map[Resource]ResourcePricing
-
-type ResourcePricing struct {
-	Resource Resource
-	Unit     Unit
-	Price    float64
-}

+ 1 - 1
core/pkg/opencost/exporter/controllers.go

@@ -137,7 +137,7 @@ func NewPipelineExportControllers(clusterId string, store storage.Storage, cm Co
 	}
 
 	// KubeModel sources and exporters
-	kubeModelSource := exporterkubemodel.NewKubeModelComputeSource(cm) // TODO: CREATE AN IMPLEMENTATION OF ComputeSource[kubemodel.KubeModelSet]
+	kubeModelSource := exporterkubemodel.NewKubeModelComputeSource(cm)
 	kubeModelExportControllers := []*export.ComputeExportController[kubemodel.KubeModelSet]{}
 
 	for _, res := range config.KubeModelPipelineResolutions {

+ 16 - 8
pkg/costmodel/kubemodel.go

@@ -119,13 +119,15 @@ func (cm *CostModel) kmComputeResourceQuotas(kms *kubemodel.KubeModelSet, start,
 	rqSpecCPURequestAverageResult, _ := rqSpecCPURequestAverageResultFuture.Await()
 	for _, res := range rqSpecCPURequestAverageResult {
 		kms.RegisterResourceQuota(res.UID, res.ResourceQuota, res.Namespace)
-		kms.ResourceQuotas[res.UID].Spec.Hard.Requests.Set(kubemodel.ResourceCPU, kubemodel.UnitCPUCore, stats.Avg, res.Data[0].Value)
+		mcpu := res.Data[0].Value * 1000
+		kms.ResourceQuotas[res.UID].Spec.Hard.Requests.Set(kubemodel.ResourceCPU, kubemodel.UnitMillicore, stats.Avg, mcpu)
 	}
 
 	rqSpecCPURequestMaxResult, _ := rqSpecCPURequestMaxResultFuture.Await()
 	for _, res := range rqSpecCPURequestMaxResult {
 		kms.RegisterResourceQuota(res.UID, res.ResourceQuota, res.Namespace)
-		kms.ResourceQuotas[res.UID].Spec.Hard.Requests.Set(kubemodel.ResourceCPU, kubemodel.UnitCPUCore, stats.Max, res.Data[0].Value)
+		mcpu := res.Data[0].Value * 1000
+		kms.ResourceQuotas[res.UID].Spec.Hard.Requests.Set(kubemodel.ResourceCPU, kubemodel.UnitMillicore, stats.Max, mcpu)
 	}
 
 	rqSpecRAMRequestAverageResult, _ := rqSpecRAMRequestAverageResultFuture.Await()
@@ -143,13 +145,15 @@ func (cm *CostModel) kmComputeResourceQuotas(kms *kubemodel.KubeModelSet, start,
 	rqSpecCPULimitAverageResult, _ := rqSpecCPULimitAverageResultFuture.Await()
 	for _, res := range rqSpecCPULimitAverageResult {
 		kms.RegisterResourceQuota(res.UID, res.ResourceQuota, res.Namespace)
-		kms.ResourceQuotas[res.UID].Spec.Hard.Limits.Set(kubemodel.ResourceCPU, kubemodel.UnitCPUCore, stats.Avg, res.Data[0].Value)
+		mcpu := res.Data[0].Value * 1000
+		kms.ResourceQuotas[res.UID].Spec.Hard.Limits.Set(kubemodel.ResourceCPU, kubemodel.UnitMillicore, stats.Avg, mcpu)
 	}
 
 	rqSpecCPULimitMaxResult, _ := rqSpecCPULimitMaxResultFuture.Await()
 	for _, res := range rqSpecCPULimitMaxResult {
 		kms.RegisterResourceQuota(res.UID, res.ResourceQuota, res.Namespace)
-		kms.ResourceQuotas[res.UID].Spec.Hard.Limits.Set(kubemodel.ResourceCPU, kubemodel.UnitCPUCore, stats.Max, res.Data[0].Value)
+		mcpu := res.Data[0].Value * 1000
+		kms.ResourceQuotas[res.UID].Spec.Hard.Limits.Set(kubemodel.ResourceCPU, kubemodel.UnitMillicore, stats.Max, mcpu)
 	}
 
 	rqSpecRAMLimitAverageResult, _ := rqSpecRAMLimitAverageResultFuture.Await()
@@ -167,13 +171,15 @@ func (cm *CostModel) kmComputeResourceQuotas(kms *kubemodel.KubeModelSet, start,
 	rqStatusUsedCPURequestAverageResult, _ := rqStatusUsedCPURequestAverageResultFuture.Await()
 	for _, res := range rqStatusUsedCPURequestAverageResult {
 		kms.RegisterResourceQuota(res.UID, res.ResourceQuota, res.Namespace)
-		kms.ResourceQuotas[res.UID].Status.Used.Requests.Set(kubemodel.ResourceCPU, kubemodel.UnitCPUCore, stats.Avg, res.Data[0].Value)
+		mcpu := res.Data[0].Value * 1000
+		kms.ResourceQuotas[res.UID].Status.Used.Requests.Set(kubemodel.ResourceCPU, kubemodel.UnitMillicore, stats.Avg, mcpu)
 	}
 
 	rqStatusUsedCPURequestMaxResult, _ := rqStatusUsedCPURequestMaxResultFuture.Await()
 	for _, res := range rqStatusUsedCPURequestMaxResult {
 		kms.RegisterResourceQuota(res.UID, res.ResourceQuota, res.Namespace)
-		kms.ResourceQuotas[res.UID].Status.Used.Requests.Set(kubemodel.ResourceCPU, kubemodel.UnitCPUCore, stats.Max, res.Data[0].Value)
+		mcpu := res.Data[0].Value * 1000
+		kms.ResourceQuotas[res.UID].Status.Used.Requests.Set(kubemodel.ResourceCPU, kubemodel.UnitMillicore, stats.Max, mcpu)
 	}
 
 	rqStatusUsedRAMRequestAverageResult, _ := rqStatusUsedRAMRequestAverageResultFuture.Await()
@@ -191,13 +197,15 @@ func (cm *CostModel) kmComputeResourceQuotas(kms *kubemodel.KubeModelSet, start,
 	rqStatusUsedCPULimitAverageResult, _ := rqStatusUsedCPULimitAverageResultFuture.Await()
 	for _, res := range rqStatusUsedCPULimitAverageResult {
 		kms.RegisterResourceQuota(res.UID, res.ResourceQuota, res.Namespace)
-		kms.ResourceQuotas[res.UID].Status.Used.Limits.Set(kubemodel.ResourceCPU, kubemodel.UnitCPUCore, stats.Avg, res.Data[0].Value)
+		mcpu := res.Data[0].Value * 1000
+		kms.ResourceQuotas[res.UID].Status.Used.Limits.Set(kubemodel.ResourceCPU, kubemodel.UnitMillicore, stats.Avg, mcpu)
 	}
 
 	rqStatusUsedCPULimitMaxResult, _ := rqStatusUsedCPULimitMaxResultFuture.Await()
 	for _, res := range rqStatusUsedCPULimitMaxResult {
 		kms.RegisterResourceQuota(res.UID, res.ResourceQuota, res.Namespace)
-		kms.ResourceQuotas[res.UID].Status.Used.Limits.Set(kubemodel.ResourceCPU, kubemodel.UnitCPUCore, stats.Max, res.Data[0].Value)
+		mcpu := res.Data[0].Value * 1000
+		kms.ResourceQuotas[res.UID].Status.Used.Limits.Set(kubemodel.ResourceCPU, kubemodel.UnitMillicore, stats.Max, mcpu)
 	}
 
 	rqStatusUsedRAMLimitAverageResult, _ := rqStatusUsedRAMLimitAverageResultFuture.Await()