浏览代码

delete cluster importer/exporter

Signed-off-by: r2k1 <yokree@gmail.com>
r2k1 2 年之前
父节点
当前提交
ba0f5357b4

+ 0 - 113
pkg/clustercache/clusterexporter.go

@@ -1,113 +0,0 @@
-package clustercache
-
-import (
-	"time"
-
-	"github.com/opencost/opencost/core/pkg/log"
-	"github.com/opencost/opencost/core/pkg/util/atomic"
-	"github.com/opencost/opencost/core/pkg/util/json"
-	"github.com/opencost/opencost/pkg/config"
-
-	appsv1 "k8s.io/api/apps/v1"
-	batchv1 "k8s.io/api/batch/v1"
-	v1 "k8s.io/api/core/v1"
-	policyv1 "k8s.io/api/policy/v1"
-	stv1 "k8s.io/api/storage/v1"
-)
-
-// clusterEncoding is used to represent the cluster objects in the encoded states.
-type clusterEncoding struct {
-	Namespaces             []*v1.Namespace                 `json:"namespaces,omitempty"`
-	Nodes                  []*v1.Node                      `json:"nodes,omitempty"`
-	Pods                   []*v1.Pod                       `json:"pods,omitempty"`
-	Services               []*v1.Service                   `json:"services,omitempty"`
-	DaemonSets             []*appsv1.DaemonSet             `json:"daemonSets,omitempty"`
-	Deployments            []*appsv1.Deployment            `json:"deployments,omitempty"`
-	StatefulSets           []*appsv1.StatefulSet           `json:"statefulSets,omitempty"`
-	ReplicaSets            []*appsv1.ReplicaSet            `json:"replicaSets,omitempty"`
-	PersistentVolumes      []*v1.PersistentVolume          `json:"persistentVolumes,omitempty"`
-	PersistentVolumeClaims []*v1.PersistentVolumeClaim     `json:"persistentVolumeClaims,omitempty"`
-	StorageClasses         []*stv1.StorageClass            `json:"storageClasses,omitempty"`
-	Jobs                   []*batchv1.Job                  `json:"jobs,omitempty"`
-	PodDisruptionBudgets   []*policyv1.PodDisruptionBudget `json:"podDisruptionBudgets,omitempty"`
-	ReplicationControllers []*v1.ReplicationController     `json:"replicationController,omitempty"`
-}
-
-// ClusterExporter manages and runs an file export process which dumps the local kubernetes cluster to a target location.
-type ClusterExporter struct {
-	cluster  ClusterCache
-	target   *config.ConfigFile
-	interval time.Duration
-	runState atomic.AtomicRunState
-}
-
-// NewClusterExporter creates a new ClusterExporter instance for exporting the kubernetes cluster.
-func NewClusterExporter(cluster ClusterCache, target *config.ConfigFile, interval time.Duration) *ClusterExporter {
-	return &ClusterExporter{
-		cluster:  cluster,
-		target:   target,
-		interval: interval,
-	}
-}
-
-// Run starts the automated process of running Export on a specific interval.
-func (ce *ClusterExporter) Run() {
-	// in the event there is a race that occurs between Run() and Stop(), we
-	// ensure that we wait for the reset to occur before starting again
-	ce.runState.WaitForReset()
-
-	if !ce.runState.Start() {
-		log.Warnf("ClusterExporter already running")
-		return
-	}
-
-	go func() {
-		for {
-			err := ce.Export()
-			if err != nil {
-				log.Warnf("Failed to export cluster: %s", err)
-			}
-
-			select {
-			case <-time.After(ce.interval):
-			case <-ce.runState.OnStop():
-				ce.runState.Reset()
-				return
-			}
-		}
-	}()
-}
-
-// Stop halts the Cluster export on an interval
-func (ce *ClusterExporter) Stop() {
-	ce.runState.Stop()
-}
-
-// Export stores the cluster cache data into a PODO, marshals as JSON, and saves it to the
-// target location.
-func (ce *ClusterExporter) Export() error {
-	c := ce.cluster
-	encoding := &clusterEncoding{
-		Namespaces:             c.GetAllNamespaces(),
-		Nodes:                  c.GetAllNodes(),
-		Pods:                   c.GetAllPods(),
-		Services:               c.GetAllServices(),
-		DaemonSets:             c.GetAllDaemonSets(),
-		Deployments:            c.GetAllDeployments(),
-		StatefulSets:           c.GetAllStatefulSets(),
-		ReplicaSets:            c.GetAllReplicaSets(),
-		PersistentVolumes:      c.GetAllPersistentVolumes(),
-		PersistentVolumeClaims: c.GetAllPersistentVolumeClaims(),
-		StorageClasses:         c.GetAllStorageClasses(),
-		Jobs:                   c.GetAllJobs(),
-		PodDisruptionBudgets:   c.GetAllPodDisruptionBudgets(),
-		ReplicationControllers: c.GetAllReplicationControllers(),
-	}
-
-	data, err := json.Marshal(encoding)
-	if err != nil {
-		return err
-	}
-
-	return ce.target.Write(data)
-}

