Просмотр исходного кода

return cluster importer/exporter

Signed-off-by: r2k1 <yokree@gmail.com>
r2k1 2 года назад
Родитель
Commit
7a35ecf8f3

+ 107 - 0
pkg/clustercache/clusterexporter.go

@@ -0,0 +1,107 @@
+package clustercache
+
+import (
+	"time"
+
+	"github.com/opencost/opencost/core/pkg/log"
+	"github.com/opencost/opencost/core/pkg/util/atomic"
+	"github.com/opencost/opencost/core/pkg/util/json"
+	"github.com/opencost/opencost/pkg/config"
+)
+
+// clusterEncoding is used to represent the cluster objects in the encoded states.
+type clusterEncoding struct {
+	Namespaces             []*Namespace             `json:"namespaces,omitempty"`
+	Nodes                  []*Node                  `json:"nodes,omitempty"`
+	Pods                   []*Pod                   `json:"pods,omitempty"`
+	Services               []*Service               `json:"services,omitempty"`
+	DaemonSets             []*DaemonSet             `json:"daemonSets,omitempty"`
+	Deployments            []*Deployment            `json:"deployments,omitempty"`
+	StatefulSets           []*StatefulSet           `json:"statefulSets,omitempty"`
+	ReplicaSets            []*ReplicaSet            `json:"replicaSets,omitempty"`
+	PersistentVolumes      []*PersistentVolume      `json:"persistentVolumes,omitempty"`
+	PersistentVolumeClaims []*PersistentVolumeClaim `json:"persistentVolumeClaims,omitempty"`
+	StorageClasses         []*StorageClass          `json:"storageClasses,omitempty"`
+	Jobs                   []*Job                   `json:"jobs,omitempty"`
+	PodDisruptionBudgets   []*PodDisruptionBudget   `json:"podDisruptionBudgets,omitempty"`
+	ReplicationControllers []*ReplicationController `json:"replicationController,omitempty"`
+}
+
// ClusterExporter manages and runs a file export process which dumps the local
// kubernetes cluster to a target location on a fixed interval.
type ClusterExporter struct {
	cluster  ClusterCache          // cluster cache that supplies the objects to export
	target   *config.ConfigFile    // file the JSON-encoded snapshot is written to
	interval time.Duration         // delay between consecutive Export attempts
	runState atomic.AtomicRunState // start/stop lifecycle guard for the export goroutine
}
+
+// NewClusterExporter creates a new ClusterExporter instance for exporting the kubernetes cluster.
+func NewClusterExporter(cluster ClusterCache, target *config.ConfigFile, interval time.Duration) *ClusterExporter {
+	return &ClusterExporter{
+		cluster:  cluster,
+		target:   target,
+		interval: interval,
+	}
+}
+
// Run starts the automated process of running Export on a specific interval.
// The first export is attempted immediately; afterwards one export runs per
// interval until Stop() is called. Run is a no-op (with a warning) if the
// exporter is already running.
func (ce *ClusterExporter) Run() {
	// in the event there is a race that occurs between Run() and Stop(), we
	// ensure that we wait for the reset to occur before starting again
	ce.runState.WaitForReset()

	if !ce.runState.Start() {
		log.Warnf("ClusterExporter already running")
		return
	}

	go func() {
		for {
			// a failed export is logged and retried on the next tick
			err := ce.Export()
			if err != nil {
				log.Warnf("Failed to export cluster: %s", err)
			}

			// wait for either the next interval tick or a stop signal; on
			// stop, reset the run state so a later Run() can start cleanly
			select {
			case <-time.After(ce.interval):
			case <-ce.runState.OnStop():
				ce.runState.Reset()
				return
			}
		}
	}()
}
+
// Stop halts the Cluster export on an interval. It signals the export
// goroutine (if running) to exit; that goroutine resets the run state on its
// way out so Run() can be invoked again afterwards.
func (ce *ClusterExporter) Stop() {
	ce.runState.Stop()
}
+
+// Export stores the cluster cache data into a PODO, marshals as JSON, and saves it to the
+// target location.
+func (ce *ClusterExporter) Export() error {
+	c := ce.cluster
+	encoding := &clusterEncoding{
+		Namespaces:             c.GetAllNamespaces(),
+		Nodes:                  c.GetAllNodes(),
+		Pods:                   c.GetAllPods(),
+		Services:               c.GetAllServices(),
+		DaemonSets:             c.GetAllDaemonSets(),
+		Deployments:            c.GetAllDeployments(),
+		StatefulSets:           c.GetAllStatefulSets(),
+		ReplicaSets:            c.GetAllReplicaSets(),
+		PersistentVolumes:      c.GetAllPersistentVolumes(),
+		PersistentVolumeClaims: c.GetAllPersistentVolumeClaims(),
+		StorageClasses:         c.GetAllStorageClasses(),
+		Jobs:                   c.GetAllJobs(),
+		PodDisruptionBudgets:   c.GetAllPodDisruptionBudgets(),
+		ReplicationControllers: c.GetAllReplicationControllers(),
+	}
+
+	data, err := json.Marshal(encoding)
+	if err != nil {
+		return err
+	}
+
+	return ce.target.Write(data)
+}

