main.go 5.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184
  1. package main
  2. import (
  3. "context"
  4. "flag"
  5. "fmt"
  6. "net/http"
  7. "time"
  8. "github.com/kubecost/cost-model/pkg/cloud"
  9. "github.com/kubecost/cost-model/pkg/clustercache"
  10. "github.com/kubecost/cost-model/pkg/costmodel"
  11. "github.com/kubecost/cost-model/pkg/costmodel/clusters"
  12. "github.com/kubecost/cost-model/pkg/env"
  13. "github.com/kubecost/cost-model/pkg/prom"
  14. "github.com/kubecost/cost-model/pkg/util/watcher"
  15. prometheus "github.com/prometheus/client_golang/api"
  16. prometheusAPI "github.com/prometheus/client_golang/api/prometheus/v1"
  17. "github.com/prometheus/client_golang/prometheus/promhttp"
  18. metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
  19. "github.com/rs/cors"
  20. "k8s.io/client-go/kubernetes"
  21. "k8s.io/client-go/rest"
  22. "k8s.io/client-go/tools/clientcmd"
  23. "k8s.io/klog"
  24. )
  // Healthz is the health-check HTTP handler. It ignores the request and
  // replies with an empty 200 OK response whose Content-Type is text/plain.
  //
  // The headers are set before WriteHeader is called: net/http ignores any
  // change made to the header map after the status has been written, so
  // setting them afterwards would drop them from the response.
  func Healthz(w http.ResponseWriter, _ *http.Request) {
  	header := w.Header()
  	header.Set("Content-Length", "0")
  	header.Set("Content-Type", "text/plain")
  	w.WriteHeader(http.StatusOK)
  }
  30. // initializes the kubernetes client cache
  31. func newKubernetesClusterCache() (clustercache.ClusterCache, error) {
  32. var err error
  33. // Kubernetes API setup
  34. var kc *rest.Config
  35. if kubeconfig := env.GetKubeConfigPath(); kubeconfig != "" {
  36. kc, err = clientcmd.BuildConfigFromFlags("", kubeconfig)
  37. } else {
  38. kc, err = rest.InClusterConfig()
  39. }
  40. if err != nil {
  41. return nil, err
  42. }
  43. kubeClientset, err := kubernetes.NewForConfig(kc)
  44. if err != nil {
  45. return nil, err
  46. }
  47. // Create Kubernetes Cluster Cache + Watchers
  48. k8sCache := clustercache.NewKubernetesClusterCache(kubeClientset)
  49. k8sCache.Run()
  50. return k8sCache, nil
  51. }
  52. func newPrometheusClient() (prometheus.Client, error) {
  53. address := env.GetPrometheusServerEndpoint()
  54. if address == "" {
  55. return nil, fmt.Errorf("No address for prometheus set in $%s. Aborting.", env.PrometheusServerEndpointEnvVar)
  56. }
  57. queryConcurrency := env.GetMaxQueryConcurrency()
  58. klog.Infof("Prometheus Client Max Concurrency set to %d", queryConcurrency)
  59. timeout := 120 * time.Second
  60. keepAlive := 120 * time.Second
  61. promCli, err := prom.NewPrometheusClient(address, timeout, keepAlive, queryConcurrency, "")
  62. if err != nil {
  63. return nil, fmt.Errorf("Failed to create prometheus client, Error: %v", err)
  64. }
  65. m, err := prom.Validate(promCli)
  66. if err != nil || !m.Running {
  67. if err != nil {
  68. klog.Errorf("Failed to query prometheus at %s. Error: %s . Troubleshooting help available at: %s", address, err.Error(), prom.PrometheusTroubleshootingURL)
  69. } else if !m.Running {
  70. klog.Errorf("Prometheus at %s is not running. Troubleshooting help available at: %s", address, prom.PrometheusTroubleshootingURL)
  71. }
  72. } else {
  73. klog.V(1).Info("Success: retrieved the 'up' query against prometheus at: " + address)
  74. }
  75. api := prometheusAPI.NewAPI(promCli)
  76. _, err = api.Config(context.Background())
  77. if err != nil {
  78. klog.Infof("No valid prometheus config file at %s. Error: %s . Troubleshooting help available at: %s. Ignore if using cortex/thanos here.", address, err.Error(), prom.PrometheusTroubleshootingURL)
  79. } else {
  80. klog.Infof("Retrieved a prometheus config file from: %s", address)
  81. }
  82. return promCli, nil
  83. }
  84. func main() {
  85. klog.InitFlags(nil)
  86. flag.Set("v", "3")
  87. flag.Parse()
  88. klog.V(1).Infof("Starting kubecost-metrics...")
  89. configWatchers := watcher.NewConfigMapWatchers()
  90. scrapeInterval := time.Minute
  91. promCli, err := newPrometheusClient()
  92. if err != nil {
  93. panic(err.Error())
  94. }
  95. // Lookup scrape interval for kubecost job, update if found
  96. si, err := prom.ScrapeIntervalFor(promCli, env.GetKubecostJobName())
  97. if err == nil {
  98. scrapeInterval = si
  99. }
  100. klog.Infof("Using scrape interval of %f", scrapeInterval.Seconds())
  101. // initialize kubernetes client and cluster cache
  102. clusterCache, err := newKubernetesClusterCache()
  103. if err != nil {
  104. panic(err.Error())
  105. }
  106. cloudProviderKey := env.GetCloudProviderAPIKey()
  107. cloudProvider, err := cloud.NewProvider(clusterCache, cloudProviderKey)
  108. if err != nil {
  109. panic(err.Error())
  110. }
  111. // Append the pricing config watcher
  112. configWatchers.AddWatcher(cloud.ConfigWatcherFor(cloudProvider))
  113. watchConfigFunc := configWatchers.ToWatchFunc()
  114. watchedConfigs := configWatchers.GetWatchedConfigs()
  115. k8sClient := clusterCache.GetClient()
  116. kubecostNamespace := env.GetKubecostNamespace()
  117. // We need an initial invocation because the init of the cache has happened before we had access to the provider.
  118. for _, cw := range watchedConfigs {
  119. configs, err := k8sClient.CoreV1().ConfigMaps(kubecostNamespace).Get(context.Background(), cw, metav1.GetOptions{})
  120. if err != nil {
  121. klog.Infof("No %s configmap found at install time, using existing configs: %s", cw, err.Error())
  122. } else {
  123. watchConfigFunc(configs)
  124. }
  125. }
  126. clusterCache.SetConfigMapUpdateFunc(watchConfigFunc)
  127. // Initialize ClusterMap for maintaining ClusterInfo by ClusterID
  128. clusterMap := clusters.NewClusterMap(
  129. promCli,
  130. costmodel.NewLocalClusterInfoProvider(k8sClient, cloudProvider),
  131. 5*time.Minute)
  132. costModel := costmodel.NewCostModel(promCli, cloudProvider, clusterCache, clusterMap, scrapeInterval)
  133. // initialize Kubernetes Metrics Emitter
  134. metricsEmitter := costmodel.NewCostModelMetricsEmitter(promCli, clusterCache, cloudProvider, costModel)
  135. // download pricing data
  136. err = cloudProvider.DownloadPricingData()
  137. if err != nil {
  138. klog.Errorf("Error downloading pricing data: %s", err)
  139. }
  140. // start emitting metrics
  141. metricsEmitter.Start()
  142. rootMux := http.NewServeMux()
  143. rootMux.HandleFunc("/healthz", Healthz)
  144. rootMux.Handle("/metrics", promhttp.Handler())
  145. handler := cors.AllowAll().Handler(rootMux)
  146. klog.Fatal(http.ListenAndServe(fmt.Sprintf(":%d", env.GetKubecostMetricsPort()), handler))
  147. }