+ 0 - 307
pkg/clustercache/clusterimporter.go

@@ -1,307 +0,0 @@
-package clustercache
-
-import (
-	"sync"
-
-	"github.com/opencost/opencost/core/pkg/log"
-	"github.com/opencost/opencost/core/pkg/util/json"
-	"github.com/opencost/opencost/pkg/config"
-	appsv1 "k8s.io/api/apps/v1"
-	batchv1 "k8s.io/api/batch/v1"
-	v1 "k8s.io/api/core/v1"
-	policyv1 "k8s.io/api/policy/v1"
-	stv1 "k8s.io/api/storage/v1"
-)
-
-// ClusterImporter is an implementation of ClusterCache which leverages a backing configuration file
-// as it's source of the cluster data.
-type ClusterImporter struct {
-	source          *config.ConfigFile
-	sourceHandlerID config.HandlerID
-	dataLock        *sync.Mutex
-	data            *clusterEncoding
-}
-
-// Creates a new ClusterCache implementation which uses an import process to provide cluster data
-func NewClusterImporter(source *config.ConfigFile) ClusterCache {
-	return &ClusterImporter{
-		source:   source,
-		dataLock: new(sync.Mutex),
-		data:     new(clusterEncoding),
-	}
-}
-
-// onImportSourceChanged handles the source data updating
-func (ci *ClusterImporter) onImportSourceChanged(changeType config.ChangeType, data []byte) {
-	if changeType == config.ChangeTypeDeleted {
-		ci.dataLock.Lock()
-		ci.data = new(clusterEncoding)
-		ci.dataLock.Unlock()
-		return
-	}
-
-	ci.update(data)
-}
-
-// update replaces the underlying cluster data with the provided new data if it decodes
-func (ci *ClusterImporter) update(data []byte) {
-	ce := new(clusterEncoding)
-	err := json.Unmarshal(data, ce)
-	if err != nil {
-		log.Warnf("Failed to unmarshal cluster during import: %s", err)
-		return
-	}
-
-	ci.dataLock.Lock()
-	ci.data = ce
-	ci.dataLock.Unlock()
-}
-
-// Run starts the watcher processes
-func (ci *ClusterImporter) Run() {
-	if ci.source == nil {
-		log.Errorf("ClusterImporter source does not exist, not running")
-		return
-	}
-
-	exists, err := ci.source.Exists()
-	if err != nil {
-		log.Errorf("Failed to import source for cluster: %s", err)
-		return
-	}
-
-	if exists {
-		data, err := ci.source.Read()
-		if err != nil {
-			log.Warnf("Failed to import cluster: %s", err)
-		} else {
-			ci.update(data)
-		}
-	}
-
-	ci.sourceHandlerID = ci.source.AddChangeHandler(ci.onImportSourceChanged)
-}
-
-// Stops the watcher processes
-func (ci *ClusterImporter) Stop() {
-	if ci.sourceHandlerID != "" {
-		ci.source.RemoveChangeHandler(ci.sourceHandlerID)
-		ci.sourceHandlerID = ""
-	}
-}
-
-// GetAllNamespaces returns all the cached namespaces
-func (ci *ClusterImporter) GetAllNamespaces() []*v1.Namespace {
-	ci.dataLock.Lock()
-	defer ci.dataLock.Unlock()
-
-	// Deep copy here to avoid callers from corrupting the cache
-	// This also mimics the behavior of the default cluster cache impl.
-	namespaces := ci.data.Namespaces
-	cloneList := make([]*v1.Namespace, 0, len(namespaces))
-	for _, v := range namespaces {
-		cloneList = append(cloneList, v.DeepCopy())
-	}
-	return cloneList
-}
-
-// GetAllNodes returns all the cached nodes
-func (ci *ClusterImporter) GetAllNodes() []*v1.Node {
-	ci.dataLock.Lock()
-	defer ci.dataLock.Unlock()
-
-	// Deep copy here to avoid callers from corrupting the cache
-	// This also mimics the behavior of the default cluster cache impl.
-	nodes := ci.data.Nodes
-	cloneList := make([]*v1.Node, 0, len(nodes))
-	for _, v := range nodes {
-		cloneList = append(cloneList, v.DeepCopy())
-	}
-	return cloneList
-}
-
-// GetAllPods returns all the cached pods
-func (ci *ClusterImporter) GetAllPods() []*v1.Pod {
-	ci.dataLock.Lock()
-	defer ci.dataLock.Unlock()
-
-	// Deep copy here to avoid callers from corrupting the cache
-	// This also mimics the behavior of the default cluster cache impl.
-	pods := ci.data.Pods
-	cloneList := make([]*v1.Pod, 0, len(pods))
-	for _, v := range pods {
-		cloneList = append(cloneList, v.DeepCopy())
-	}
-	return cloneList
-}
-
-// GetAllServices returns all the cached services
-func (ci *ClusterImporter) GetAllServices() []*v1.Service {
-	ci.dataLock.Lock()
-	defer ci.dataLock.Unlock()
-
-	// Deep copy here to avoid callers from corrupting the cache
-	// This also mimics the behavior of the default cluster cache impl.
-	services := ci.data.Services
-	cloneList := make([]*v1.Service, 0, len(services))
-	for _, v := range services {
-		cloneList = append(cloneList, v.DeepCopy())
-	}
-	return cloneList
-}
-
-// GetAllDaemonSets returns all the cached DaemonSets
-func (ci *ClusterImporter) GetAllDaemonSets() []*appsv1.DaemonSet {
-	ci.dataLock.Lock()
-	defer ci.dataLock.Unlock()
-
-	// Deep copy here to avoid callers from corrupting the cache
-	// This also mimics the behavior of the default cluster cache impl.
-	daemonSets := ci.data.DaemonSets
-	cloneList := make([]*appsv1.DaemonSet, 0, len(daemonSets))
-	for _, v := range daemonSets {
-		cloneList = append(cloneList, v.DeepCopy())
-	}
-	return cloneList
-}
-
-// GetAllDeployments returns all the cached deployments
-func (ci *ClusterImporter) GetAllDeployments() []*appsv1.Deployment {
-	ci.dataLock.Lock()
-	defer ci.dataLock.Unlock()
-
-	// Deep copy here to avoid callers from corrupting the cache
-	// This also mimics the behavior of the default cluster cache impl.
-	deployments := ci.data.Deployments
-	cloneList := make([]*appsv1.Deployment, 0, len(deployments))
-	for _, v := range deployments {
-		cloneList = append(cloneList, v.DeepCopy())
-	}
-	return cloneList
-}
-
-// GetAllStatfulSets returns all the cached StatefulSets
-func (ci *ClusterImporter) GetAllStatefulSets() []*appsv1.StatefulSet {
-	ci.dataLock.Lock()
-	defer ci.dataLock.Unlock()
-
-	// Deep copy here to avoid callers from corrupting the cache
-	// This also mimics the behavior of the default cluster cache impl.
-	statefulSets := ci.data.StatefulSets
-	cloneList := make([]*appsv1.StatefulSet, 0, len(statefulSets))
-	for _, v := range statefulSets {
-		cloneList = append(cloneList, v.DeepCopy())
-	}
-	return cloneList
-}
-
-// GetAllReplicaSets returns all the cached ReplicaSets
-func (ci *ClusterImporter) GetAllReplicaSets() []*appsv1.ReplicaSet {
-	ci.dataLock.Lock()
-	defer ci.dataLock.Unlock()
-
-	// Deep copy here to avoid callers from corrupting the cache
-	// This also mimics the behavior of the default cluster cache impl.
-	replicaSets := ci.data.ReplicaSets
-	cloneList := make([]*appsv1.ReplicaSet, 0, len(replicaSets))
-	for _, v := range replicaSets {
-		cloneList = append(cloneList, v.DeepCopy())
-	}
-	return cloneList
-}
-
-// GetAllPersistentVolumes returns all the cached persistent volumes
-func (ci *ClusterImporter) GetAllPersistentVolumes() []*v1.PersistentVolume {
-	ci.dataLock.Lock()
-	defer ci.dataLock.Unlock()
-
-	// Deep copy here to avoid callers from corrupting the cache
-	// This also mimics the behavior of the default cluster cache impl.
-	pvs := ci.data.PersistentVolumes
-	cloneList := make([]*v1.PersistentVolume, 0, len(pvs))
-	for _, v := range pvs {
-		cloneList = append(cloneList, v.DeepCopy())
-	}
-	return cloneList
-}
-
-// GetAllPersistentVolumeClaims returns all the cached persistent volume claims
-func (ci *ClusterImporter) GetAllPersistentVolumeClaims() []*v1.PersistentVolumeClaim {
-	ci.dataLock.Lock()
-	defer ci.dataLock.Unlock()
-
-	// Deep copy here to avoid callers from corrupting the cache
-	// This also mimics the behavior of the default cluster cache impl.
-	pvcs := ci.data.PersistentVolumeClaims
-	cloneList := make([]*v1.PersistentVolumeClaim, 0, len(pvcs))
-	for _, v := range pvcs {
-		cloneList = append(cloneList, v.DeepCopy())
-	}
-	return cloneList
-}
-
-// GetAllStorageClasses returns all the cached storage classes
-func (ci *ClusterImporter) GetAllStorageClasses() []*stv1.StorageClass {
-	ci.dataLock.Lock()
-	defer ci.dataLock.Unlock()
-
-	// Deep copy here to avoid callers from corrupting the cache
-	// This also mimics the behavior of the default cluster cache impl.
-	storageClasses := ci.data.StorageClasses
-	cloneList := make([]*stv1.StorageClass, 0, len(storageClasses))
-	for _, v := range storageClasses {
-		cloneList = append(cloneList, v.DeepCopy())
-	}
-	return cloneList
-}
-
-// GetAllJobs returns all the cached jobs
-func (ci *ClusterImporter) GetAllJobs() []*batchv1.Job {
-	ci.dataLock.Lock()
-	defer ci.dataLock.Unlock()
-
-	// Deep copy here to avoid callers from corrupting the cache
-	// This also mimics the behavior of the default cluster cache impl.
-	jobs := ci.data.Jobs
-	cloneList := make([]*batchv1.Job, 0, len(jobs))
-	for _, v := range jobs {
-		cloneList = append(cloneList, v.DeepCopy())
-	}
-	return cloneList
-}
-
-// GetAllPodDisruptionBudgets returns all cached pod disruption budgets
-func (ci *ClusterImporter) GetAllPodDisruptionBudgets() []*policyv1.PodDisruptionBudget {
-	ci.dataLock.Lock()
-	defer ci.dataLock.Unlock()
-
-	// Deep copy here to avoid callers from corrupting the cache
-	// This also mimics the behavior of the default cluster cache impl.
-	pdbs := ci.data.PodDisruptionBudgets
-	cloneList := make([]*policyv1.PodDisruptionBudget, 0, len(pdbs))
-	for _, v := range pdbs {
-		cloneList = append(cloneList, v.DeepCopy())
-	}
-	return cloneList
-}
-
-func (ci *ClusterImporter) GetAllReplicationControllers() []*v1.ReplicationController {
-	ci.dataLock.Lock()
-	defer ci.dataLock.Unlock()
-
-	// Deep copy here to avoid callers from corrupting the cache
-	// This also mimics the behavior of the default cluster cache impl.
-	rcs := ci.data.ReplicationControllers
-	cloneList := make([]*v1.ReplicationController, 0, len(rcs))
-	for _, v := range rcs {
-		cloneList = append(cloneList, v.DeepCopy())
-	}
-	return cloneList
-}
-
-// SetConfigMapUpdateFunc sets the configmap update function
-func (ci *ClusterImporter) SetConfigMapUpdateFunc(_ func(interface{})) {
-	// TODO: (bolt) This function is still a bit strange to me for the ClusterCache interface.
-	// TODO: (bolt) no-op for now.
-	log.Warnf("SetConfigMapUpdateFunc is disabled for imported cluster data.")
-}

