agent.go 7.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216
  1. package agent
  2. import (
  3. "context"
  4. "fmt"
  5. "net/http"
  6. "path"
  7. "time"
  8. "github.com/opencost/opencost/core/pkg/clusters"
  9. "github.com/opencost/opencost/core/pkg/log"
  10. "github.com/opencost/opencost/pkg/util/watcher"
  11. "github.com/opencost/opencost/core/pkg/version"
  12. "github.com/opencost/opencost/pkg/cloud/provider"
  13. "github.com/opencost/opencost/pkg/clustercache"
  14. "github.com/opencost/opencost/pkg/config"
  15. "github.com/opencost/opencost/pkg/costmodel"
  16. clustermap "github.com/opencost/opencost/pkg/costmodel/clusters"
  17. "github.com/opencost/opencost/pkg/env"
  18. "github.com/opencost/opencost/pkg/kubeconfig"
  19. "github.com/opencost/opencost/pkg/metrics"
  20. "github.com/opencost/opencost/pkg/prom"
  21. prometheus "github.com/prometheus/client_golang/api"
  22. prometheusAPI "github.com/prometheus/client_golang/api/prometheus/v1"
  23. "github.com/prometheus/client_golang/prometheus/promhttp"
  24. "github.com/rs/cors"
  25. "k8s.io/client-go/kubernetes"
  26. )
  // AgentOpts holds the configuration options accepted by Execute. It has no
  // fields yet; it is a placeholder so options can be added later without
  // changing the signature of Execute.
  type AgentOpts struct{}
  // ClusterExportInterval is how often the cluster cache is exported when
  // env.IsExportClusterCacheEnabled() reports true.
  const ClusterExportInterval time.Duration = 300 * time.Second
  33. // clusterExporter is used if env.IsExportClusterCacheEnabled() is set to true
  34. // it will export the kubernetes cluster data to a file on a specific interval
  35. var clusterExporter *clustercache.ClusterExporter
  // Healthz is the health-check HTTP handler. It replies with status 200 and
  // the headers "Content-Length: 0" and "Content-Type: text/plain"; no body is
  // written and the request itself is ignored.
  func Healthz(w http.ResponseWriter, _ *http.Request) {
  	// Headers must be set before WriteHeader: net/http ignores changes to the
  	// header map once the status code has been written.
  	header := w.Header()
  	header.Set("Content-Length", "0")
  	header.Set("Content-Type", "text/plain")
  	w.WriteHeader(http.StatusOK)
  }
  41. // initializes the kubernetes client cache
  42. func newKubernetesClusterCache() (kubernetes.Interface, clustercache.ClusterCache, error) {
  43. var err error
  44. // Kubernetes API setup
  45. kubeClientset, err := kubeconfig.LoadKubeClient("")
  46. if err != nil {
  47. return nil, nil, err
  48. }
  49. // Create Kubernetes Cluster Cache + Watchers
  50. k8sCache := clustercache.NewKubernetesClusterCache(kubeClientset)
  51. k8sCache.Run()
  52. return kubeClientset, k8sCache, nil
  53. }
  54. func newPrometheusClient() (prometheus.Client, error) {
  55. address := env.GetPrometheusServerEndpoint()
  56. if address == "" {
  57. return nil, fmt.Errorf("No address for prometheus set in $%s. Aborting.", env.PrometheusServerEndpointEnvVar)
  58. }
  59. queryConcurrency := env.GetMaxQueryConcurrency()
  60. log.Infof("Prometheus Client Max Concurrency set to %d", queryConcurrency)
  61. timeout := 120 * time.Second
  62. keepAlive := 120 * time.Second
  63. tlsHandshakeTimeout := 10 * time.Second
  64. var rateLimitRetryOpts *prom.RateLimitRetryOpts = nil
  65. if env.IsPrometheusRetryOnRateLimitResponse() {
  66. rateLimitRetryOpts = &prom.RateLimitRetryOpts{
  67. MaxRetries: env.GetPrometheusRetryOnRateLimitMaxRetries(),
  68. DefaultRetryWait: env.GetPrometheusRetryOnRateLimitDefaultWait(),
  69. }
  70. }
  71. promCli, err := prom.NewPrometheusClient(address, &prom.PrometheusClientConfig{
  72. Timeout: timeout,
  73. KeepAlive: keepAlive,
  74. TLSHandshakeTimeout: tlsHandshakeTimeout,
  75. TLSInsecureSkipVerify: env.GetInsecureSkipVerify(),
  76. RateLimitRetryOpts: rateLimitRetryOpts,
  77. Auth: &prom.ClientAuth{
  78. Username: env.GetDBBasicAuthUsername(),
  79. Password: env.GetDBBasicAuthUserPassword(),
  80. BearerToken: env.GetDBBearerToken(),
  81. },
  82. QueryConcurrency: queryConcurrency,
  83. QueryLogFile: "",
  84. })
  85. if err != nil {
  86. return nil, fmt.Errorf("Failed to create prometheus client, Error: %v", err)
  87. }
  88. m, err := prom.Validate(promCli)
  89. if err != nil || !m.Running {
  90. if err != nil {
  91. log.Errorf("Failed to query prometheus at %s. Error: %s . Troubleshooting help available at: %s", address, err.Error(), prom.PrometheusTroubleshootingURL)
  92. } else if !m.Running {
  93. log.Errorf("Prometheus at %s is not running. Troubleshooting help available at: %s", address, prom.PrometheusTroubleshootingURL)
  94. }
  95. } else {
  96. log.Infof("Success: retrieved the 'up' query against prometheus at: %s", address)
  97. }
  98. api := prometheusAPI.NewAPI(promCli)
  99. _, err = api.Buildinfo(context.Background())
  100. if err != nil {
  101. log.Infof("No valid prometheus config file at %s. Error: %s . Troubleshooting help available at: %s. Ignore if using cortex/mimir/thanos here.", address, err.Error(), prom.PrometheusTroubleshootingURL)
  102. } else {
  103. log.Infof("Retrieved a prometheus config file from: %s", address)
  104. }
  105. return promCli, nil
  106. }
  107. func Execute(opts *AgentOpts) error {
  108. log.Infof("Starting Kubecost Agent version %s", version.FriendlyVersion())
  109. scrapeInterval := env.GetKubecostScrapeInterval()
  110. promCli, err := newPrometheusClient()
  111. if err != nil {
  112. panic(err.Error())
  113. }
  114. if scrapeInterval == 0 {
  115. scrapeInterval = time.Minute
  116. // Lookup scrape interval for kubecost job, update if found
  117. si, err := prom.ScrapeIntervalFor(promCli, env.GetKubecostJobName())
  118. if err == nil {
  119. scrapeInterval = si
  120. }
  121. }
  122. log.Infof("Using scrape interval of %f", scrapeInterval.Seconds())
  123. // initialize kubernetes client and cluster cache
  124. k8sClient, clusterCache, err := newKubernetesClusterCache()
  125. if err != nil {
  126. panic(err.Error())
  127. }
  128. // Create ConfigFileManager for synchronization of shared configuration
  129. confManager := config.NewConfigFileManager(&config.ConfigFileManagerOpts{
  130. BucketStoreConfig: env.GetKubecostConfigBucket(),
  131. LocalConfigPath: "/",
  132. })
  133. cloudProviderKey := env.GetCloudProviderAPIKey()
  134. cloudProvider, err := provider.NewProvider(clusterCache, cloudProviderKey, confManager)
  135. if err != nil {
  136. panic(err.Error())
  137. }
  138. // Append the pricing config watcher
  139. kubecostNamespace := env.GetKubecostNamespace()
  140. configWatchers := watcher.NewConfigMapWatchers(k8sClient, kubecostNamespace)
  141. configWatchers.AddWatcher(provider.ConfigWatcherFor(cloudProvider))
  142. configWatchers.Watch()
  143. configPrefix := env.GetConfigPathWithDefault(env.DefaultConfigMountPath)
  144. // Initialize cluster exporting if it's enabled
  145. if env.IsExportClusterCacheEnabled() {
  146. cacheLocation := confManager.ConfigFileAt(path.Join(configPrefix, "cluster-cache.json"))
  147. clusterExporter = clustercache.NewClusterExporter(clusterCache, cacheLocation, ClusterExportInterval)
  148. clusterExporter.Run()
  149. }
  150. // ClusterInfo Provider to provide the cluster map with local and remote cluster data
  151. localClusterInfo := costmodel.NewLocalClusterInfoProvider(k8sClient, cloudProvider)
  152. var clusterInfoProvider clusters.ClusterInfoProvider
  153. if env.IsExportClusterInfoEnabled() {
  154. clusterInfoConf := confManager.ConfigFileAt(path.Join(configPrefix, "cluster-info.json"))
  155. clusterInfoProvider = costmodel.NewClusterInfoWriteOnRequest(localClusterInfo, clusterInfoConf)
  156. } else {
  157. clusterInfoProvider = localClusterInfo
  158. }
  159. // Initialize ClusterMap for maintaining ClusterInfo by ClusterID
  160. clusterMap := clustermap.NewClusterMap(promCli, clusterInfoProvider, 5*time.Minute)
  161. costModel := costmodel.NewCostModel(promCli, cloudProvider, clusterCache, clusterMap, scrapeInterval)
  162. // initialize Kubernetes Metrics Emitter
  163. metricsEmitter := costmodel.NewCostModelMetricsEmitter(promCli, clusterCache, cloudProvider, clusterInfoProvider, costModel)
  164. // download pricing data
  165. err = cloudProvider.DownloadPricingData()
  166. if err != nil {
  167. log.Errorf("Error downloading pricing data: %s", err)
  168. }
  169. // start emitting metrics
  170. metricsEmitter.Start()
  171. rootMux := http.NewServeMux()
  172. rootMux.HandleFunc("/healthz", Healthz)
  173. rootMux.Handle("/metrics", promhttp.Handler())
  174. telemetryHandler := metrics.ResponseMetricMiddleware(rootMux)
  175. handler := cors.AllowAll().Handler(telemetryHandler)
  176. return http.ListenAndServe(fmt.Sprintf(":%d", env.GetKubecostMetricsPort()), handler)
  177. }