agent.go 6.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207
  1. package agent
  2. import (
  3. "context"
  4. "fmt"
  5. "net/http"
  6. "time"
  7. "github.com/kubecost/cost-model/pkg/cloud"
  8. "github.com/kubecost/cost-model/pkg/clustercache"
  9. "github.com/kubecost/cost-model/pkg/config"
  10. "github.com/kubecost/cost-model/pkg/costmodel"
  11. "github.com/kubecost/cost-model/pkg/costmodel/clusters"
  12. "github.com/kubecost/cost-model/pkg/env"
  13. "github.com/kubecost/cost-model/pkg/log"
  14. "github.com/kubecost/cost-model/pkg/prom"
  15. "github.com/kubecost/cost-model/pkg/util/watcher"
  16. prometheus "github.com/prometheus/client_golang/api"
  17. prometheusAPI "github.com/prometheus/client_golang/api/prometheus/v1"
  18. "github.com/prometheus/client_golang/prometheus/promhttp"
  19. metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
  20. "k8s.io/klog"
  21. "github.com/rs/cors"
  22. "k8s.io/client-go/kubernetes"
  23. "k8s.io/client-go/rest"
  24. "k8s.io/client-go/tools/clientcmd"
  25. )
// AgentOpts contains configuration options that can be passed to Execute.
// It currently declares no fields and acts as a placeholder for future settings.
type AgentOpts struct {
	// Stubbed for future configuration
}
// ClusterExportInterval is the interval (five minutes) used to export the
// cluster when env.IsExportClusterCacheEnabled() is true.
const ClusterExportInterval = 5 * time.Minute
// clusterExporter is used only if env.IsExportClusterCacheEnabled() returns true.
// In that case it exports the Kubernetes cluster data to a file on a fixed
// interval (ClusterExportInterval). It is kept at package level so the exporter
// remains referenced after Execute has set it up.
var clusterExporter *clustercache.ClusterExporter
  35. func Healthz(w http.ResponseWriter, _ *http.Request) {
  36. w.WriteHeader(200)
  37. w.Header().Set("Content-Length", "0")
  38. w.Header().Set("Content-Type", "text/plain")
  39. }
  40. // initializes the kubernetes client cache
  41. func newKubernetesClusterCache() (kubernetes.Interface, clustercache.ClusterCache, error) {
  42. var err error
  43. // Kubernetes API setup
  44. var kc *rest.Config
  45. if kubeconfig := env.GetKubeConfigPath(); kubeconfig != "" {
  46. kc, err = clientcmd.BuildConfigFromFlags("", kubeconfig)
  47. } else {
  48. kc, err = rest.InClusterConfig()
  49. }
  50. if err != nil {
  51. return nil, nil, err
  52. }
  53. kubeClientset, err := kubernetes.NewForConfig(kc)
  54. if err != nil {
  55. return nil, nil, err
  56. }
  57. // Create Kubernetes Cluster Cache + Watchers
  58. k8sCache := clustercache.NewKubernetesClusterCache(kubeClientset)
  59. k8sCache.Run()
  60. return kubeClientset, k8sCache, nil
  61. }
  62. func newPrometheusClient() (prometheus.Client, error) {
  63. address := env.GetPrometheusServerEndpoint()
  64. if address == "" {
  65. return nil, fmt.Errorf("No address for prometheus set in $%s. Aborting.", env.PrometheusServerEndpointEnvVar)
  66. }
  67. queryConcurrency := env.GetMaxQueryConcurrency()
  68. log.Infof("Prometheus Client Max Concurrency set to %d", queryConcurrency)
  69. timeout := 120 * time.Second
  70. keepAlive := 120 * time.Second
  71. promCli, err := prom.NewPrometheusClient(address, timeout, keepAlive, queryConcurrency, "")
  72. if err != nil {
  73. return nil, fmt.Errorf("Failed to create prometheus client, Error: %v", err)
  74. }
  75. m, err := prom.Validate(promCli)
  76. if err != nil || !m.Running {
  77. if err != nil {
  78. log.Errorf("Failed to query prometheus at %s. Error: %s . Troubleshooting help available at: %s", address, err.Error(), prom.PrometheusTroubleshootingURL)
  79. } else if !m.Running {
  80. log.Errorf("Prometheus at %s is not running. Troubleshooting help available at: %s", address, prom.PrometheusTroubleshootingURL)
  81. }
  82. } else {
  83. log.Infof("Success: retrieved the 'up' query against prometheus at: %s", address)
  84. }
  85. api := prometheusAPI.NewAPI(promCli)
  86. _, err = api.Config(context.Background())
  87. if err != nil {
  88. log.Infof("No valid prometheus config file at %s. Error: %s . Troubleshooting help available at: %s. Ignore if using cortex/thanos here.", address, err.Error(), prom.PrometheusTroubleshootingURL)
  89. } else {
  90. log.Infof("Retrieved a prometheus config file from: %s", address)
  91. }
  92. return promCli, nil
  93. }
  94. func Execute(opts *AgentOpts) error {
  95. log.Infof("Starting Kubecost Agent")
  96. configWatchers := watcher.NewConfigMapWatchers()
  97. scrapeInterval := time.Minute
  98. promCli, err := newPrometheusClient()
  99. if err != nil {
  100. panic(err.Error())
  101. }
  102. // Lookup scrape interval for kubecost job, update if found
  103. si, err := prom.ScrapeIntervalFor(promCli, env.GetKubecostJobName())
  104. if err == nil {
  105. scrapeInterval = si
  106. }
  107. klog.Infof("Using scrape interval of %f", scrapeInterval.Seconds())
  108. // initialize kubernetes client and cluster cache
  109. k8sClient, clusterCache, err := newKubernetesClusterCache()
  110. if err != nil {
  111. panic(err.Error())
  112. }
  113. // Create ConfigFileManager for synchronization of shared configuration
  114. confManager := config.NewConfigFileManager(&config.ConfigFileManagerOpts{
  115. BucketStoreConfig: env.GetKubecostConfigBucket(),
  116. LocalConfigPath: "/",
  117. })
  118. cloudProviderKey := env.GetCloudProviderAPIKey()
  119. cloudProvider, err := cloud.NewProvider(clusterCache, cloudProviderKey, confManager)
  120. if err != nil {
  121. panic(err.Error())
  122. }
  123. // Append the pricing config watcher
  124. configWatchers.AddWatcher(cloud.ConfigWatcherFor(cloudProvider))
  125. watchConfigFunc := configWatchers.ToWatchFunc()
  126. watchedConfigs := configWatchers.GetWatchedConfigs()
  127. kubecostNamespace := env.GetKubecostNamespace()
  128. // We need an initial invocation because the init of the cache has happened before we had access to the provider.
  129. for _, cw := range watchedConfigs {
  130. configs, err := k8sClient.CoreV1().ConfigMaps(kubecostNamespace).Get(context.Background(), cw, metav1.GetOptions{})
  131. if err != nil {
  132. klog.Infof("No %s configmap found at install time, using existing configs: %s", cw, err.Error())
  133. } else {
  134. watchConfigFunc(configs)
  135. }
  136. }
  137. clusterCache.SetConfigMapUpdateFunc(watchConfigFunc)
  138. // Initialize cluster exporting if it's enabled
  139. if env.IsExportClusterCacheEnabled() {
  140. cacheLocation := confManager.ConfigFileAt("/var/configs/cluster-cache.json")
  141. clusterExporter = clustercache.NewClusterExporter(clusterCache, cacheLocation, ClusterExportInterval)
  142. clusterExporter.Run()
  143. }
  144. // ClusterInfo Provider to provide the cluster map with local and remote cluster data
  145. clusterInfoConf := confManager.ConfigFileAt("/var/configs/cluster-info.json")
  146. localClusterInfo := costmodel.NewLocalClusterInfoProvider(k8sClient, cloudProvider)
  147. clusterInfoProvider := costmodel.NewClusterInfoWriteOnRequest(localClusterInfo, clusterInfoConf)
  148. // Initialize ClusterMap for maintaining ClusterInfo by ClusterID
  149. clusterMap := clusters.NewClusterMap(promCli, clusterInfoProvider, 5*time.Minute)
  150. costModel := costmodel.NewCostModel(promCli, cloudProvider, clusterCache, clusterMap, scrapeInterval)
  151. // initialize Kubernetes Metrics Emitter
  152. metricsEmitter := costmodel.NewCostModelMetricsEmitter(promCli, clusterCache, cloudProvider, clusterInfoProvider, costModel)
  153. // download pricing data
  154. err = cloudProvider.DownloadPricingData()
  155. if err != nil {
  156. klog.Errorf("Error downloading pricing data: %s", err)
  157. }
  158. // start emitting metrics
  159. metricsEmitter.Start()
  160. rootMux := http.NewServeMux()
  161. rootMux.HandleFunc("/healthz", Healthz)
  162. rootMux.Handle("/metrics", promhttp.Handler())
  163. handler := cors.AllowAll().Handler(rootMux)
  164. return http.ListenAndServe(fmt.Sprintf(":%d", env.GetKubecostMetricsPort()), handler)
  165. }