router.go 21 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653
  1. package costmodel
  2. import (
  3. "context"
  4. "crypto/subtle"
  5. "encoding/base64"
  6. "fmt"
  7. "net/http"
  8. "os"
  9. "reflect"
  10. "strconv"
  11. "strings"
  12. "sync"
  13. "time"
  14. "github.com/opencost/opencost/core/pkg/kubeconfig"
  15. "github.com/opencost/opencost/core/pkg/nodestats"
  16. "github.com/opencost/opencost/core/pkg/protocol"
  17. "github.com/opencost/opencost/core/pkg/source"
  18. "github.com/opencost/opencost/core/pkg/storage"
  19. "github.com/opencost/opencost/core/pkg/util/retry"
  20. "github.com/opencost/opencost/core/pkg/util/timeutil"
  21. "github.com/opencost/opencost/core/pkg/version"
  22. cloudconfig "github.com/opencost/opencost/pkg/cloud/config"
  23. "github.com/opencost/opencost/pkg/cloud/provider"
  24. "github.com/opencost/opencost/pkg/cloudcost"
  25. "github.com/opencost/opencost/pkg/config"
  26. "github.com/opencost/opencost/pkg/customcost"
  27. "github.com/opencost/opencost/pkg/metrics"
  28. "github.com/opencost/opencost/pkg/util/watcher"
  29. "github.com/julienschmidt/httprouter"
  30. "github.com/opencost/opencost/core/pkg/clustercache"
  31. "github.com/opencost/opencost/core/pkg/clusters"
  32. sysenv "github.com/opencost/opencost/core/pkg/env"
  33. "github.com/opencost/opencost/core/pkg/log"
  34. "github.com/opencost/opencost/core/pkg/util/json"
  35. "github.com/opencost/opencost/modules/collector-source/pkg/collector"
  36. "github.com/opencost/opencost/modules/prometheus-source/pkg/prom"
  37. "github.com/opencost/opencost/pkg/cloud/models"
  38. clusterc "github.com/opencost/opencost/pkg/clustercache"
  39. "github.com/opencost/opencost/pkg/env"
  40. metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
  41. "github.com/patrickmn/go-cache"
  42. "k8s.io/client-go/kubernetes"
  43. )
  44. const (
  45. RFC3339Milli = "2006-01-02T15:04:05.000Z"
  46. CustomPricingSetting = "CustomPricing"
  47. DiscountSetting = "Discount"
  48. )
  49. var (
  50. // gitCommit is set by the build system
  51. gitCommit string
  52. proto = protocol.HTTP()
  53. )
  54. // Accesses defines a singleton application instance, providing access to
  55. // Prometheus, Kubernetes, the cloud provider, and caches.
  56. type Accesses struct {
  57. DataSource source.OpenCostDataSource
  58. KubeClientSet kubernetes.Interface
  59. ClusterCache clustercache.ClusterCache
  60. ClusterMap clusters.ClusterMap
  61. CloudProvider models.Provider
  62. ConfigFileManager *config.ConfigFileManager
  63. ClusterInfoProvider clusters.ClusterInfoProvider
  64. Model *CostModel
  65. MetricsEmitter *CostModelMetricsEmitter
  66. // SettingsCache stores current state of app settings
  67. SettingsCache *cache.Cache
  68. // settingsSubscribers tracks channels through which changes to different
  69. // settings will be published in a pub/sub model
  70. settingsSubscribers map[string][]chan string
  71. settingsMutex sync.Mutex
  72. }
  73. func filterFields(fields string, data map[string]*CostData) map[string]CostData {
  74. fs := strings.Split(fields, ",")
  75. fmap := make(map[string]bool)
  76. for _, f := range fs {
  77. fieldNameLower := strings.ToLower(f) // convert to go struct name by uppercasing first letter
  78. log.Debugf("to delete: %s", fieldNameLower)
  79. fmap[fieldNameLower] = true
  80. }
  81. filteredData := make(map[string]CostData)
  82. for cname, costdata := range data {
  83. s := reflect.TypeOf(*costdata)
  84. val := reflect.ValueOf(*costdata)
  85. costdata2 := CostData{}
  86. cd2 := reflect.New(reflect.Indirect(reflect.ValueOf(costdata2)).Type()).Elem()
  87. n := s.NumField()
  88. for i := 0; i < n; i++ {
  89. field := s.Field(i)
  90. value := val.Field(i)
  91. value2 := cd2.Field(i)
  92. if _, ok := fmap[strings.ToLower(field.Name)]; !ok {
  93. value2.Set(reflect.Value(value))
  94. }
  95. }
  96. filteredData[cname] = cd2.Interface().(CostData)
  97. }
  98. return filteredData
  99. }
  100. // ParsePercentString takes a string of expected format "N%" and returns a floating point 0.0N.
  101. // If the "%" symbol is missing, it just returns 0.0N. Empty string is interpreted as "0%" and
  102. // return 0.0.
  103. func ParsePercentString(percentStr string) (float64, error) {
  104. if len(percentStr) == 0 {
  105. return 0.0, nil
  106. }
  107. if percentStr[len(percentStr)-1:] == "%" {
  108. percentStr = percentStr[:len(percentStr)-1]
  109. }
  110. discount, err := strconv.ParseFloat(percentStr, 64)
  111. if err != nil {
  112. return 0.0, err
  113. }
  114. discount *= 0.01
  115. return discount, nil
  116. }
  117. // adminAuthMiddleware wraps a handler and requires a Bearer token matching ADMIN_TOKEN env var when set.
  118. // When ADMIN_TOKEN is not set, logs a deduped warning and allows the request through.
  119. // When ADMIN_TOKEN is set, returns 401 if the Bearer token is missing or 403 if it does not match.
  120. func adminAuthMiddleware(next httprouter.Handle) httprouter.Handle {
  121. return func(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  122. adminToken := env.GetAdminToken()
  123. if adminToken == "" {
  124. log.DedupedWarningf(5, "Admin token (ADMIN_TOKEN) not configured; write operations are unauthenticated")
  125. next(w, r, ps)
  126. return
  127. }
  128. authHeader := r.Header.Get("Authorization")
  129. const prefix = "Bearer "
  130. if !strings.HasPrefix(authHeader, prefix) {
  131. http.Error(w, "Missing or invalid authorization", http.StatusUnauthorized)
  132. return
  133. }
  134. bearerToken := strings.TrimPrefix(authHeader, prefix)
  135. if subtle.ConstantTimeCompare([]byte(bearerToken), []byte(adminToken)) != 1 {
  136. http.Error(w, "Missing or invalid authorization", http.StatusForbidden)
  137. return
  138. }
  139. next(w, r, ps)
  140. }
  141. }
  142. func WriteData(w http.ResponseWriter, data interface{}, err error) {
  143. if err != nil {
  144. proto.WriteError(w, proto.InternalServerError(err.Error()))
  145. return
  146. }
  147. proto.WriteData(w, data)
  148. }
  149. // RefreshPricingData needs to be called when a new node joins the fleet, since we cache the relevant subsets of pricing data to avoid storing the whole thing.
  150. func (a *Accesses) RefreshPricingData(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  151. w.Header().Set("Content-Type", "application/json")
  152. w.Header().Set("Access-Control-Allow-Origin", "*")
  153. err := a.CloudProvider.DownloadPricingData()
  154. if err != nil {
  155. log.Errorf("Error refreshing pricing data: %s", err.Error())
  156. }
  157. WriteData(w, nil, err)
  158. }
  159. func (a *Accesses) CostDataModel(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  160. w.Header().Set("Content-Type", "application/json")
  161. w.Header().Set("Access-Control-Allow-Origin", "*")
  162. window := r.URL.Query().Get("timeWindow")
  163. offset := r.URL.Query().Get("offset")
  164. fields := r.URL.Query().Get("filterFields")
  165. namespace := r.URL.Query().Get("namespace")
  166. duration, err := timeutil.ParseDuration(window)
  167. if err != nil {
  168. WriteData(w, nil, fmt.Errorf("error parsing window (%s): %s", window, err))
  169. return
  170. }
  171. end := time.Now()
  172. if offset != "" {
  173. offsetDur, err := timeutil.ParseDuration(offset)
  174. if err != nil {
  175. WriteData(w, nil, fmt.Errorf("error parsing offset (%s): %s", offset, err))
  176. return
  177. }
  178. end = end.Add(-offsetDur)
  179. }
  180. start := end.Add(-duration)
  181. data, err := a.Model.ComputeCostData(start, end)
  182. // apply filter by removing if != namespace
  183. if namespace != "" {
  184. for key, costData := range data {
  185. if costData.Namespace != namespace {
  186. delete(data, key)
  187. }
  188. }
  189. }
  190. if fields != "" {
  191. filteredData := filterFields(fields, data)
  192. WriteData(w, filteredData, err)
  193. } else {
  194. WriteData(w, data, err)
  195. }
  196. }
  197. func (a *Accesses) GetAllNodePricing(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  198. w.Header().Set("Content-Type", "application/json")
  199. w.Header().Set("Access-Control-Allow-Origin", "*")
  200. data, err := a.CloudProvider.AllNodePricing()
  201. WriteData(w, data, err)
  202. }
  203. func (a *Accesses) ManagementPlatform(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  204. w.Header().Set("Content-Type", "application/json")
  205. w.Header().Set("Access-Control-Allow-Origin", "*")
  206. data, err := a.CloudProvider.GetManagementPlatform()
  207. WriteData(w, data, err)
  208. }
  209. func (a *Accesses) ClusterInfo(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  210. w.Header().Set("Content-Type", "application/json")
  211. w.Header().Set("Access-Control-Allow-Origin", "*")
  212. data := a.ClusterInfoProvider.GetClusterInfo()
  213. WriteData(w, data, nil)
  214. }
  215. func (a *Accesses) GetClusterInfoMap(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  216. w.Header().Set("Content-Type", "application/json")
  217. w.Header().Set("Access-Control-Allow-Origin", "*")
  218. data := a.ClusterMap.AsMap()
  219. WriteData(w, data, nil)
  220. }
  221. func (a *Accesses) GetServiceAccountStatus(w http.ResponseWriter, _ *http.Request, _ httprouter.Params) {
  222. w.Header().Set("Content-Type", "application/json")
  223. w.Header().Set("Access-Control-Allow-Origin", "*")
  224. WriteData(w, a.CloudProvider.ServiceAccountStatus(), nil)
  225. }
  226. func (a *Accesses) GetPricingSourceStatus(w http.ResponseWriter, _ *http.Request, _ httprouter.Params) {
  227. w.Header().Set("Content-Type", "application/json")
  228. w.Header().Set("Access-Control-Allow-Origin", "*")
  229. data := a.CloudProvider.PricingSourceStatus()
  230. WriteData(w, data, nil)
  231. }
  232. func (a *Accesses) GetPricingSourceCounts(w http.ResponseWriter, _ *http.Request, _ httprouter.Params) {
  233. w.Header().Set("Content-Type", "application/json")
  234. w.Header().Set("Access-Control-Allow-Origin", "*")
  235. data, err := a.Model.GetPricingSourceCounts()
  236. WriteData(w, data, err)
  237. }
  238. func (a *Accesses) GetPricingSourceSummary(w http.ResponseWriter, r *http.Request, p httprouter.Params) {
  239. w.Header().Set("Content-Type", "application/json")
  240. w.Header().Set("Access-Control-Allow-Origin", "*")
  241. data := a.CloudProvider.PricingSourceSummary()
  242. WriteData(w, data, nil)
  243. }
  244. func (a *Accesses) GetOrphanedPods(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  245. w.Header().Set("Content-Type", "application/json")
  246. w.Header().Set("Access-Control-Allow-Origin", "*")
  247. podlist := a.ClusterCache.GetAllPods()
  248. var lonePods []*clustercache.Pod
  249. for _, pod := range podlist {
  250. if len(pod.OwnerReferences) == 0 {
  251. lonePods = append(lonePods, pod)
  252. }
  253. }
  254. body, err := json.Marshal(lonePods)
  255. if err != nil {
  256. fmt.Fprintf(w, "Error decoding pod: %s", err)
  257. } else {
  258. w.Write(body)
  259. }
  260. }
  261. func (a *Accesses) GetInstallNamespace(w http.ResponseWriter, r *http.Request, _ httprouter.Params) {
  262. w.Header().Set("Content-Type", "application/json")
  263. w.Header().Set("Access-Control-Allow-Origin", "*")
  264. ns := env.GetOpencostNamespace()
  265. w.Write([]byte(ns))
  266. }
  267. type InstallInfo struct {
  268. Containers []ContainerInfo `json:"containers"`
  269. ClusterInfo map[string]string `json:"clusterInfo"`
  270. Version string `json:"version"`
  271. }
  272. type ContainerInfo struct {
  273. ContainerName string `json:"containerName"`
  274. Image string `json:"image"`
  275. StartTime string `json:"startTime"`
  276. }
  277. func (a *Accesses) GetInstallInfo(w http.ResponseWriter, r *http.Request, _ httprouter.Params) {
  278. w.Header().Set("Content-Type", "application/json")
  279. w.Header().Set("Access-Control-Allow-Origin", "*")
  280. containers, err := GetKubecostContainers(a.KubeClientSet)
  281. if err != nil {
  282. http.Error(w, fmt.Sprintf("Unable to list pods: %s", err.Error()), http.StatusInternalServerError)
  283. return
  284. }
  285. info := InstallInfo{
  286. Containers: containers,
  287. ClusterInfo: make(map[string]string),
  288. Version: version.FriendlyVersion(),
  289. }
  290. nodes := a.ClusterCache.GetAllNodes()
  291. cachePods := a.ClusterCache.GetAllPods()
  292. info.ClusterInfo["nodeCount"] = strconv.Itoa(len(nodes))
  293. info.ClusterInfo["podCount"] = strconv.Itoa(len(cachePods))
  294. body, err := json.Marshal(info)
  295. if err != nil {
  296. http.Error(w, fmt.Sprintf("Error decoding pod: %s", err.Error()), http.StatusInternalServerError)
  297. return
  298. }
  299. w.Write(body)
  300. }
  301. func GetKubecostContainers(kubeClientSet kubernetes.Interface) ([]ContainerInfo, error) {
  302. pods, err := kubeClientSet.CoreV1().Pods(env.GetOpencostNamespace()).List(context.Background(), metav1.ListOptions{
  303. LabelSelector: "app=cost-analyzer",
  304. FieldSelector: "status.phase=Running",
  305. Limit: 1,
  306. })
  307. if err != nil {
  308. return nil, fmt.Errorf("failed to query kubernetes client for kubecost pods: %s", err)
  309. }
  310. // If we have zero pods either something is weird with the install since the app selector is not exposed in the helm
  311. // chart or more likely we are running locally - in either case Images field will return as null
  312. containers := make([]ContainerInfo, 0)
  313. if len(pods.Items) > 0 {
  314. for _, pod := range pods.Items {
  315. for _, container := range pod.Spec.Containers {
  316. c := ContainerInfo{
  317. ContainerName: container.Name,
  318. Image: container.Image,
  319. StartTime: pod.Status.StartTime.String(),
  320. }
  321. containers = append(containers, c)
  322. }
  323. }
  324. }
  325. return containers, nil
  326. }
  327. func (a *Accesses) AddServiceKey(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  328. w.Header().Set("Content-Type", "application/json")
  329. w.Header().Set("Access-Control-Allow-Origin", "*")
  330. r.ParseForm()
  331. key := r.PostForm.Get("key")
  332. k := []byte(key)
  333. err := os.WriteFile(env.GetGCPAuthSecretFilePath(), k, 0644)
  334. if err != nil {
  335. fmt.Fprintf(w, "Error writing service key: %s", err)
  336. }
  337. w.WriteHeader(http.StatusOK)
  338. }
  339. func (a *Accesses) GetHelmValues(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  340. w.Header().Set("Content-Type", "application/json")
  341. w.Header().Set("Access-Control-Allow-Origin", "*")
  342. encodedValues := sysenv.Get("HELM_VALUES", "")
  343. if encodedValues == "" {
  344. fmt.Fprintf(w, "Values reporting disabled")
  345. return
  346. }
  347. result, err := base64.StdEncoding.DecodeString(encodedValues)
  348. if err != nil {
  349. fmt.Fprintf(w, "Failed to decode encoded values: %s", err)
  350. return
  351. }
  352. w.Write(result)
  353. }
  354. func Initialize(router *httprouter.Router, additionalConfigWatchers ...*watcher.ConfigMapWatcher) *Accesses {
  355. var err error
  356. // Kubernetes API setup
  357. kubeClientset, err := kubeconfig.LoadKubeClient("")
  358. if err != nil {
  359. log.Fatalf("Failed to build Kubernetes client: %s", err.Error())
  360. }
  361. clusterUID, err := kubeconfig.GetClusterUID(kubeClientset)
  362. if err != nil {
  363. log.Fatalf("Failed to determine cluster UID: %s", err)
  364. }
  365. // Create Kubernetes Cluster Cache + Watchers
  366. k8sCache := clusterc.NewKubernetesClusterCache(kubeClientset)
  367. k8sCache.Run()
  368. // Create ConfigFileManager for synchronization of shared configuration
  369. confManager := config.NewConfigFileManager(nil)
  370. cloudProviderKey := env.GetCloudProviderAPIKey()
  371. cloudProvider, err := provider.NewProvider(k8sCache, cloudProviderKey, confManager)
  372. if err != nil {
  373. panic(err.Error())
  374. }
  375. // ClusterInfo Provider to provide the cluster map with local and remote cluster data
  376. var clusterInfoProvider clusters.ClusterInfoProvider
  377. if env.IsClusterInfoFileEnabled() {
  378. clusterInfoFile := confManager.ConfigFileAt(env.GetClusterInfoFilePath())
  379. clusterInfoProvider = NewConfiguredClusterInfoProvider(clusterInfoFile)
  380. } else {
  381. clusterInfoProvider = NewLocalClusterInfoProvider(kubeClientset, cloudProvider)
  382. }
  383. const maxRetries = 10
  384. const retryInterval = 10 * time.Second
  385. var fatalErr error
  386. ctx, cancel := context.WithCancel(context.Background())
  387. fn := func() (source.OpenCostDataSource, error) {
  388. ds, e := prom.NewDefaultPrometheusDataSource(clusterInfoProvider)
  389. if e != nil {
  390. if source.IsRetryable(e) {
  391. return nil, e
  392. }
  393. fatalErr = e
  394. cancel()
  395. }
  396. return ds, e
  397. }
  398. if env.IsCollectorDataSourceEnabled() {
  399. fn = func() (source.OpenCostDataSource, error) {
  400. store := GetDefaultCollectorStorage()
  401. nodeStatConf, err := NewNodeClientConfigFromEnv()
  402. if err != nil {
  403. return nil, fmt.Errorf("failed to get node client config: %w", err)
  404. }
  405. clusterConfig, err := kubeconfig.LoadKubeconfig("")
  406. if err != nil {
  407. return nil, fmt.Errorf("failed to load kube config: %w", err)
  408. }
  409. nodeStatClient := nodestats.NewNodeStatsSummaryClient(k8sCache, nodeStatConf, clusterConfig)
  410. ds := collector.NewDefaultCollectorDataSource(
  411. clusterUID,
  412. store,
  413. clusterInfoProvider,
  414. k8sCache,
  415. nodeStatClient,
  416. )
  417. return ds, nil
  418. }
  419. }
  420. dataSource, _ := retry.Retry(
  421. ctx,
  422. fn,
  423. maxRetries,
  424. retryInterval,
  425. )
  426. if fatalErr != nil {
  427. log.Fatalf("Failed to create Prometheus data source: %s", fatalErr)
  428. panic(fatalErr)
  429. }
  430. // Append the pricing config watcher
  431. installNamespace := env.GetOpencostNamespace()
  432. configWatchers := watcher.NewConfigMapWatchers(kubeClientset, installNamespace, additionalConfigWatchers...)
  433. configWatchers.AddWatcher(provider.ConfigWatcherFor(cloudProvider))
  434. configWatchers.AddWatcher(metrics.GetMetricsConfigWatcher())
  435. configWatchers.Watch()
  436. clusterMap := dataSource.ClusterMap()
  437. settingsCache := cache.New(cache.NoExpiration, cache.NoExpiration)
  438. costModel := NewCostModel(clusterUID, dataSource, cloudProvider, k8sCache, clusterMap, dataSource.BatchDuration())
  439. metricsEmitter := NewCostModelMetricsEmitter(k8sCache, cloudProvider, clusterInfoProvider, costModel)
  440. a := &Accesses{
  441. DataSource: dataSource,
  442. KubeClientSet: kubeClientset,
  443. ClusterCache: k8sCache,
  444. ClusterMap: clusterMap,
  445. CloudProvider: cloudProvider,
  446. ConfigFileManager: confManager,
  447. ClusterInfoProvider: clusterInfoProvider,
  448. Model: costModel,
  449. MetricsEmitter: metricsEmitter,
  450. SettingsCache: settingsCache,
  451. }
  452. // Initialize mechanism for subscribing to settings changes
  453. a.InitializeSettingsPubSub()
  454. err = a.CloudProvider.DownloadPricingData()
  455. if err != nil {
  456. log.Infof("Failed to download pricing data: %s", err)
  457. }
  458. if !env.IsKubecostMetricsPodEnabled() {
  459. a.MetricsEmitter.Start()
  460. }
  461. a.DataSource.RegisterEndPoints(router)
  462. router.GET("/costDataModel", a.CostDataModel)
  463. router.GET("/allocation/compute", a.ComputeAllocationHandler)
  464. router.GET("/allocation/compute/summary", a.ComputeAllocationHandlerSummary)
  465. router.GET("/allNodePricing", a.GetAllNodePricing)
  466. router.POST("/refreshPricing", a.RefreshPricingData)
  467. router.GET("/managementPlatform", a.ManagementPlatform)
  468. router.GET("/clusterInfo", a.ClusterInfo)
  469. router.GET("/clusterInfoMap", a.GetClusterInfoMap)
  470. router.GET("/serviceAccountStatus", a.GetServiceAccountStatus)
  471. router.GET("/pricingSourceStatus", a.GetPricingSourceStatus)
  472. router.GET("/pricingSourceSummary", a.GetPricingSourceSummary)
  473. router.GET("/pricingSourceCounts", a.GetPricingSourceCounts)
  474. router.GET("/orphanedPods", a.GetOrphanedPods)
  475. router.GET("/installNamespace", a.GetInstallNamespace)
  476. router.GET("/installInfo", a.GetInstallInfo)
  477. router.POST("/serviceKey", adminAuthMiddleware(a.AddServiceKey))
  478. router.GET("/helmValues", a.GetHelmValues)
  479. return a
  480. }
  481. // GetDefaultStorage retrieves the default shared storage which is required for running an opencost collector.
  482. func GetDefaultCollectorStorage() storage.Storage {
  483. const warningMessage = `Failed to create local collector directory '%s' - %s.
  484. Did you mean to enable to collector? For persistent storage, it's recommended to use Prometheus,
  485. or set a storage bucket configuration at %s.
  486. %s`
  487. // Try bucket storage if it exists
  488. store, err := storage.TryGetDefaultStorage()
  489. if err == nil {
  490. return store
  491. }
  492. // Fallback to a local storage bucket
  493. dir := env.GetLocalCollectorDirectory()
  494. err = os.MkdirAll(dir, os.ModePerm)
  495. if err != nil {
  496. log.Warnf(
  497. warningMessage,
  498. dir,
  499. err.Error(),
  500. sysenv.GetDefaultStorageConfigFilePath(),
  501. "Falling back to an in-memory file system for collector, which will lose any persistent storage upon restart.",
  502. )
  503. return storage.NewMemoryStorage()
  504. }
  505. return storage.NewFileStorage(dir)
  506. }
  507. // InitializeCloudCost Initializes Cloud Cost pipeline and querier and registers endpoints
  508. func InitializeCloudCost(router *httprouter.Router) *cloudcost.PipelineService {
  509. log.Debugf("Cloud Cost config path: %s", env.GetCloudCostConfigPath())
  510. cloudConfigController := cloudconfig.NewMemoryController(nil)
  511. repo := cloudcost.NewMemoryRepository()
  512. cloudCostPipelineService := cloudcost.NewPipelineService(repo, cloudConfigController, cloudcost.DefaultIngestorConfiguration())
  513. repoQuerier := cloudcost.NewRepositoryQuerier(repo)
  514. cloudCostQueryService := cloudcost.NewQueryService(repoQuerier, repoQuerier)
  515. router.GET("/cloudCost", cloudCostQueryService.GetCloudCostHandler())
  516. router.GET("/cloudCost/view/graph", cloudCostQueryService.GetCloudCostViewGraphHandler())
  517. router.GET("/cloudCost/view/totals", cloudCostQueryService.GetCloudCostViewTotalsHandler())
  518. router.GET("/cloudCost/view/table", cloudCostQueryService.GetCloudCostViewTableHandler(nil))
  519. router.GET("/cloudCost/status", cloudCostPipelineService.GetCloudCostStatusHandler())
  520. router.GET("/cloudCost/rebuild", adminAuthMiddleware(cloudCostPipelineService.GetCloudCostRebuildHandler()))
  521. router.GET("/cloudCost/repair", adminAuthMiddleware(cloudCostPipelineService.GetCloudCostRepairHandler()))
  522. router.GET("/cloud/config/export", adminAuthMiddleware(cloudConfigController.GetExportConfigHandler()))
  523. router.GET("/cloud/config/enable", adminAuthMiddleware(cloudConfigController.GetEnableConfigHandler()))
  524. router.GET("/cloud/config/disable", adminAuthMiddleware(cloudConfigController.GetDisableConfigHandler()))
  525. router.GET("/cloud/config/delete", adminAuthMiddleware(cloudConfigController.GetDeleteConfigHandler()))
  526. return cloudCostPipelineService
  527. }
  528. func InitializeCustomCost(router *httprouter.Router) *customcost.PipelineService {
  529. hourlyRepo := customcost.NewMemoryRepository()
  530. dailyRepo := customcost.NewMemoryRepository()
  531. ingConfig := customcost.DefaultIngestorConfiguration()
  532. var err error
  533. customCostPipelineService, err := customcost.NewPipelineService(hourlyRepo, dailyRepo, ingConfig)
  534. if err != nil {
  535. log.Errorf("error instantiating custom cost pipeline service: %v", err)
  536. return nil
  537. }
  538. customCostQuerier := customcost.NewRepositoryQuerier(hourlyRepo, dailyRepo, ingConfig.HourlyDuration, ingConfig.DailyDuration)
  539. customCostQueryService := customcost.NewQueryService(customCostQuerier)
  540. router.GET("/customCost/total", customCostQueryService.GetCustomCostTotalHandler())
  541. router.GET("/customCost/timeseries", customCostQueryService.GetCustomCostTimeseriesHandler())
  542. return customCostPipelineService
  543. }