+ 205 - 0
pkg/clustercache/clusterimporter.go

@@ -0,0 +1,205 @@
+package clustercache
+
+import (
+	"sync"
+
+	"github.com/opencost/opencost/core/pkg/log"
+	"github.com/opencost/opencost/core/pkg/util/json"
+	"github.com/opencost/opencost/pkg/config"
+	"golang.org/x/exp/slices"
+)
+
// ClusterImporter is an implementation of ClusterCache which leverages a backing configuration file
// as its source of the cluster data.
type ClusterImporter struct {
	source          *config.ConfigFile // file watched and read for exported cluster snapshots
	sourceHandlerID config.HandlerID   // handle used to unregister the change handler on Stop
	dataLock        *sync.Mutex        // guards data, which is swapped wholesale on each import
	data            *clusterEncoding   // most recently imported cluster snapshot (never nil)
}
+
+// Creates a new ClusterCache implementation which uses an import process to provide cluster data
+func NewClusterImporter(source *config.ConfigFile) ClusterCache {
+	return &ClusterImporter{
+		source:   source,
+		dataLock: new(sync.Mutex),
+		data:     new(clusterEncoding),
+	}
+}
+
+// onImportSourceChanged handles the source data updating
+func (ci *ClusterImporter) onImportSourceChanged(changeType config.ChangeType, data []byte) {
+	if changeType == config.ChangeTypeDeleted {
+		ci.dataLock.Lock()
+		ci.data = new(clusterEncoding)
+		ci.dataLock.Unlock()
+		return
+	}
+
+	ci.update(data)
+}
+
+// update replaces the underlying cluster data with the provided new data if it decodes
+func (ci *ClusterImporter) update(data []byte) {
+	ce := new(clusterEncoding)
+	err := json.Unmarshal(data, ce)
+	if err != nil {
+		log.Warnf("Failed to unmarshal cluster during import: %s", err)
+		return
+	}
+
+	ci.dataLock.Lock()
+	ci.data = ce
+	ci.dataLock.Unlock()
+}
+
// Run starts the watcher processes: it performs an initial synchronous load of
// the source file (when it exists), then subscribes for subsequent changes.
func (ci *ClusterImporter) Run() {
	if ci.source == nil {
		log.Errorf("ClusterImporter source does not exist, not running")
		return
	}

	exists, err := ci.source.Exists()
	if err != nil {
		log.Errorf("Failed to import source for cluster: %s", err)
		return
	}

	// initial load; a read failure here is non-fatal since the change handler
	// registered below will pick up later writes to the source
	if exists {
		data, err := ci.source.Read()
		if err != nil {
			log.Warnf("Failed to import cluster: %s", err)
		} else {
			ci.update(data)
		}
	}

	// NOTE(review): a write landing between Read() and AddChangeHandler()
	// could be missed until the next change — confirm whether the config
	// watcher replays current state on registration
	ci.sourceHandlerID = ci.source.AddChangeHandler(ci.onImportSourceChanged)
}
+
// Stop stops the watcher processes by removing the change-handler subscription
// created by Run.
// NOTE(review): sourceHandlerID is written by Run and read/cleared here with
// no synchronization — presumably Run/Stop are only called from one goroutine;
// confirm with callers.
func (ci *ClusterImporter) Stop() {
	if ci.sourceHandlerID != "" {
		ci.source.RemoveChangeHandler(ci.sourceHandlerID)
		ci.sourceHandlerID = ""
	}
}
+
+// GetAllNamespaces returns all the cached namespaces
+func (ci *ClusterImporter) GetAllNamespaces() []*Namespace {
+	ci.dataLock.Lock()
+	defer ci.dataLock.Unlock()
+
+	return slices.Clone(ci.data.Namespaces)
+}
+
+// GetAllNodes returns all the cached nodes
+func (ci *ClusterImporter) GetAllNodes() []*Node {
+	ci.dataLock.Lock()
+	defer ci.dataLock.Unlock()
+
+	return slices.Clone(ci.data.Nodes)
+}
+
+// GetAllPods returns all the cached pods
+func (ci *ClusterImporter) GetAllPods() []*Pod {
+	ci.dataLock.Lock()
+	defer ci.dataLock.Unlock()
+
+	return slices.Clone(ci.data.Pods)
+}
+
+// GetAllServices returns all the cached services
+func (ci *ClusterImporter) GetAllServices() []*Service {
+	ci.dataLock.Lock()
+	defer ci.dataLock.Unlock()
+
+	return slices.Clone(ci.data.Services)
+}
+
+// GetAllDaemonSets returns all the cached DaemonSets
+func (ci *ClusterImporter) GetAllDaemonSets() []*DaemonSet {
+	ci.dataLock.Lock()
+	defer ci.dataLock.Unlock()
+
+	return slices.Clone(ci.data.DaemonSets)
+}
+
+// GetAllDeployments returns all the cached deployments
+func (ci *ClusterImporter) GetAllDeployments() []*Deployment {
+	ci.dataLock.Lock()
+	defer ci.dataLock.Unlock()
+
+	return slices.Clone(ci.data.Deployments)
+}
+
+// GetAllStatfulSets returns all the cached StatefulSets
+func (ci *ClusterImporter) GetAllStatefulSets() []*StatefulSet {
+	ci.dataLock.Lock()
+	defer ci.dataLock.Unlock()
+
+	return slices.Clone(ci.data.StatefulSets)
+}
+
+// GetAllReplicaSets returns all the cached ReplicaSets
+func (ci *ClusterImporter) GetAllReplicaSets() []*ReplicaSet {
+	ci.dataLock.Lock()
+	defer ci.dataLock.Unlock()
+
+	return slices.Clone(ci.data.ReplicaSets)
+}
+
+// GetAllPersistentVolumes returns all the cached persistent volumes
+func (ci *ClusterImporter) GetAllPersistentVolumes() []*PersistentVolume {
+	ci.dataLock.Lock()
+	defer ci.dataLock.Unlock()
+
+	return slices.Clone(ci.data.PersistentVolumes)
+}
+
+// GetAllPersistentVolumeClaims returns all the cached persistent volume claims
+func (ci *ClusterImporter) GetAllPersistentVolumeClaims() []*PersistentVolumeClaim {
+	ci.dataLock.Lock()
+	defer ci.dataLock.Unlock()
+
+	return slices.Clone(ci.data.PersistentVolumeClaims)
+}
+
+// GetAllStorageClasses returns all the cached storage classes
+func (ci *ClusterImporter) GetAllStorageClasses() []*StorageClass {
+	ci.dataLock.Lock()
+	defer ci.dataLock.Unlock()
+
+	return slices.Clone(ci.data.StorageClasses)
+}
+
+// GetAllJobs returns all the cached jobs
+func (ci *ClusterImporter) GetAllJobs() []*Job {
+	ci.dataLock.Lock()
+	defer ci.dataLock.Unlock()
+
+	return slices.Clone(ci.data.Jobs)
+}
+
+// GetAllPodDisruptionBudgets returns all cached pod disruption budgets
+func (ci *ClusterImporter) GetAllPodDisruptionBudgets() []*PodDisruptionBudget {
+	ci.dataLock.Lock()
+	defer ci.dataLock.Unlock()
+
+	return slices.Clone(ci.data.PodDisruptionBudgets)
+}
+
+func (ci *ClusterImporter) GetAllReplicationControllers() []*ReplicationController {
+	ci.dataLock.Lock()
+	defer ci.dataLock.Unlock()
+
+	return slices.Clone(ci.data.ReplicationControllers)
+}
+
// SetConfigMapUpdateFunc sets the configmap update function. For imported
// cluster data this is intentionally a no-op: the cache is fed from the
// exported file, so only a warning is logged.
func (ci *ClusterImporter) SetConfigMapUpdateFunc(_ func(interface{})) {
	// TODO: (bolt) This function is still a bit strange to me for the ClusterCache interface.
	// TODO: (bolt) no-op for now.
	log.Warnf("SetConfigMapUpdateFunc is disabled for imported cluster data.")
}