+ 0 - 11
pkg/cmd/agent/agent.go

@@ -39,10 +39,6 @@ type AgentOpts struct {
 // ClusterExportInterval is the interval used to export the cluster if env.IsExportClusterCacheEnabled() is true
 const ClusterExportInterval = 5 * time.Minute
 
-// clusterExporter is used if env.IsExportClusterCacheEnabled() is set to true
-// it will export the kubernetes cluster data to a file on a specific interval
-var clusterExporter *clustercache.ClusterExporter
-
 func Healthz(w http.ResponseWriter, _ *http.Request) {
 	w.WriteHeader(200)
 	w.Header().Set("Content-Length", "0")
@@ -188,13 +184,6 @@ func Execute(opts *AgentOpts) error {
 
 	configPrefix := env.GetConfigPathWithDefault(env.DefaultConfigMountPath)
 
-	// Initialize cluster exporting if it's enabled
-	if env.IsExportClusterCacheEnabled() {
-		cacheLocation := confManager.ConfigFileAt(path.Join(configPrefix, "cluster-cache.json"))
-		clusterExporter = clustercache.NewClusterExporter(clusterCache, cacheLocation, ClusterExportInterval)
-		clusterExporter.Run()
-	}
-
 	// ClusterInfo Provider to provide the cluster map with local and remote cluster data
 	localClusterInfo := costmodel.NewLocalClusterInfoProvider(k8sClient, cloudProvider)
 

+ 1 - 7
pkg/costmodel/router.go

@@ -1602,13 +1602,7 @@ func Initialize(additionalConfigWatchers ...*watcher.ConfigMapWatcher) *Accesses
 	configPrefix := env.GetConfigPathWithDefault("/var/configs/")
 
 	// Create Kubernetes Cluster Cache + Watchers
-	var k8sCache clustercache.ClusterCache
-	if env.IsClusterCacheFileEnabled() {
-		importLocation := confManager.ConfigFileAt(path.Join(configPrefix, "cluster-cache.json"))
-		k8sCache = clustercache.NewClusterImporter(importLocation)
-	} else {
-		k8sCache = clustercache.NewKubernetesClusterCache(kubeClientset)
-	}
+	k8sCache := clustercache.NewKubernetesClusterCache(kubeClientset)
 	k8sCache.Run()
 
 	cloudProviderKey := env.GetCloudProviderAPIKey()

+ 2 - 9
pkg/env/costmodelenv.go

@@ -89,9 +89,8 @@ const (
 	MetricsConfigmapName  = "METRICS_CONFIGMAP_NAME"
 	KubecostJobNameEnvVar = "KUBECOST_JOB_NAME"
 
-	KubecostConfigBucketEnvVar    = "KUBECOST_CONFIG_BUCKET"
-	ClusterInfoFileEnabledEnvVar  = "CLUSTER_INFO_FILE_ENABLED"
-	ClusterCacheFileEnabledEnvVar = "CLUSTER_CACHE_FILE_ENABLED"
+	KubecostConfigBucketEnvVar   = "KUBECOST_CONFIG_BUCKET"
+	ClusterInfoFileEnabledEnvVar = "CLUSTER_INFO_FILE_ENABLED"
 
 	PrometheusQueryOffsetEnvVar                 = "PROMETHEUS_QUERY_OFFSET"
 	PrometheusRetryOnRateLimitResponseEnvVar    = "PROMETHEUS_RETRY_ON_RATE_LIMIT"
@@ -186,12 +185,6 @@ func IsClusterInfoFileEnabled() bool {
 	return env.GetBool(ClusterInfoFileEnabledEnvVar, false)
 }
 
-// IsClusterCacheFileEnabled returns true if the kubernetes cluster data is read from a file or pulled from the local
-// kubernetes API.
-func IsClusterCacheFileEnabled() bool {
-	return env.GetBool(ClusterCacheFileEnabledEnvVar, false)
-}
-
 // IsPrometheusRetryOnRateLimitResponse will attempt to retry if a 429 response is received OR a 400 with a body containing
 // ThrottleException (common in AWS services like AMP)
 func IsPrometheusRetryOnRateLimitResponse() bool {

+ 0 - 7
pkg/env/kubemetricsenv.go

@@ -5,7 +5,6 @@ import "github.com/opencost/opencost/core/pkg/env"
 const (
 	KubecostMetricsPodEnabledEnvVar = "KUBECOST_METRICS_POD_ENABLED"
 	KubecostMetricsPodPortEnvVar    = "KUBECOST_METRICS_PORT"
-	ExportClusterCacheEnabledEnvVar = "EXPORT_CLUSTER_CACHE_ENABLED"
 	ExportClusterInfoEnabledEnvVar  = "EXPORT_CLUSTER_INFO_ENABLED"
 )
 
@@ -18,12 +17,6 @@ func IsKubecostMetricsPodEnabled() bool {
 	return env.GetBool(KubecostMetricsPodEnabledEnvVar, false)
 }
 
-// IsExportClusterCacheEnabled is set to true if the metrics pod should export the cluster cache
-// data to a target file location
-func IsExportClusterCacheEnabled() bool {
-	return env.GetBool(ExportClusterCacheEnabledEnvVar, false)
-}
-
 // IsExportClusterInfoEnabled is set to true if the metrics pod should export its own cluster info
 // data to a target file location
 func IsExportClusterInfoEnabled() bool {