router.go 22 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664
  1. package costmodel
  2. import (
  3. "context"
  4. "encoding/base64"
  5. "fmt"
  6. "net/http"
  7. "os"
  8. "path"
  9. "reflect"
  10. "strconv"
  11. "strings"
  12. "sync"
  13. "time"
  14. "github.com/opencost/opencost/core/pkg/kubeconfig"
  15. "github.com/opencost/opencost/core/pkg/nodestats"
  16. "github.com/opencost/opencost/core/pkg/protocol"
  17. "github.com/opencost/opencost/core/pkg/source"
  18. "github.com/opencost/opencost/core/pkg/storage"
  19. "github.com/opencost/opencost/core/pkg/util/retry"
  20. "github.com/opencost/opencost/core/pkg/util/timeutil"
  21. "github.com/opencost/opencost/core/pkg/version"
  22. "github.com/opencost/opencost/pkg/cloud/aws"
  23. cloudconfig "github.com/opencost/opencost/pkg/cloud/config"
  24. "github.com/opencost/opencost/pkg/cloud/gcp"
  25. "github.com/opencost/opencost/pkg/cloud/provider"
  26. "github.com/opencost/opencost/pkg/cloudcost"
  27. "github.com/opencost/opencost/pkg/config"
  28. "github.com/opencost/opencost/pkg/customcost"
  29. "github.com/opencost/opencost/pkg/metrics"
  30. "github.com/opencost/opencost/pkg/util/watcher"
  31. "github.com/julienschmidt/httprouter"
  32. "github.com/opencost/opencost/core/pkg/clustercache"
  33. "github.com/opencost/opencost/core/pkg/clusters"
  34. sysenv "github.com/opencost/opencost/core/pkg/env"
  35. "github.com/opencost/opencost/core/pkg/log"
  36. "github.com/opencost/opencost/core/pkg/util/json"
  37. "github.com/opencost/opencost/modules/collector-source/pkg/collector"
  38. "github.com/opencost/opencost/modules/prometheus-source/pkg/prom"
  39. "github.com/opencost/opencost/pkg/cloud/azure"
  40. "github.com/opencost/opencost/pkg/cloud/models"
  41. clusterc "github.com/opencost/opencost/pkg/clustercache"
  42. "github.com/opencost/opencost/pkg/env"
  43. metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
  44. "github.com/patrickmn/go-cache"
  45. "k8s.io/client-go/kubernetes"
  46. )
  // RFC3339Milli is a time layout string with millisecond precision and a
  // literal trailing "Z".
  const RFC3339Milli = "2006-01-02T15:04:05.000Z"

  // Cache lifetime bounds, in minutes. The suffix names the window size
  // (1d, 2d, 7d, 30d) — presumably the query window each bound applies to;
  // none of them is referenced in this part of the file.
  const (
  	maxCacheMinutes1d  = 11
  	maxCacheMinutes2d  = 17
  	maxCacheMinutes7d  = 37
  	maxCacheMinutes30d = 137
  )

  // Names of application settings.
  const (
  	CustomPricingSetting = "CustomPricing"
  	DiscountSetting      = "Discount"
  )
  56. var (
  57. // gitCommit is set by the build system
  58. gitCommit string
  59. proto = protocol.HTTP()
  60. )
  61. // Accesses defines a singleton application instance, providing access to
  62. // Prometheus, Kubernetes, the cloud provider, and caches.
  63. type Accesses struct {
  64. DataSource source.OpenCostDataSource
  65. KubeClientSet kubernetes.Interface
  66. ClusterCache clustercache.ClusterCache
  67. ClusterMap clusters.ClusterMap
  68. CloudProvider models.Provider
  69. ConfigFileManager *config.ConfigFileManager
  70. ClusterInfoProvider clusters.ClusterInfoProvider
  71. Model *CostModel
  72. MetricsEmitter *CostModelMetricsEmitter
  73. // SettingsCache stores current state of app settings
  74. SettingsCache *cache.Cache
  75. // settingsSubscribers tracks channels through which changes to different
  76. // settings will be published in a pub/sub model
  77. settingsSubscribers map[string][]chan string
  78. settingsMutex sync.Mutex
  79. }
  80. func filterFields(fields string, data map[string]*CostData) map[string]CostData {
  81. fs := strings.Split(fields, ",")
  82. fmap := make(map[string]bool)
  83. for _, f := range fs {
  84. fieldNameLower := strings.ToLower(f) // convert to go struct name by uppercasing first letter
  85. log.Debugf("to delete: %s", fieldNameLower)
  86. fmap[fieldNameLower] = true
  87. }
  88. filteredData := make(map[string]CostData)
  89. for cname, costdata := range data {
  90. s := reflect.TypeOf(*costdata)
  91. val := reflect.ValueOf(*costdata)
  92. costdata2 := CostData{}
  93. cd2 := reflect.New(reflect.Indirect(reflect.ValueOf(costdata2)).Type()).Elem()
  94. n := s.NumField()
  95. for i := 0; i < n; i++ {
  96. field := s.Field(i)
  97. value := val.Field(i)
  98. value2 := cd2.Field(i)
  99. if _, ok := fmap[strings.ToLower(field.Name)]; !ok {
  100. value2.Set(reflect.Value(value))
  101. }
  102. }
  103. filteredData[cname] = cd2.Interface().(CostData)
  104. }
  105. return filteredData
  106. }
  // ParsePercentString converts a percentage such as "30%" into its fractional
  // value (0.30).
  //
  // A single trailing "%" is optional, and whitespace around the number or the
  // "%" sign is ignored. An empty (or all-whitespace) string is interpreted as
  // "0%" and yields 0.0 with no error. If the remaining text is not a valid
  // float, the error from strconv.ParseFloat is returned together with 0.0.
  func ParsePercentString(percentStr string) (float64, error) {
  	trimmed := strings.TrimSpace(percentStr)
  	if trimmed == "" {
  		return 0.0, nil
  	}

  	// Drop at most one trailing "%", then any whitespace that preceded it.
  	number := strings.TrimSpace(strings.TrimSuffix(trimmed, "%"))

  	percent, err := strconv.ParseFloat(number, 64)
  	if err != nil {
  		return 0.0, err
  	}
  	return percent * 0.01, nil
  }
  124. func WriteData(w http.ResponseWriter, data interface{}, err error) {
  125. if err != nil {
  126. proto.WriteError(w, proto.InternalServerError(err.Error()))
  127. return
  128. }
  129. proto.WriteData(w, data)
  130. }
  131. // RefreshPricingData needs to be called when a new node joins the fleet, since we cache the relevant subsets of pricing data to avoid storing the whole thing.
  132. func (a *Accesses) RefreshPricingData(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  133. w.Header().Set("Content-Type", "application/json")
  134. w.Header().Set("Access-Control-Allow-Origin", "*")
  135. err := a.CloudProvider.DownloadPricingData()
  136. if err != nil {
  137. log.Errorf("Error refreshing pricing data: %s", err.Error())
  138. }
  139. WriteData(w, nil, err)
  140. }
  141. func (a *Accesses) CostDataModel(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  142. w.Header().Set("Content-Type", "application/json")
  143. w.Header().Set("Access-Control-Allow-Origin", "*")
  144. window := r.URL.Query().Get("timeWindow")
  145. offset := r.URL.Query().Get("offset")
  146. fields := r.URL.Query().Get("filterFields")
  147. namespace := r.URL.Query().Get("namespace")
  148. duration, err := timeutil.ParseDuration(window)
  149. if err != nil {
  150. WriteData(w, nil, fmt.Errorf("error parsing window (%s): %s", window, err))
  151. return
  152. }
  153. end := time.Now()
  154. if offset != "" {
  155. offsetDur, err := timeutil.ParseDuration(offset)
  156. if err != nil {
  157. WriteData(w, nil, fmt.Errorf("error parsing offset (%s): %s", offset, err))
  158. return
  159. }
  160. end = end.Add(-offsetDur)
  161. }
  162. start := end.Add(-duration)
  163. data, err := a.Model.ComputeCostData(start, end)
  164. // apply filter by removing if != namespace
  165. if namespace != "" {
  166. for key, costData := range data {
  167. if costData.Namespace != namespace {
  168. delete(data, key)
  169. }
  170. }
  171. }
  172. if fields != "" {
  173. filteredData := filterFields(fields, data)
  174. WriteData(w, filteredData, err)
  175. } else {
  176. WriteData(w, data, err)
  177. }
  178. }
  179. func (a *Accesses) GetAllNodePricing(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  180. w.Header().Set("Content-Type", "application/json")
  181. w.Header().Set("Access-Control-Allow-Origin", "*")
  182. data, err := a.CloudProvider.AllNodePricing()
  183. WriteData(w, data, err)
  184. }
  185. func (a *Accesses) GetConfigs(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  186. w.Header().Set("Content-Type", "application/json")
  187. w.Header().Set("Access-Control-Allow-Origin", "*")
  188. data, err := a.CloudProvider.GetConfig()
  189. WriteData(w, data, err)
  190. }
  191. func (a *Accesses) UpdateSpotInfoConfigs(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  192. w.Header().Set("Content-Type", "application/json")
  193. w.Header().Set("Access-Control-Allow-Origin", "*")
  194. data, err := a.CloudProvider.UpdateConfig(r.Body, aws.SpotInfoUpdateType)
  195. WriteData(w, data, err)
  196. err = a.CloudProvider.DownloadPricingData()
  197. if err != nil {
  198. log.Errorf("Error redownloading data on config update: %s", err.Error())
  199. }
  200. }
  201. func (a *Accesses) UpdateAthenaInfoConfigs(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  202. w.Header().Set("Content-Type", "application/json")
  203. w.Header().Set("Access-Control-Allow-Origin", "*")
  204. data, err := a.CloudProvider.UpdateConfig(r.Body, aws.AthenaInfoUpdateType)
  205. WriteData(w, data, err)
  206. }
  207. func (a *Accesses) UpdateBigQueryInfoConfigs(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  208. w.Header().Set("Content-Type", "application/json")
  209. w.Header().Set("Access-Control-Allow-Origin", "*")
  210. data, err := a.CloudProvider.UpdateConfig(r.Body, gcp.BigqueryUpdateType)
  211. WriteData(w, data, err)
  212. }
  213. func (a *Accesses) UpdateAzureStorageConfigs(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  214. w.Header().Set("Content-Type", "application/json")
  215. w.Header().Set("Access-Control-Allow-Origin", "*")
  216. data, err := a.CloudProvider.UpdateConfig(r.Body, azure.AzureStorageUpdateType)
  217. if err != nil {
  218. WriteData(w, nil, err)
  219. return
  220. }
  221. WriteData(w, data, err)
  222. }
  223. func (a *Accesses) UpdateConfigByKey(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  224. w.Header().Set("Content-Type", "application/json")
  225. w.Header().Set("Access-Control-Allow-Origin", "*")
  226. data, err := a.CloudProvider.UpdateConfig(r.Body, "")
  227. WriteData(w, data, err)
  228. }
  229. func (a *Accesses) ManagementPlatform(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  230. w.Header().Set("Content-Type", "application/json")
  231. w.Header().Set("Access-Control-Allow-Origin", "*")
  232. data, err := a.CloudProvider.GetManagementPlatform()
  233. WriteData(w, data, err)
  234. }
  235. func (a *Accesses) ClusterInfo(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  236. w.Header().Set("Content-Type", "application/json")
  237. w.Header().Set("Access-Control-Allow-Origin", "*")
  238. data := a.ClusterInfoProvider.GetClusterInfo()
  239. WriteData(w, data, nil)
  240. }
  241. func (a *Accesses) GetClusterInfoMap(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  242. w.Header().Set("Content-Type", "application/json")
  243. w.Header().Set("Access-Control-Allow-Origin", "*")
  244. data := a.ClusterMap.AsMap()
  245. WriteData(w, data, nil)
  246. }
  247. func (a *Accesses) GetServiceAccountStatus(w http.ResponseWriter, _ *http.Request, _ httprouter.Params) {
  248. w.Header().Set("Content-Type", "application/json")
  249. w.Header().Set("Access-Control-Allow-Origin", "*")
  250. WriteData(w, a.CloudProvider.ServiceAccountStatus(), nil)
  251. }
  252. func (a *Accesses) GetPricingSourceStatus(w http.ResponseWriter, _ *http.Request, _ httprouter.Params) {
  253. w.Header().Set("Content-Type", "application/json")
  254. w.Header().Set("Access-Control-Allow-Origin", "*")
  255. data := a.CloudProvider.PricingSourceStatus()
  256. WriteData(w, data, nil)
  257. }
  258. func (a *Accesses) GetPricingSourceCounts(w http.ResponseWriter, _ *http.Request, _ httprouter.Params) {
  259. w.Header().Set("Content-Type", "application/json")
  260. w.Header().Set("Access-Control-Allow-Origin", "*")
  261. data, err := a.Model.GetPricingSourceCounts()
  262. WriteData(w, data, err)
  263. }
  264. func (a *Accesses) GetPricingSourceSummary(w http.ResponseWriter, r *http.Request, p httprouter.Params) {
  265. w.Header().Set("Content-Type", "application/json")
  266. w.Header().Set("Access-Control-Allow-Origin", "*")
  267. data := a.CloudProvider.PricingSourceSummary()
  268. WriteData(w, data, nil)
  269. }
  270. func (a *Accesses) GetOrphanedPods(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  271. w.Header().Set("Content-Type", "application/json")
  272. w.Header().Set("Access-Control-Allow-Origin", "*")
  273. podlist := a.ClusterCache.GetAllPods()
  274. var lonePods []*clustercache.Pod
  275. for _, pod := range podlist {
  276. if len(pod.OwnerReferences) == 0 {
  277. lonePods = append(lonePods, pod)
  278. }
  279. }
  280. body, err := json.Marshal(lonePods)
  281. if err != nil {
  282. fmt.Fprintf(w, "Error decoding pod: %s", err)
  283. } else {
  284. w.Write(body)
  285. }
  286. }
  287. func (a *Accesses) GetInstallNamespace(w http.ResponseWriter, r *http.Request, _ httprouter.Params) {
  288. w.Header().Set("Content-Type", "application/json")
  289. w.Header().Set("Access-Control-Allow-Origin", "*")
  290. ns := env.GetInstallNamespace()
  291. w.Write([]byte(ns))
  292. }
  // InstallInfo is the JSON payload written by GetInstallInfo.
  type InstallInfo struct {
  	// Containers lists the containers of the installed application's pods.
  	Containers []ContainerInfo `json:"containers"`
  	// ClusterInfo holds string key/value facts about the cluster;
  	// GetInstallInfo fills in "nodeCount" and "podCount".
  	ClusterInfo map[string]string `json:"clusterInfo"`
  	// Version is the application's version string.
  	Version string `json:"version"`
  }

  // ContainerInfo describes one container of a pod.
  type ContainerInfo struct {
  	// ContainerName is the container's name from the pod spec.
  	ContainerName string `json:"containerName"`
  	// Image is the container's image reference from the pod spec.
  	Image string `json:"image"`
  	// StartTime is the owning pod's start time rendered as a string.
  	StartTime string `json:"startTime"`
  }
  303. func (a *Accesses) GetInstallInfo(w http.ResponseWriter, r *http.Request, _ httprouter.Params) {
  304. w.Header().Set("Content-Type", "application/json")
  305. w.Header().Set("Access-Control-Allow-Origin", "*")
  306. containers, err := GetKubecostContainers(a.KubeClientSet)
  307. if err != nil {
  308. http.Error(w, fmt.Sprintf("Unable to list pods: %s", err.Error()), http.StatusInternalServerError)
  309. return
  310. }
  311. info := InstallInfo{
  312. Containers: containers,
  313. ClusterInfo: make(map[string]string),
  314. Version: version.FriendlyVersion(),
  315. }
  316. nodes := a.ClusterCache.GetAllNodes()
  317. cachePods := a.ClusterCache.GetAllPods()
  318. info.ClusterInfo["nodeCount"] = strconv.Itoa(len(nodes))
  319. info.ClusterInfo["podCount"] = strconv.Itoa(len(cachePods))
  320. body, err := json.Marshal(info)
  321. if err != nil {
  322. http.Error(w, fmt.Sprintf("Error decoding pod: %s", err.Error()), http.StatusInternalServerError)
  323. return
  324. }
  325. w.Write(body)
  326. }
  327. func GetKubecostContainers(kubeClientSet kubernetes.Interface) ([]ContainerInfo, error) {
  328. pods, err := kubeClientSet.CoreV1().Pods(env.GetInstallNamespace()).List(context.Background(), metav1.ListOptions{
  329. LabelSelector: "app=cost-analyzer",
  330. FieldSelector: "status.phase=Running",
  331. Limit: 1,
  332. })
  333. if err != nil {
  334. return nil, fmt.Errorf("failed to query kubernetes client for kubecost pods: %s", err)
  335. }
  336. // If we have zero pods either something is weird with the install since the app selector is not exposed in the helm
  337. // chart or more likely we are running locally - in either case Images field will return as null
  338. var containers []ContainerInfo
  339. if len(pods.Items) > 0 {
  340. for _, pod := range pods.Items {
  341. for _, container := range pod.Spec.Containers {
  342. c := ContainerInfo{
  343. ContainerName: container.Name,
  344. Image: container.Image,
  345. StartTime: pod.Status.StartTime.String(),
  346. }
  347. containers = append(containers, c)
  348. }
  349. }
  350. }
  351. return containers, nil
  352. }
  353. func (a *Accesses) AddServiceKey(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  354. w.Header().Set("Content-Type", "application/json")
  355. w.Header().Set("Access-Control-Allow-Origin", "*")
  356. r.ParseForm()
  357. key := r.PostForm.Get("key")
  358. k := []byte(key)
  359. err := os.WriteFile(path.Join(env.GetConfigPathWithDefault(env.DefaultConfigMountPath), "key.json"), k, 0644)
  360. if err != nil {
  361. fmt.Fprintf(w, "Error writing service key: %s", err)
  362. }
  363. w.WriteHeader(http.StatusOK)
  364. }
  365. func (a *Accesses) GetHelmValues(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  366. w.Header().Set("Content-Type", "application/json")
  367. w.Header().Set("Access-Control-Allow-Origin", "*")
  368. encodedValues := sysenv.Get("HELM_VALUES", "")
  369. if encodedValues == "" {
  370. fmt.Fprintf(w, "Values reporting disabled")
  371. return
  372. }
  373. result, err := base64.StdEncoding.DecodeString(encodedValues)
  374. if err != nil {
  375. fmt.Fprintf(w, "Failed to decode encoded values: %s", err)
  376. return
  377. }
  378. w.Write(result)
  379. }
  380. func Initialize(router *httprouter.Router, additionalConfigWatchers ...*watcher.ConfigMapWatcher) *Accesses {
  381. var err error
  382. // Kubernetes API setup
  383. kubeClientset, err := kubeconfig.LoadKubeClient("")
  384. if err != nil {
  385. log.Fatalf("Failed to build Kubernetes client: %s", err.Error())
  386. }
  387. // Create Kubernetes Cluster Cache + Watchers
  388. k8sCache := clusterc.NewKubernetesClusterCache(kubeClientset)
  389. k8sCache.Run()
  390. // Create ConfigFileManager for synchronization of shared configuration
  391. confManager := config.NewConfigFileManager(&config.ConfigFileManagerOpts{
  392. BucketStoreConfig: env.GetConfigBucketFile(),
  393. LocalConfigPath: "/",
  394. })
  395. configPrefix := env.GetConfigPathWithDefault("/var/configs/")
  396. cloudProviderKey := env.GetCloudProviderAPIKey()
  397. cloudProvider, err := provider.NewProvider(k8sCache, cloudProviderKey, confManager)
  398. if err != nil {
  399. panic(err.Error())
  400. }
  401. // ClusterInfo Provider to provide the cluster map with local and remote cluster data
  402. var clusterInfoProvider clusters.ClusterInfoProvider
  403. if env.IsClusterInfoFileEnabled() {
  404. clusterInfoFile := confManager.ConfigFileAt(path.Join(configPrefix, "cluster-info.json"))
  405. clusterInfoProvider = NewConfiguredClusterInfoProvider(clusterInfoFile)
  406. } else {
  407. clusterInfoProvider = NewLocalClusterInfoProvider(kubeClientset, cloudProvider)
  408. }
  409. const maxRetries = 10
  410. const retryInterval = 10 * time.Second
  411. var fatalErr error
  412. ctx, cancel := context.WithCancel(context.Background())
  413. fn := func() (source.OpenCostDataSource, error) {
  414. ds, e := prom.NewDefaultPrometheusDataSource(clusterInfoProvider)
  415. if e != nil {
  416. if source.IsRetryable(e) {
  417. return nil, e
  418. }
  419. fatalErr = e
  420. cancel()
  421. }
  422. return ds, e
  423. }
  424. if env.IsCollectorDataSourceEnabled() {
  425. fn = func() (source.OpenCostDataSource, error) {
  426. store := getStorage()
  427. nodeStatConf, err := NewNodeClientConfigFromEnv()
  428. if err != nil {
  429. return nil, fmt.Errorf("failed to get node client config: %w", err)
  430. }
  431. clusterConfig, err := kubeconfig.LoadKubeconfig("")
  432. if err != nil {
  433. return nil, fmt.Errorf("failed to load kube config: %w", err)
  434. }
  435. nodeStatClient := nodestats.NewNodeStatsSummaryClient(k8sCache, nodeStatConf, clusterConfig)
  436. ds := collector.NewDefaultCollectorDataSource(
  437. store,
  438. clusterInfoProvider,
  439. k8sCache,
  440. nodeStatClient,
  441. )
  442. return ds, nil
  443. }
  444. }
  445. dataSource, _ := retry.Retry(
  446. ctx,
  447. fn,
  448. maxRetries,
  449. retryInterval,
  450. )
  451. if fatalErr != nil {
  452. log.Fatalf("Failed to create Prometheus data source: %s", fatalErr)
  453. panic(fatalErr)
  454. }
  455. // Append the pricing config watcher
  456. kubecostNamespace := env.GetInstallNamespace()
  457. configWatchers := watcher.NewConfigMapWatchers(kubeClientset, kubecostNamespace, additionalConfigWatchers...)
  458. configWatchers.AddWatcher(provider.ConfigWatcherFor(cloudProvider))
  459. configWatchers.AddWatcher(metrics.GetMetricsConfigWatcher())
  460. configWatchers.Watch()
  461. clusterMap := dataSource.ClusterMap()
  462. settingsCache := cache.New(cache.NoExpiration, cache.NoExpiration)
  463. costModel := NewCostModel(dataSource, cloudProvider, k8sCache, clusterMap, dataSource.BatchDuration())
  464. metricsEmitter := NewCostModelMetricsEmitter(k8sCache, cloudProvider, clusterInfoProvider, costModel)
  465. a := &Accesses{
  466. DataSource: dataSource,
  467. KubeClientSet: kubeClientset,
  468. ClusterCache: k8sCache,
  469. ClusterMap: clusterMap,
  470. CloudProvider: cloudProvider,
  471. ConfigFileManager: confManager,
  472. ClusterInfoProvider: clusterInfoProvider,
  473. Model: costModel,
  474. MetricsEmitter: metricsEmitter,
  475. SettingsCache: settingsCache,
  476. }
  477. // Initialize mechanism for subscribing to settings changes
  478. a.InitializeSettingsPubSub()
  479. err = a.CloudProvider.DownloadPricingData()
  480. if err != nil {
  481. log.Infof("Failed to download pricing data: %s", err)
  482. }
  483. if !env.IsKubecostMetricsPodEnabled() {
  484. a.MetricsEmitter.Start()
  485. }
  486. a.DataSource.RegisterEndPoints(router)
  487. router.GET("/costDataModel", a.CostDataModel)
  488. router.GET("/allocation/compute", a.ComputeAllocationHandler)
  489. router.GET("/allocation/compute/summary", a.ComputeAllocationHandlerSummary)
  490. router.GET("/allNodePricing", a.GetAllNodePricing)
  491. router.POST("/refreshPricing", a.RefreshPricingData)
  492. router.GET("/managementPlatform", a.ManagementPlatform)
  493. router.GET("/clusterInfo", a.ClusterInfo)
  494. router.GET("/clusterInfoMap", a.GetClusterInfoMap)
  495. router.GET("/serviceAccountStatus", a.GetServiceAccountStatus)
  496. router.GET("/pricingSourceStatus", a.GetPricingSourceStatus)
  497. router.GET("/pricingSourceSummary", a.GetPricingSourceSummary)
  498. router.GET("/pricingSourceCounts", a.GetPricingSourceCounts)
  499. router.GET("/orphanedPods", a.GetOrphanedPods)
  500. router.GET("/installNamespace", a.GetInstallNamespace)
  501. router.GET("/installInfo", a.GetInstallInfo)
  502. router.POST("/serviceKey", a.AddServiceKey)
  503. router.GET("/helmValues", a.GetHelmValues)
  504. return a
  505. }
  506. func getStorage() storage.Storage {
  507. var store storage.Storage
  508. pvMountPath := env.GetPVMountPath()
  509. if pvMountPath != "" {
  510. store = storage.NewFileStorage(pvMountPath)
  511. }
  512. return store
  513. }
  514. // InitializeCloudCost Initializes Cloud Cost pipeline and querier and registers endpoints
  515. func InitializeCloudCost(router *httprouter.Router, providerConfig models.ProviderConfig) {
  516. log.Debugf("Cloud Cost config path: %s", env.GetCloudCostConfigPath())
  517. cloudConfigController := cloudconfig.NewMemoryController(providerConfig)
  518. repo := cloudcost.NewMemoryRepository()
  519. cloudCostPipelineService := cloudcost.NewPipelineService(repo, cloudConfigController, cloudcost.DefaultIngestorConfiguration())
  520. repoQuerier := cloudcost.NewRepositoryQuerier(repo)
  521. cloudCostQueryService := cloudcost.NewQueryService(repoQuerier, repoQuerier)
  522. router.GET("/cloud/config/export", cloudConfigController.GetExportConfigHandler())
  523. router.GET("/cloud/config/enable", cloudConfigController.GetEnableConfigHandler())
  524. router.GET("/cloud/config/disable", cloudConfigController.GetDisableConfigHandler())
  525. router.GET("/cloud/config/delete", cloudConfigController.GetDeleteConfigHandler())
  526. router.GET("/cloudCost", cloudCostQueryService.GetCloudCostHandler())
  527. router.GET("/cloudCost/view/graph", cloudCostQueryService.GetCloudCostViewGraphHandler())
  528. router.GET("/cloudCost/view/totals", cloudCostQueryService.GetCloudCostViewTotalsHandler())
  529. router.GET("/cloudCost/view/table", cloudCostQueryService.GetCloudCostViewTableHandler())
  530. router.GET("/cloudCost/status", cloudCostPipelineService.GetCloudCostStatusHandler())
  531. router.GET("/cloudCost/rebuild", cloudCostPipelineService.GetCloudCostRebuildHandler())
  532. router.GET("/cloudCost/repair", cloudCostPipelineService.GetCloudCostRepairHandler())
  533. }
  534. func InitializeCustomCost(router *httprouter.Router) *customcost.PipelineService {
  535. hourlyRepo := customcost.NewMemoryRepository()
  536. dailyRepo := customcost.NewMemoryRepository()
  537. ingConfig := customcost.DefaultIngestorConfiguration()
  538. var err error
  539. customCostPipelineService, err := customcost.NewPipelineService(hourlyRepo, dailyRepo, ingConfig)
  540. if err != nil {
  541. log.Errorf("error instantiating custom cost pipeline service: %v", err)
  542. return nil
  543. }
  544. customCostQuerier := customcost.NewRepositoryQuerier(hourlyRepo, dailyRepo, ingConfig.HourlyDuration, ingConfig.DailyDuration)
  545. customCostQueryService := customcost.NewQueryService(customCostQuerier)
  546. router.GET("/customCost/total", customCostQueryService.GetCustomCostTotalHandler())
  547. router.GET("/customCost/timeseries", customCostQueryService.GetCustomCostTimeseriesHandler())
  548. return customCostPipelineService
  549. }