| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175 |
- package agent
- import (
- "context"
- "fmt"
- "net/http"
- "time"
- "github.com/opencost/opencost/core/pkg/clusters"
- "github.com/opencost/opencost/core/pkg/log"
- "github.com/opencost/opencost/core/pkg/source"
- "github.com/opencost/opencost/core/pkg/util/retry"
- "github.com/opencost/opencost/pkg/util/watcher"
- "github.com/opencost/opencost/core/pkg/clustercache"
- "github.com/opencost/opencost/core/pkg/kubeconfig"
- "github.com/opencost/opencost/core/pkg/version"
- "github.com/opencost/opencost/modules/prometheus-source/pkg/prom"
- "github.com/opencost/opencost/pkg/cloud/provider"
- cluster "github.com/opencost/opencost/pkg/clustercache"
- "github.com/opencost/opencost/pkg/config"
- "github.com/opencost/opencost/pkg/costmodel"
- "github.com/opencost/opencost/pkg/env"
- "github.com/opencost/opencost/pkg/metrics"
- "github.com/prometheus/client_golang/prometheus/promhttp"
- "github.com/rs/cors"
- "k8s.io/client-go/kubernetes"
- )
- // AgentOpts contain configuration options that can be passed to the Execute() method
- type AgentOpts struct {
- // Port is the port the agent will bind to
- Port int
- // Stubbed for future configuration
- }
- // ClusterExportInterval is the interval used to export the cluster if env.IsExportClusterCacheEnabled() is true
- const ClusterExportInterval = 5 * time.Minute
- // clusterExporter is used if env.IsExportClusterCacheEnabled() is set to true
- // it will export the kubernetes cluster data to a file on a specific interval
- var clusterExporter *cluster.ClusterExporter
- func Healthz(w http.ResponseWriter, _ *http.Request) {
- w.WriteHeader(200)
- w.Header().Set("Content-Length", "0")
- w.Header().Set("Content-Type", "text/plain")
- }
- // initializes the kubernetes client cache
- func newKubernetesClusterCache() (kubernetes.Interface, clustercache.ClusterCache, error) {
- var err error
- // Kubernetes API setup
- kubeClientset, err := kubeconfig.LoadKubeClient("")
- if err != nil {
- return nil, nil, err
- }
- // Create Kubernetes Cluster Cache + Watchers
- k8sCache := cluster.NewKubernetesClusterCache(kubeClientset)
- k8sCache.Run()
- return kubeClientset, k8sCache, nil
- }
- func Execute(opts *AgentOpts) error {
- log.Infof("Starting Kubecost Agent version %s", version.FriendlyVersion())
- // initialize kubernetes client and cluster cache
- k8sClient, clusterCache, err := newKubernetesClusterCache()
- if err != nil {
- panic(err.Error())
- }
- clusterUID, err := kubeconfig.GetClusterUID(k8sClient)
- if err != nil {
- return fmt.Errorf("error getting cluster UID: %w", err)
- }
- // Create ConfigFileManager for synchronization of shared configuration
- confManager := config.NewConfigFileManager(nil)
- cloudProviderKey := env.GetCloudProviderAPIKey()
- cloudProvider, err := provider.NewProvider(clusterCache, cloudProviderKey, confManager)
- if err != nil {
- panic(err.Error())
- }
- // ClusterInfo Provider to provide the cluster map with local and remote cluster data
- localClusterInfo := costmodel.NewLocalClusterInfoProvider(k8sClient, cloudProvider)
- var clusterInfoProvider clusters.ClusterInfoProvider
- if env.IsExportClusterInfoEnabled() {
- clusterInfoConf := confManager.ConfigFileAt(env.GetClusterInfoFilePath())
- clusterInfoProvider = costmodel.NewClusterInfoWriteOnRequest(localClusterInfo, clusterInfoConf)
- } else {
- clusterInfoProvider = localClusterInfo
- }
- const maxRetries = 10
- const retryInterval = 10 * time.Second
- var fatalErr error
- ctx, cancel := context.WithCancel(context.Background())
- dataSource, err := retry.Retry(
- ctx,
- func() (source.OpenCostDataSource, error) {
- ds, e := prom.NewDefaultPrometheusDataSource(clusterInfoProvider)
- if e != nil {
- if source.IsRetryable(e) {
- return nil, e
- }
- fatalErr = e
- cancel()
- }
- return ds, e
- },
- maxRetries,
- retryInterval,
- )
- if fatalErr != nil {
- log.Fatalf("Failed to create Prometheus data source: %s", fatalErr)
- panic(fatalErr)
- }
- // Append the pricing config watcher
- installNamespace := env.GetOpencostNamespace()
- configWatchers := watcher.NewConfigMapWatchers(k8sClient, installNamespace)
- configWatchers.AddWatcher(provider.ConfigWatcherFor(cloudProvider))
- configWatchers.Watch()
- // Initialize cluster exporting if it's enabled
- if env.IsExportClusterCacheEnabled() {
- cacheLocation := confManager.ConfigFileAt(env.GetClusterCacheFilePath())
- clusterExporter = cluster.NewClusterExporter(clusterCache, cacheLocation, ClusterExportInterval)
- clusterExporter.Run()
- }
- // Initialize ClusterMap for maintaining ClusterInfo by ClusterID
- clusterMap := dataSource.ClusterMap()
- costModel := costmodel.NewCostModel(clusterUID, dataSource, cloudProvider, clusterCache, clusterMap, dataSource.BatchDuration())
- // initialize Kubernetes Metrics Emitter
- metricsEmitter := costmodel.NewCostModelMetricsEmitter(clusterCache, cloudProvider, clusterInfoProvider, costModel)
- // download pricing data
- err = cloudProvider.DownloadPricingData()
- if err != nil {
- log.Errorf("Error downloading pricing data: %s", err)
- }
- // start emitting metrics
- metricsEmitter.Start()
- rootMux := http.NewServeMux()
- rootMux.HandleFunc("/healthz", Healthz)
- rootMux.Handle("/metrics", promhttp.Handler())
- telemetryHandler := metrics.ResponseMetricMiddleware(rootMux)
- handler := cors.AllowAll().Handler(telemetryHandler)
- // Use the port from AgentOpts, or default to the environment variable value
- port := opts.Port
- if port == 0 {
- port = env.GetKubecostMetricsPort()
- }
- return http.ListenAndServe(fmt.Sprintf(":%d", port), handler)
- }
|