pipelineservice.go 6.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196
  1. package cloudcost
  2. import (
  3. "fmt"
  4. "net/http"
  5. "time"
  6. "github.com/julienschmidt/httprouter"
  7. "github.com/opencost/opencost/core/pkg/opencost"
  8. proto "github.com/opencost/opencost/core/pkg/protocol"
  9. cloudconfig "github.com/opencost/opencost/pkg/cloud"
  10. "github.com/opencost/opencost/pkg/cloud/config"
  11. "github.com/opencost/opencost/pkg/env"
  12. )
  13. var protocol = proto.HTTP()
  14. // PipelineService exposes CloudCost pipeline controls and diagnostics endpoints
  15. type PipelineService struct {
  16. ingestionManager *IngestionManager
  17. store Repository
  18. configController *config.Controller
  19. }
  20. // NewPipelineService is a constructor for a PipelineService
  21. func NewPipelineService(repo Repository, ic *config.Controller, ingConf IngestorConfig) *PipelineService {
  22. im := NewIngestionManager(ic, repo, ingConf)
  23. return &PipelineService{
  24. ingestionManager: im,
  25. store: repo,
  26. configController: ic,
  27. }
  28. }
  29. // Status merges status values from the config.Controller and the IngestionManager to give a combined view of that state
  30. // of configs and their ingestion status
  31. func (dp *PipelineService) Status() []Status {
  32. // Pull config status from the config controller
  33. confStatuses := dp.configController.GetStatus()
  34. statuses := make([]Status, 0, len(confStatuses))
  35. refreshRate := time.Hour * time.Duration(env.GetCloudCostRefreshRateHours())
  36. for _, confStat := range confStatuses {
  37. var conf cloudconfig.Config
  38. var provider string
  39. if confStat.Config != nil {
  40. conf = confStat.Config.Sanitize()
  41. provider = confStat.Config.Provider()
  42. }
  43. var ingestorStatus IngestorStatus
  44. if ing, ok := dp.ingestionManager.ingestors[confStat.Key]; ok {
  45. ingestorStatus = ing.Status()
  46. }
  47. // These are the statuses
  48. status := Status{
  49. Key: confStat.Key,
  50. Source: confStat.Source.String(),
  51. Active: confStat.Active,
  52. Valid: confStat.Valid,
  53. Config: conf,
  54. Provider: provider,
  55. ConnectionStatus: ingestorStatus.ConnectionStatus.String(),
  56. LastRun: ingestorStatus.LastRun,
  57. NextRun: ingestorStatus.NextRun,
  58. Runs: ingestorStatus.Runs,
  59. Created: ingestorStatus.Created,
  60. Coverage: ingestorStatus.Coverage.String(),
  61. RefreshRate: refreshRate.String(),
  62. }
  63. statuses = append(statuses, status)
  64. }
  65. return statuses
  66. }
  67. // GetCloudCostRebuildHandler creates a handler from a http request which initiates a rebuild of cloud cost pipeline, if an
  68. // integrationKey is provided then it only rebuilds the specified billing integration
  69. func (s *PipelineService) GetCloudCostRebuildHandler() func(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  70. // If Reporting Service is nil, always return 501
  71. if s == nil {
  72. return func(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  73. http.Error(w, "Cloud Cost Pipeline Service is nil", http.StatusNotImplemented)
  74. }
  75. }
  76. if s.ingestionManager == nil {
  77. return func(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  78. http.Error(w, "Cloud Cost Pipeline Service Ingestion Manager is nil", http.StatusNotImplemented)
  79. }
  80. }
  81. // Return valid handler func
  82. return func(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  83. w.Header().Set("Content-Type", "application/json")
  84. commit := r.URL.Query().Get("commit") == "true" || r.URL.Query().Get("commit") == "1"
  85. if !commit {
  86. protocol.WriteData(w, "Pass parameter 'commit=true' to confirm Cloud Cost rebuild")
  87. return
  88. }
  89. integrationKey := r.URL.Query().Get("integrationKey")
  90. // If no providerKey argument was provider, restart all Cloud Asset Pipelines
  91. if integrationKey == "" {
  92. s.ingestionManager.RebuildAll()
  93. protocol.WriteData(w, "Rebuilding Cloud Usage For All Providers")
  94. return
  95. } else {
  96. err := s.ingestionManager.Rebuild(integrationKey)
  97. if err != nil {
  98. http.Error(w, err.Error(), http.StatusBadRequest)
  99. return
  100. }
  101. protocol.WriteData(w, fmt.Sprintf("Rebuilding Cloud Usage For Provider %s", integrationKey))
  102. return
  103. }
  104. }
  105. }
  106. // GetCloudCostRepairHandler creates a handler from a http request which initiates a repair of cloud cost for a given window, if an
  107. // integrationKey is provided then it only repairs the specified integration
  108. func (s *PipelineService) GetCloudCostRepairHandler() func(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  109. // If Reporting Service is nil, always return 501
  110. if s == nil {
  111. return func(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  112. http.Error(w, "Reporting Service is nil", http.StatusNotImplemented)
  113. }
  114. }
  115. if s.ingestionManager == nil {
  116. return func(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  117. http.Error(w, "Cloud Cost Pipeline Service Ingestion Manager is nil", http.StatusNotImplemented)
  118. }
  119. }
  120. // Return valid handler func
  121. return func(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  122. w.Header().Set("Content-Type", "application/json")
  123. windowStr := r.URL.Query().Get("window")
  124. var window opencost.Window
  125. if windowStr != "" {
  126. win, err := opencost.ParseWindowUTC(windowStr)
  127. if err != nil {
  128. http.Error(w, fmt.Sprintf("Invalid parameter: %s", err), http.StatusBadRequest)
  129. return
  130. }
  131. window = win
  132. }
  133. integrationKey := r.URL.Query().Get("integrationKey")
  134. // If no providerKey argument was provider, restart all Cloud Asset Pipelines
  135. if integrationKey == "" {
  136. err := s.ingestionManager.RepairAll(*window.Start(), *window.End())
  137. if err != nil {
  138. http.Error(w, err.Error(), http.StatusBadRequest)
  139. return
  140. }
  141. protocol.WriteData(w, "Rebuilding Cloud Usage For All Providers")
  142. return
  143. } else {
  144. err := s.ingestionManager.Repair(integrationKey, *window.Start(), *window.End())
  145. if err != nil {
  146. http.Error(w, err.Error(), http.StatusBadRequest)
  147. return
  148. }
  149. protocol.WriteData(w, fmt.Sprintf("Rebuilding Cloud Usage For Provider %s", integrationKey))
  150. return
  151. }
  152. }
  153. }
  154. // GetCloudCostStatusHandler creates a handler from a http request which returns a list of the billing integration status
  155. func (s *PipelineService) GetCloudCostStatusHandler() func(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  156. // If Reporting Service is nil, always return 501
  157. if s == nil {
  158. return func(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  159. http.Error(w, "Reporting Service is nil", http.StatusNotImplemented)
  160. }
  161. }
  162. if s.ingestionManager == nil {
  163. return func(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  164. http.Error(w, "Cloud Cost Pipeline Service Ingestion Manager is nil", http.StatusNotImplemented)
  165. }
  166. }
  167. // Return valid handler func
  168. return func(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  169. w.Header().Set("Content-Type", "application/json")
  170. protocol.WriteData(w, s.Status())
  171. }
  172. }