router.go 20 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627
  1. package costmodel
  2. import (
  3. "context"
  4. "encoding/base64"
  5. "fmt"
  6. "net/http"
  7. "os"
  8. "reflect"
  9. "strconv"
  10. "strings"
  11. "sync"
  12. "time"
  13. "github.com/opencost/opencost/core/pkg/kubeconfig"
  14. "github.com/opencost/opencost/core/pkg/nodestats"
  15. "github.com/opencost/opencost/core/pkg/protocol"
  16. "github.com/opencost/opencost/core/pkg/source"
  17. "github.com/opencost/opencost/core/pkg/storage"
  18. "github.com/opencost/opencost/core/pkg/util/retry"
  19. "github.com/opencost/opencost/core/pkg/util/timeutil"
  20. "github.com/opencost/opencost/core/pkg/version"
  21. cloudconfig "github.com/opencost/opencost/pkg/cloud/config"
  22. "github.com/opencost/opencost/pkg/cloud/provider"
  23. "github.com/opencost/opencost/pkg/cloudcost"
  24. "github.com/opencost/opencost/pkg/config"
  25. "github.com/opencost/opencost/pkg/customcost"
  26. "github.com/opencost/opencost/pkg/metrics"
  27. "github.com/opencost/opencost/pkg/util/watcher"
  28. "github.com/julienschmidt/httprouter"
  29. "github.com/opencost/opencost/core/pkg/clustercache"
  30. "github.com/opencost/opencost/core/pkg/clusters"
  31. sysenv "github.com/opencost/opencost/core/pkg/env"
  32. "github.com/opencost/opencost/core/pkg/log"
  33. "github.com/opencost/opencost/core/pkg/util/json"
  34. "github.com/opencost/opencost/modules/collector-source/pkg/collector"
  35. "github.com/opencost/opencost/modules/prometheus-source/pkg/prom"
  36. "github.com/opencost/opencost/pkg/cloud/models"
  37. clusterc "github.com/opencost/opencost/pkg/clustercache"
  38. "github.com/opencost/opencost/pkg/env"
  39. metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
  40. "github.com/patrickmn/go-cache"
  41. "k8s.io/client-go/kubernetes"
  42. )
  43. const (
  44. RFC3339Milli = "2006-01-02T15:04:05.000Z"
  45. CustomPricingSetting = "CustomPricing"
  46. DiscountSetting = "Discount"
  47. )
  48. var (
  49. // gitCommit is set by the build system
  50. gitCommit string
  51. proto = protocol.HTTP()
  52. )
  53. // Accesses defines a singleton application instance, providing access to
  54. // Prometheus, Kubernetes, the cloud provider, and caches.
  55. type Accesses struct {
  56. DataSource source.OpenCostDataSource
  57. KubeClientSet kubernetes.Interface
  58. ClusterCache clustercache.ClusterCache
  59. ClusterMap clusters.ClusterMap
  60. CloudProvider models.Provider
  61. ConfigFileManager *config.ConfigFileManager
  62. ClusterInfoProvider clusters.ClusterInfoProvider
  63. Model *CostModel
  64. MetricsEmitter *CostModelMetricsEmitter
  65. // SettingsCache stores current state of app settings
  66. SettingsCache *cache.Cache
  67. // settingsSubscribers tracks channels through which changes to different
  68. // settings will be published in a pub/sub model
  69. settingsSubscribers map[string][]chan string
  70. settingsMutex sync.Mutex
  71. }
  72. func filterFields(fields string, data map[string]*CostData) map[string]CostData {
  73. fs := strings.Split(fields, ",")
  74. fmap := make(map[string]bool)
  75. for _, f := range fs {
  76. fieldNameLower := strings.ToLower(f) // convert to go struct name by uppercasing first letter
  77. log.Debugf("to delete: %s", fieldNameLower)
  78. fmap[fieldNameLower] = true
  79. }
  80. filteredData := make(map[string]CostData)
  81. for cname, costdata := range data {
  82. s := reflect.TypeOf(*costdata)
  83. val := reflect.ValueOf(*costdata)
  84. costdata2 := CostData{}
  85. cd2 := reflect.New(reflect.Indirect(reflect.ValueOf(costdata2)).Type()).Elem()
  86. n := s.NumField()
  87. for i := 0; i < n; i++ {
  88. field := s.Field(i)
  89. value := val.Field(i)
  90. value2 := cd2.Field(i)
  91. if _, ok := fmap[strings.ToLower(field.Name)]; !ok {
  92. value2.Set(reflect.Value(value))
  93. }
  94. }
  95. filteredData[cname] = cd2.Interface().(CostData)
  96. }
  97. return filteredData
  98. }
  99. // ParsePercentString takes a string of expected format "N%" and returns a floating point 0.0N.
  100. // If the "%" symbol is missing, it just returns 0.0N. Empty string is interpreted as "0%" and
  101. // return 0.0.
  102. func ParsePercentString(percentStr string) (float64, error) {
  103. if len(percentStr) == 0 {
  104. return 0.0, nil
  105. }
  106. if percentStr[len(percentStr)-1:] == "%" {
  107. percentStr = percentStr[:len(percentStr)-1]
  108. }
  109. discount, err := strconv.ParseFloat(percentStr, 64)
  110. if err != nil {
  111. return 0.0, err
  112. }
  113. discount *= 0.01
  114. return discount, nil
  115. }
  116. func WriteData(w http.ResponseWriter, data interface{}, err error) {
  117. if err != nil {
  118. proto.WriteError(w, proto.InternalServerError(err.Error()))
  119. return
  120. }
  121. proto.WriteData(w, data)
  122. }
  123. // RefreshPricingData needs to be called when a new node joins the fleet, since we cache the relevant subsets of pricing data to avoid storing the whole thing.
  124. func (a *Accesses) RefreshPricingData(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  125. w.Header().Set("Content-Type", "application/json")
  126. w.Header().Set("Access-Control-Allow-Origin", "*")
  127. err := a.CloudProvider.DownloadPricingData()
  128. if err != nil {
  129. log.Errorf("Error refreshing pricing data: %s", err.Error())
  130. }
  131. WriteData(w, nil, err)
  132. }
  133. func (a *Accesses) CostDataModel(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  134. w.Header().Set("Content-Type", "application/json")
  135. w.Header().Set("Access-Control-Allow-Origin", "*")
  136. window := r.URL.Query().Get("timeWindow")
  137. offset := r.URL.Query().Get("offset")
  138. fields := r.URL.Query().Get("filterFields")
  139. namespace := r.URL.Query().Get("namespace")
  140. duration, err := timeutil.ParseDuration(window)
  141. if err != nil {
  142. WriteData(w, nil, fmt.Errorf("error parsing window (%s): %s", window, err))
  143. return
  144. }
  145. end := time.Now()
  146. if offset != "" {
  147. offsetDur, err := timeutil.ParseDuration(offset)
  148. if err != nil {
  149. WriteData(w, nil, fmt.Errorf("error parsing offset (%s): %s", offset, err))
  150. return
  151. }
  152. end = end.Add(-offsetDur)
  153. }
  154. start := end.Add(-duration)
  155. data, err := a.Model.ComputeCostData(start, end)
  156. // apply filter by removing if != namespace
  157. if namespace != "" {
  158. for key, costData := range data {
  159. if costData.Namespace != namespace {
  160. delete(data, key)
  161. }
  162. }
  163. }
  164. if fields != "" {
  165. filteredData := filterFields(fields, data)
  166. WriteData(w, filteredData, err)
  167. } else {
  168. WriteData(w, data, err)
  169. }
  170. }
  171. func (a *Accesses) GetAllNodePricing(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  172. w.Header().Set("Content-Type", "application/json")
  173. w.Header().Set("Access-Control-Allow-Origin", "*")
  174. data, err := a.CloudProvider.AllNodePricing()
  175. WriteData(w, data, err)
  176. }
  177. func (a *Accesses) ManagementPlatform(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  178. w.Header().Set("Content-Type", "application/json")
  179. w.Header().Set("Access-Control-Allow-Origin", "*")
  180. data, err := a.CloudProvider.GetManagementPlatform()
  181. WriteData(w, data, err)
  182. }
  183. func (a *Accesses) ClusterInfo(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  184. w.Header().Set("Content-Type", "application/json")
  185. w.Header().Set("Access-Control-Allow-Origin", "*")
  186. data := a.ClusterInfoProvider.GetClusterInfo()
  187. WriteData(w, data, nil)
  188. }
  189. func (a *Accesses) GetClusterInfoMap(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  190. w.Header().Set("Content-Type", "application/json")
  191. w.Header().Set("Access-Control-Allow-Origin", "*")
  192. data := a.ClusterMap.AsMap()
  193. WriteData(w, data, nil)
  194. }
  195. func (a *Accesses) GetServiceAccountStatus(w http.ResponseWriter, _ *http.Request, _ httprouter.Params) {
  196. w.Header().Set("Content-Type", "application/json")
  197. w.Header().Set("Access-Control-Allow-Origin", "*")
  198. WriteData(w, a.CloudProvider.ServiceAccountStatus(), nil)
  199. }
  200. func (a *Accesses) GetPricingSourceStatus(w http.ResponseWriter, _ *http.Request, _ httprouter.Params) {
  201. w.Header().Set("Content-Type", "application/json")
  202. w.Header().Set("Access-Control-Allow-Origin", "*")
  203. data := a.CloudProvider.PricingSourceStatus()
  204. WriteData(w, data, nil)
  205. }
  206. func (a *Accesses) GetPricingSourceCounts(w http.ResponseWriter, _ *http.Request, _ httprouter.Params) {
  207. w.Header().Set("Content-Type", "application/json")
  208. w.Header().Set("Access-Control-Allow-Origin", "*")
  209. data, err := a.Model.GetPricingSourceCounts()
  210. WriteData(w, data, err)
  211. }
  212. func (a *Accesses) GetPricingSourceSummary(w http.ResponseWriter, r *http.Request, p httprouter.Params) {
  213. w.Header().Set("Content-Type", "application/json")
  214. w.Header().Set("Access-Control-Allow-Origin", "*")
  215. data := a.CloudProvider.PricingSourceSummary()
  216. WriteData(w, data, nil)
  217. }
  218. func (a *Accesses) GetOrphanedPods(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  219. w.Header().Set("Content-Type", "application/json")
  220. w.Header().Set("Access-Control-Allow-Origin", "*")
  221. podlist := a.ClusterCache.GetAllPods()
  222. var lonePods []*clustercache.Pod
  223. for _, pod := range podlist {
  224. if len(pod.OwnerReferences) == 0 {
  225. lonePods = append(lonePods, pod)
  226. }
  227. }
  228. body, err := json.Marshal(lonePods)
  229. if err != nil {
  230. fmt.Fprintf(w, "Error decoding pod: %s", err)
  231. } else {
  232. w.Write(body)
  233. }
  234. }
  235. func (a *Accesses) GetInstallNamespace(w http.ResponseWriter, r *http.Request, _ httprouter.Params) {
  236. w.Header().Set("Content-Type", "application/json")
  237. w.Header().Set("Access-Control-Allow-Origin", "*")
  238. ns := env.GetOpencostNamespace()
  239. w.Write([]byte(ns))
  240. }
  241. type InstallInfo struct {
  242. Containers []ContainerInfo `json:"containers"`
  243. ClusterInfo map[string]string `json:"clusterInfo"`
  244. Version string `json:"version"`
  245. }
  246. type ContainerInfo struct {
  247. ContainerName string `json:"containerName"`
  248. Image string `json:"image"`
  249. StartTime string `json:"startTime"`
  250. }
  251. func (a *Accesses) GetInstallInfo(w http.ResponseWriter, r *http.Request, _ httprouter.Params) {
  252. w.Header().Set("Content-Type", "application/json")
  253. w.Header().Set("Access-Control-Allow-Origin", "*")
  254. containers, err := GetKubecostContainers(a.KubeClientSet)
  255. if err != nil {
  256. http.Error(w, fmt.Sprintf("Unable to list pods: %s", err.Error()), http.StatusInternalServerError)
  257. return
  258. }
  259. info := InstallInfo{
  260. Containers: containers,
  261. ClusterInfo: make(map[string]string),
  262. Version: version.FriendlyVersion(),
  263. }
  264. nodes := a.ClusterCache.GetAllNodes()
  265. cachePods := a.ClusterCache.GetAllPods()
  266. info.ClusterInfo["nodeCount"] = strconv.Itoa(len(nodes))
  267. info.ClusterInfo["podCount"] = strconv.Itoa(len(cachePods))
  268. body, err := json.Marshal(info)
  269. if err != nil {
  270. http.Error(w, fmt.Sprintf("Error decoding pod: %s", err.Error()), http.StatusInternalServerError)
  271. return
  272. }
  273. w.Write(body)
  274. }
  275. func GetKubecostContainers(kubeClientSet kubernetes.Interface) ([]ContainerInfo, error) {
  276. pods, err := kubeClientSet.CoreV1().Pods(env.GetOpencostNamespace()).List(context.Background(), metav1.ListOptions{
  277. LabelSelector: "app=cost-analyzer",
  278. FieldSelector: "status.phase=Running",
  279. Limit: 1,
  280. })
  281. if err != nil {
  282. return nil, fmt.Errorf("failed to query kubernetes client for kubecost pods: %s", err)
  283. }
  284. // If we have zero pods either something is weird with the install since the app selector is not exposed in the helm
  285. // chart or more likely we are running locally - in either case Images field will return as null
  286. containers := make([]ContainerInfo, 0)
  287. if len(pods.Items) > 0 {
  288. for _, pod := range pods.Items {
  289. for _, container := range pod.Spec.Containers {
  290. c := ContainerInfo{
  291. ContainerName: container.Name,
  292. Image: container.Image,
  293. StartTime: pod.Status.StartTime.String(),
  294. }
  295. containers = append(containers, c)
  296. }
  297. }
  298. }
  299. return containers, nil
  300. }
  301. func (a *Accesses) AddServiceKey(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  302. w.Header().Set("Content-Type", "application/json")
  303. w.Header().Set("Access-Control-Allow-Origin", "*")
  304. r.ParseForm()
  305. key := r.PostForm.Get("key")
  306. k := []byte(key)
  307. err := os.WriteFile(env.GetGCPAuthSecretFilePath(), k, 0644)
  308. if err != nil {
  309. fmt.Fprintf(w, "Error writing service key: %s", err)
  310. }
  311. w.WriteHeader(http.StatusOK)
  312. }
  313. func (a *Accesses) GetHelmValues(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  314. w.Header().Set("Content-Type", "application/json")
  315. w.Header().Set("Access-Control-Allow-Origin", "*")
  316. encodedValues := sysenv.Get("HELM_VALUES", "")
  317. if encodedValues == "" {
  318. fmt.Fprintf(w, "Values reporting disabled")
  319. return
  320. }
  321. result, err := base64.StdEncoding.DecodeString(encodedValues)
  322. if err != nil {
  323. fmt.Fprintf(w, "Failed to decode encoded values: %s", err)
  324. return
  325. }
  326. w.Write(result)
  327. }
  328. func Initialize(router *httprouter.Router, additionalConfigWatchers ...*watcher.ConfigMapWatcher) *Accesses {
  329. var err error
  330. // Kubernetes API setup
  331. kubeClientset, err := kubeconfig.LoadKubeClient("")
  332. if err != nil {
  333. log.Fatalf("Failed to build Kubernetes client: %s", err.Error())
  334. }
  335. clusterUID, err := kubeconfig.GetClusterUID(kubeClientset)
  336. if err != nil {
  337. log.Fatalf("Failed to determine cluster UID: %s", err)
  338. }
  339. // Create Kubernetes Cluster Cache + Watchers
  340. k8sCache := clusterc.NewKubernetesClusterCache(kubeClientset)
  341. k8sCache.Run()
  342. // Create ConfigFileManager for synchronization of shared configuration
  343. confManager := config.NewConfigFileManager(nil)
  344. cloudProviderKey := env.GetCloudProviderAPIKey()
  345. cloudProvider, err := provider.NewProvider(k8sCache, cloudProviderKey, confManager)
  346. if err != nil {
  347. panic(err.Error())
  348. }
  349. // ClusterInfo Provider to provide the cluster map with local and remote cluster data
  350. var clusterInfoProvider clusters.ClusterInfoProvider
  351. if env.IsClusterInfoFileEnabled() {
  352. clusterInfoFile := confManager.ConfigFileAt(env.GetClusterInfoFilePath())
  353. clusterInfoProvider = NewConfiguredClusterInfoProvider(clusterInfoFile)
  354. } else {
  355. clusterInfoProvider = NewLocalClusterInfoProvider(kubeClientset, cloudProvider)
  356. }
  357. const maxRetries = 10
  358. const retryInterval = 10 * time.Second
  359. var fatalErr error
  360. ctx, cancel := context.WithCancel(context.Background())
  361. fn := func() (source.OpenCostDataSource, error) {
  362. ds, e := prom.NewDefaultPrometheusDataSource(clusterInfoProvider)
  363. if e != nil {
  364. if source.IsRetryable(e) {
  365. return nil, e
  366. }
  367. fatalErr = e
  368. cancel()
  369. }
  370. return ds, e
  371. }
  372. if env.IsCollectorDataSourceEnabled() {
  373. fn = func() (source.OpenCostDataSource, error) {
  374. store := GetDefaultCollectorStorage()
  375. nodeStatConf, err := NewNodeClientConfigFromEnv()
  376. if err != nil {
  377. return nil, fmt.Errorf("failed to get node client config: %w", err)
  378. }
  379. clusterConfig, err := kubeconfig.LoadKubeconfig("")
  380. if err != nil {
  381. return nil, fmt.Errorf("failed to load kube config: %w", err)
  382. }
  383. nodeStatClient := nodestats.NewNodeStatsSummaryClient(k8sCache, nodeStatConf, clusterConfig)
  384. ds := collector.NewDefaultCollectorDataSource(
  385. clusterUID,
  386. store,
  387. clusterInfoProvider,
  388. k8sCache,
  389. nodeStatClient,
  390. )
  391. return ds, nil
  392. }
  393. }
  394. dataSource, _ := retry.Retry(
  395. ctx,
  396. fn,
  397. maxRetries,
  398. retryInterval,
  399. )
  400. if fatalErr != nil {
  401. log.Fatalf("Failed to create Prometheus data source: %s", fatalErr)
  402. panic(fatalErr)
  403. }
  404. // Append the pricing config watcher
  405. installNamespace := env.GetOpencostNamespace()
  406. configWatchers := watcher.NewConfigMapWatchers(kubeClientset, installNamespace, additionalConfigWatchers...)
  407. configWatchers.AddWatcher(provider.ConfigWatcherFor(cloudProvider))
  408. configWatchers.AddWatcher(metrics.GetMetricsConfigWatcher())
  409. configWatchers.Watch()
  410. clusterMap := dataSource.ClusterMap()
  411. settingsCache := cache.New(cache.NoExpiration, cache.NoExpiration)
  412. costModel := NewCostModel(clusterUID, dataSource, cloudProvider, k8sCache, clusterMap, dataSource.BatchDuration())
  413. metricsEmitter := NewCostModelMetricsEmitter(k8sCache, cloudProvider, clusterInfoProvider, costModel)
  414. a := &Accesses{
  415. DataSource: dataSource,
  416. KubeClientSet: kubeClientset,
  417. ClusterCache: k8sCache,
  418. ClusterMap: clusterMap,
  419. CloudProvider: cloudProvider,
  420. ConfigFileManager: confManager,
  421. ClusterInfoProvider: clusterInfoProvider,
  422. Model: costModel,
  423. MetricsEmitter: metricsEmitter,
  424. SettingsCache: settingsCache,
  425. }
  426. // Initialize mechanism for subscribing to settings changes
  427. a.InitializeSettingsPubSub()
  428. err = a.CloudProvider.DownloadPricingData()
  429. if err != nil {
  430. log.Infof("Failed to download pricing data: %s", err)
  431. }
  432. if !env.IsKubecostMetricsPodEnabled() {
  433. a.MetricsEmitter.Start()
  434. }
  435. a.DataSource.RegisterEndPoints(router)
  436. router.GET("/costDataModel", a.CostDataModel)
  437. router.GET("/allocation/compute", a.ComputeAllocationHandler)
  438. router.GET("/allocation/compute/summary", a.ComputeAllocationHandlerSummary)
  439. router.GET("/allNodePricing", a.GetAllNodePricing)
  440. router.POST("/refreshPricing", a.RefreshPricingData)
  441. router.GET("/managementPlatform", a.ManagementPlatform)
  442. router.GET("/clusterInfo", a.ClusterInfo)
  443. router.GET("/clusterInfoMap", a.GetClusterInfoMap)
  444. router.GET("/serviceAccountStatus", a.GetServiceAccountStatus)
  445. router.GET("/pricingSourceStatus", a.GetPricingSourceStatus)
  446. router.GET("/pricingSourceSummary", a.GetPricingSourceSummary)
  447. router.GET("/pricingSourceCounts", a.GetPricingSourceCounts)
  448. router.GET("/orphanedPods", a.GetOrphanedPods)
  449. router.GET("/installNamespace", a.GetInstallNamespace)
  450. router.GET("/installInfo", a.GetInstallInfo)
  451. router.POST("/serviceKey", a.AddServiceKey)
  452. router.GET("/helmValues", a.GetHelmValues)
  453. return a
  454. }
  455. // GetDefaultStorage retrieves the default shared storage which is required for running an opencost collector.
  456. func GetDefaultCollectorStorage() storage.Storage {
  457. const warningMessage = `Failed to create local collector directory '%s' - %s.
  458. Did you mean to enable to collector? For persistent storage, it's recommended to use Prometheus,
  459. or set a storage bucket configuration at %s.
  460. %s`
  461. // Try bucket storage if it exists
  462. store, err := storage.TryGetDefaultStorage()
  463. if err == nil {
  464. return store
  465. }
  466. // Fallback to a local storage bucket
  467. dir := env.GetLocalCollectorDirectory()
  468. err = os.MkdirAll(dir, os.ModePerm)
  469. if err != nil {
  470. log.Warnf(
  471. warningMessage,
  472. dir,
  473. err.Error(),
  474. sysenv.GetDefaultStorageConfigFilePath(),
  475. "Falling back to an in-memory file system for collector, which will lose any persistent storage upon restart.",
  476. )
  477. return storage.NewMemoryStorage()
  478. }
  479. return storage.NewFileStorage(dir)
  480. }
  481. // InitializeCloudCost Initializes Cloud Cost pipeline and querier and registers endpoints
  482. func InitializeCloudCost(router *httprouter.Router, providerConfig models.ProviderConfig) *cloudcost.PipelineService {
  483. log.Debugf("Cloud Cost config path: %s", env.GetCloudCostConfigPath())
  484. cloudConfigController := cloudconfig.NewMemoryController(providerConfig)
  485. repo := cloudcost.NewMemoryRepository()
  486. cloudCostPipelineService := cloudcost.NewPipelineService(repo, cloudConfigController, cloudcost.DefaultIngestorConfiguration())
  487. repoQuerier := cloudcost.NewRepositoryQuerier(repo)
  488. cloudCostQueryService := cloudcost.NewQueryService(repoQuerier, repoQuerier)
  489. router.GET("/cloud/config/export", cloudConfigController.GetExportConfigHandler())
  490. router.GET("/cloud/config/enable", cloudConfigController.GetEnableConfigHandler())
  491. router.GET("/cloud/config/disable", cloudConfigController.GetDisableConfigHandler())
  492. router.GET("/cloud/config/delete", cloudConfigController.GetDeleteConfigHandler())
  493. router.GET("/cloudCost", cloudCostQueryService.GetCloudCostHandler())
  494. router.GET("/cloudCost/view/graph", cloudCostQueryService.GetCloudCostViewGraphHandler())
  495. router.GET("/cloudCost/view/totals", cloudCostQueryService.GetCloudCostViewTotalsHandler())
  496. router.GET("/cloudCost/view/table", cloudCostQueryService.GetCloudCostViewTableHandler(nil))
  497. router.GET("/cloudCost/status", cloudCostPipelineService.GetCloudCostStatusHandler())
  498. router.GET("/cloudCost/rebuild", cloudCostPipelineService.GetCloudCostRebuildHandler())
  499. router.GET("/cloudCost/repair", cloudCostPipelineService.GetCloudCostRepairHandler())
  500. return cloudCostPipelineService
  501. }
  502. func InitializeCustomCost(router *httprouter.Router) *customcost.PipelineService {
  503. hourlyRepo := customcost.NewMemoryRepository()
  504. dailyRepo := customcost.NewMemoryRepository()
  505. ingConfig := customcost.DefaultIngestorConfiguration()
  506. var err error
  507. customCostPipelineService, err := customcost.NewPipelineService(hourlyRepo, dailyRepo, ingConfig)
  508. if err != nil {
  509. log.Errorf("error instantiating custom cost pipeline service: %v", err)
  510. return nil
  511. }
  512. customCostQuerier := customcost.NewRepositoryQuerier(hourlyRepo, dailyRepo, ingConfig.HourlyDuration, ingConfig.DailyDuration)
  513. customCostQueryService := customcost.NewQueryService(customCostQuerier)
  514. router.GET("/customCost/total", customCostQueryService.GetCustomCostTotalHandler())
  515. router.GET("/customCost/timeseries", customCostQueryService.GetCustomCostTimeseriesHandler())
  516. return customCostPipelineService
  517. }