+ 11 - 0
pkg/cmd/agent/agent.go

@@ -39,6 +39,10 @@ type AgentOpts struct {
 // ClusterExportInterval is the interval used to export the cluster if env.IsExportClusterCacheEnabled() is true
 const ClusterExportInterval = 5 * time.Minute
 
+// clusterExporter is used if env.IsExportClusterCacheEnabled() is set to true
+// it will export the kubernetes cluster data to a file on a specific interval
+var clusterExporter *clustercache.ClusterExporter
+
 func Healthz(w http.ResponseWriter, _ *http.Request) {
 	w.WriteHeader(200)
 	w.Header().Set("Content-Length", "0")
@@ -184,6 +188,13 @@ func Execute(opts *AgentOpts) error {
 
 	configPrefix := env.GetConfigPathWithDefault(env.DefaultConfigMountPath)
 
+	// Initialize cluster exporting if it's enabled
+	if env.IsExportClusterCacheEnabled() {
+		cacheLocation := confManager.ConfigFileAt(path.Join(configPrefix, "cluster-cache.json"))
+		clusterExporter = clustercache.NewClusterExporter(clusterCache, cacheLocation, ClusterExportInterval)
+		clusterExporter.Run()
+	}
+
 	// ClusterInfo Provider to provide the cluster map with local and remote cluster data
 	localClusterInfo := costmodel.NewLocalClusterInfoProvider(k8sClient, cloudProvider)
 

+ 9 - 2
pkg/env/costmodelenv.go

@@ -89,8 +89,9 @@ const (
 	MetricsConfigmapName  = "METRICS_CONFIGMAP_NAME"
 	KubecostJobNameEnvVar = "KUBECOST_JOB_NAME"
 
-	KubecostConfigBucketEnvVar   = "KUBECOST_CONFIG_BUCKET"
-	ClusterInfoFileEnabledEnvVar = "CLUSTER_INFO_FILE_ENABLED"
+	KubecostConfigBucketEnvVar    = "KUBECOST_CONFIG_BUCKET"
+	ClusterInfoFileEnabledEnvVar  = "CLUSTER_INFO_FILE_ENABLED"
+	ClusterCacheFileEnabledEnvVar = "CLUSTER_CACHE_FILE_ENABLED"
 
 	PrometheusQueryOffsetEnvVar                 = "PROMETHEUS_QUERY_OFFSET"
 	PrometheusRetryOnRateLimitResponseEnvVar    = "PROMETHEUS_RETRY_ON_RATE_LIMIT"
@@ -185,6 +186,12 @@ func IsClusterInfoFileEnabled() bool {
 	return env.GetBool(ClusterInfoFileEnabledEnvVar, false)
 }
 
// IsClusterCacheFileEnabled returns true if the kubernetes cluster data should
// be read from an exported cache file rather than pulled from the local
// kubernetes API. Defaults to false when CLUSTER_CACHE_FILE_ENABLED is unset.
func IsClusterCacheFileEnabled() bool {
	return env.GetBool(ClusterCacheFileEnabledEnvVar, false)
}
+
 // IsPrometheusRetryOnRateLimitResponse will attempt to retry if a 429 response is received OR a 400 with a body containing
 // ThrottleException (common in AWS services like AMP)
 func IsPrometheusRetryOnRateLimitResponse() bool {

+ 7 - 0
pkg/env/kubemetricsenv.go

@@ -5,6 +5,7 @@ import "github.com/opencost/opencost/core/pkg/env"
 const (
 	KubecostMetricsPodEnabledEnvVar = "KUBECOST_METRICS_POD_ENABLED"
 	KubecostMetricsPodPortEnvVar    = "KUBECOST_METRICS_PORT"
+	ExportClusterCacheEnabledEnvVar = "EXPORT_CLUSTER_CACHE_ENABLED"
 	ExportClusterInfoEnabledEnvVar  = "EXPORT_CLUSTER_INFO_ENABLED"
 )
 
@@ -17,6 +18,12 @@ func IsKubecostMetricsPodEnabled() bool {
 	return env.GetBool(KubecostMetricsPodEnabledEnvVar, false)
 }
 
// IsExportClusterCacheEnabled returns true if the metrics pod should export the
// cluster cache data to a target file location. Defaults to false when
// EXPORT_CLUSTER_CACHE_ENABLED is unset.
func IsExportClusterCacheEnabled() bool {
	return env.GetBool(ExportClusterCacheEnabledEnvVar, false)
}
+
 // IsExportClusterInfoEnabled is set to true if the metrics pod should export its own cluster info
 // data to a target file location
 func IsExportClusterInfoEnabled() bool {