Jelajahi Sumber

KubeModel source, exporter work

Niko Kovacevic 6 bulan lalu
induk
melakukan
f6c2733557

+ 21 - 6
core/pkg/model/kubemodel/kubemodel.go

@@ -1,14 +1,29 @@
 package kubemodel
 
-import "time"
+import (
+	"errors"
+	"time"
+)
 
-type KubeModel struct {
-	Metadata KubeModelMetadata
-	Cluster  Cluster
-	Window   Window
+type KubeModelSet struct {
+	Window         Window
+	Cluster        *Cluster
+	Namespaces     []*Namespace
+	ResourceQuotas []*ResourceQuota
+	Metadata       *KubeModelSetMetadata
 }
 
-type KubeModelMetadata struct {
+// TODO: determine what "IsEmpty()" should mean here
+func (kms *KubeModelSet) IsEmpty() bool {
+	return kms == nil
+}
+
+// TODO: generate bingen codec
+func (kms *KubeModelSet) MarshalBinary() ([]byte, error) {
+	return nil, errors.New("not implemented")
+}
+
+type KubeModelSetMetadata struct {
 	CreatedAt  time.Time
 	DataSource string
 	Warnings   []string

+ 2 - 2
core/pkg/model/kubemodel/resourcequota.go

@@ -11,8 +11,8 @@ type ResourceQuota struct {
 	NamespaceID string
 	Name        string
 	Kind        ResourceQuotaKind
-	Spec        ResourceQuotaSpec
-	Status      ResourceQuotaStatus
+	Spec        *ResourceQuotaSpec
+	Status      *ResourceQuotaStatus
 }
 
 type ResourceQuotaSpec struct {

+ 24 - 0
core/pkg/opencost/exporter/controllers.go

@@ -5,9 +5,11 @@ import (
 
 	export "github.com/opencost/opencost/core/pkg/exporter"
 	"github.com/opencost/opencost/core/pkg/log"
+	"github.com/opencost/opencost/core/pkg/model/kubemodel"
 	"github.com/opencost/opencost/core/pkg/opencost"
 	"github.com/opencost/opencost/core/pkg/opencost/exporter/allocation"
 	"github.com/opencost/opencost/core/pkg/opencost/exporter/asset"
+	exporterkubemodel "github.com/opencost/opencost/core/pkg/opencost/exporter/kubemodel"
 	"github.com/opencost/opencost/core/pkg/opencost/exporter/networkinsight"
 	"github.com/opencost/opencost/core/pkg/source"
 	"github.com/opencost/opencost/core/pkg/storage"
@@ -31,6 +33,7 @@ type PipelinesExportConfig struct {
 	AllocationPiplineResolutions      []time.Duration
 	AssetPipelineResolutons           []time.Duration
 	NetworkInsightPipelineResolutions []time.Duration
+	KubeModelPipelineResolutions      []time.Duration
 }
 
 // defaultPipelineExportResolutions returns the default export configuration for the pipeline
@@ -57,6 +60,7 @@ type PipelineExportControllers struct {
 	AllocationExportController     *export.ComputeExportControllerGroup[opencost.AllocationSet]
 	AssetExportController          *export.ComputeExportControllerGroup[opencost.AssetSet]
 	NetworkInsightExportController *export.ComputeExportControllerGroup[opencost.NetworkInsightSet]
+	KubeModelExportController      *export.ComputeExportControllerGroup[kubemodel.KubeModelSet]
 }
 
 // NewPipelineExportControllers creates a new PipelineExportControllers instance with the given cluster ID, storage implementation, cost model, and configuration.
@@ -131,10 +135,30 @@ func NewPipelineExportControllers(clusterId string, store storage.Storage, cm Co
 		networkInsightExportControllers = append(networkInsightExportControllers, networkInsightController)
 	}
 
+	// KubeModel sources and exporters
+	kubeModelSource := exporterkubemodel.NewKubeModelComputeSource(nil) // TODO: CREATE AN IMPLEMENTATION OF ComputeSource[kubemodel.KubeModelSet]
+	kubeModelExportControllers := []*export.ComputeExportController[kubemodel.KubeModelSet]{}
+
+	for _, res := range config.KubeModelPipelineResolutions {
+		if res < sourceResolution {
+			log.Warnf("Configured KubeModel pipeline resolution %dm is less than source resolution %dm. Not configuring the exporter for this resolution.", int64(res.Minutes()), int64(sourceResolution.Minutes()))
+			continue
+		}
+
+		kubeModelController, err := NewComputePipelineExportController(clusterId, store, kubeModelSource, res)
+		if err != nil {
+			log.Errorf("Failed to create KubeModel export controller for resolution: %s - %v", timeutil.DurationString(res), err)
+			continue
+		}
+
+		kubeModelExportControllers = append(kubeModelExportControllers, kubeModelController)
+	}
+
 	return &PipelineExportControllers{
 		AllocationExportController:     export.NewComputeExportControllerGroup(allocExportControllers...),
 		AssetExportController:          export.NewComputeExportControllerGroup(assetExportControllers...),
 		NetworkInsightExportController: export.NewComputeExportControllerGroup(networkInsightExportControllers...),
+		KubeModelExportController:      export.NewComputeExportControllerGroup(kubeModelExportControllers...),
 	}
 }
 

+ 43 - 0
core/pkg/opencost/exporter/kubemodel/source.go

@@ -0,0 +1,43 @@
+package kubemodel
+
+import (
+	"time"
+
+	"github.com/opencost/opencost/core/pkg/exporter"
+	"github.com/opencost/opencost/core/pkg/model/kubemodel"
+	"github.com/opencost/opencost/core/pkg/pipelines"
+)
+
+type KubeModelSource interface {
+	ComputeKubeModel(start, end time.Time) (*kubemodel.KubeModelSet, error)
+}
+
+type KubeModelComputeSource struct {
+	src KubeModelSource
+}
+
+// NewKubeModelComputeSource creates an `exporter.ComputeSource[opencost.KubeModelSet]` implementation
+func NewKubeModelComputeSource(src KubeModelSource) exporter.ComputeSource[kubemodel.KubeModelSet] {
+	return &KubeModelComputeSource{
+		src: src,
+	}
+}
+
+// CanCompute should return true iff the ComputeSource can effectively act as
+// a source of T data for the given time range. For example, a ComputeSource
+// with two-day coverage cannot fulfill a range from three days ago, and should
+// not be left to return an error in Compute. Instead, it should report that is
+// cannot compute and allow another Source to handle the computation.
+func (acs *KubeModelComputeSource) CanCompute(start, end time.Time) bool {
+	return true
+}
+
+// Compute should compute a single T for the given time range.
+func (acs *KubeModelComputeSource) Compute(start, end time.Time) (*kubemodel.KubeModelSet, error) {
+	return acs.src.ComputeKubeModel(start, end)
+}
+
+// Name returns the name of the ComputeSource
+func (acs *KubeModelComputeSource) Name() string {
+	return pipelines.KubeModelPipelineName
+}

+ 5 - 0
core/pkg/pipelines/name.go

@@ -3,6 +3,7 @@ package pipelines
 import (
 	"github.com/opencost/opencost/core/pkg/diagnostics"
 	"github.com/opencost/opencost/core/pkg/heartbeat"
+	"github.com/opencost/opencost/core/pkg/model/kubemodel"
 	"github.com/opencost/opencost/core/pkg/opencost"
 	"github.com/opencost/opencost/core/pkg/util/typeutil"
 )
@@ -16,6 +17,7 @@ const (
 	TurbonomicActionsPipelineName string = "turbonomicactions"
 	HeartbeatPipelineName         string = "heartbeat"
 	DiagnosticsPipelineName       string = "diagnostics"
+	KubeModelPipelineName         string = "kubemodel"
 )
 
 var nameByType map[string]string
@@ -37,6 +39,8 @@ func init() {
 	heartbeatKey := typeutil.TypeOf[heartbeat.Heartbeat]()
 	diagnosticsKey := typeutil.TypeOf[diagnostics.DiagnosticsRunReport]()
 
+	kubeModelSetKey := typeutil.TypeOf[kubemodel.KubeModelSet]()
+
 	nameByType = map[string]string{
 		allocSetKey:          AllocationPipelineName,
 		allocKey:             AllocationPipelineName,
@@ -48,6 +52,7 @@ func init() {
 		networkInsightKey:    NetworkInsightPipelineName,
 		heartbeatKey:         HeartbeatPipelineName,
 		diagnosticsKey:       DiagnosticsPipelineName,
+		kubeModelSetKey:      KubeModelPipelineName,
 	}
 }
 

+ 18 - 0
pkg/costmodel/kubemodel.go

@@ -0,0 +1,18 @@
+package costmodel
+
+import (
+	"errors"
+	"time"
+
+	"github.com/opencost/opencost/core/pkg/model/kubemodel"
+)
+
+// ComputeKubeModel uses the CostModel instance to compute an KubeModelSet
+// for the window defined by the given start and end times. The KubeModels
+// returned are unaggregated (i.e. down to the container level).
+func (cm *CostModel) ComputeKubeModel(start, end time.Time) (*kubemodel.KubeModelSet, error) {
+
+	// TODO: use cm.DataSource to query for metrics and hydrate a *kubemodel.KubeModelSet
+
+	return nil, errors.New("not implemented")
+}