pipelineservice.go 7.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219
  1. package customcost
  2. import (
  3. "fmt"
  4. "net/http"
  5. "os"
  6. "os/exec"
  7. "runtime"
  8. "strings"
  9. "time"
  10. "github.com/hashicorp/go-hclog"
  11. "github.com/hashicorp/go-plugin"
  12. "github.com/julienschmidt/httprouter"
  13. "github.com/opencost/opencost/core/pkg/log"
  14. ocplugin "github.com/opencost/opencost/core/pkg/plugin"
  15. proto "github.com/opencost/opencost/core/pkg/protocol"
  16. "github.com/opencost/opencost/core/pkg/util/timeutil"
  17. "github.com/opencost/opencost/core/pkg/version"
  18. )
  19. var protocol = proto.HTTP()
  20. const execFmt = `%s/%s.ocplugin.%s.%s`
  21. // PipelineService exposes CloudCost pipeline controls and diagnostics endpoints
  22. type PipelineService struct {
  23. hourlyIngestor, dailyIngestor *CustomCostIngestor
  24. hourlyStore, dailyStore Repository
  25. }
  26. func getRegisteredPlugins(configDir string, execDir string) (map[string]*plugin.Client, error) {
  27. pluginNames := map[string]string{}
  28. // scan plugin config directory for all file names
  29. configFiles, err := os.ReadDir(configDir)
  30. if err != nil {
  31. log.Errorf("error reading files in directory %s: %v", configDir, err)
  32. }
  33. // list of plugins that we must run are the strings before _
  34. for _, file := range configFiles {
  35. log.Tracef("parsing config file name: %s", file.Name())
  36. fileParts := strings.Split(file.Name(), "_")
  37. if len(fileParts) != 2 || fileParts[1] == "_config.json" {
  38. return nil, fmt.Errorf("plugin config file name %s invalid. Config files must have the form <plugin name>_config.json", file.Name())
  39. }
  40. pluginNames[fileParts[0]] = configDir + "/" + file.Name()
  41. }
  42. if len(pluginNames) == 0 {
  43. log.Infof("no plugins detected.")
  44. return nil, nil
  45. }
  46. log.Infof("requiring plugins matching your architecture: " + version.Architecture)
  47. configs := map[string]*plugin.ClientConfig{}
  48. // set up the client config
  49. for name, config := range pluginNames {
  50. file := fmt.Sprintf(execFmt, execDir, name, runtime.GOOS, version.Architecture)
  51. log.Debugf("looking for file: %s", file)
  52. if _, err := os.Stat(file); err != nil {
  53. msg := fmt.Sprintf("error reading executable for %s plugin. Plugin executables must be in %s and have name format <plugin name>.ocplugin.<os>.<opencost binary archtecture (arm64 or amd64)>", name, execDir)
  54. log.Errorf(msg)
  55. return nil, fmt.Errorf(msg)
  56. }
  57. var handshakeConfig = plugin.HandshakeConfig{
  58. ProtocolVersion: 1,
  59. MagicCookieKey: "PLUGIN_NAME",
  60. MagicCookieValue: name,
  61. }
  62. logger := hclog.New(&hclog.LoggerOptions{
  63. Name: "plugin[" + name + "]",
  64. Output: os.Stdout,
  65. Level: hclog.Debug,
  66. })
  67. // pluginMap is the map of plugins we can dispense.
  68. var pluginMap = map[string]plugin.Plugin{
  69. "CustomCostSource": &ocplugin.CustomCostPlugin{},
  70. }
  71. configs[name] = &plugin.ClientConfig{
  72. HandshakeConfig: handshakeConfig,
  73. Plugins: pluginMap,
  74. Cmd: exec.Command(fmt.Sprintf(execFmt, execDir, name, runtime.GOOS, version.Architecture), config),
  75. Logger: logger,
  76. AllowedProtocols: []plugin.Protocol{plugin.ProtocolGRPC},
  77. }
  78. }
  79. plugins := map[string]*plugin.Client{}
  80. for name, config := range configs {
  81. client := plugin.NewClient(config)
  82. // add the connected, initialized client to the ma
  83. plugins[name] = client
  84. }
  85. return plugins, nil
  86. }
  87. // NewPipelineService is a constructor for a PipelineService
  88. func NewPipelineService(hourlyrepo, dailyrepo Repository, ingConf CustomCostIngestorConfig) (*PipelineService, error) {
  89. registeredPlugins, err := getRegisteredPlugins(ingConf.PluginConfigDir, ingConf.PluginExecutableDir)
  90. if err != nil {
  91. log.Errorf("error getting registered plugins: %v", err)
  92. return nil, fmt.Errorf("error getting registered plugins: %v", err)
  93. }
  94. hourlyIngestor, err := NewCustomCostIngestor(&ingConf, hourlyrepo, registeredPlugins, time.Hour)
  95. if err != nil {
  96. return nil, err
  97. }
  98. hourlyIngestor.Start(false)
  99. dailyIngestor, err := NewCustomCostIngestor(&ingConf, dailyrepo, registeredPlugins, timeutil.Day)
  100. if err != nil {
  101. return nil, err
  102. }
  103. dailyIngestor.Start(false)
  104. return &PipelineService{
  105. hourlyIngestor: hourlyIngestor,
  106. hourlyStore: hourlyrepo,
  107. dailyStore: dailyrepo,
  108. dailyIngestor: dailyIngestor,
  109. }, nil
  110. }
  111. // Status gives a combined view of the state of configs and the ingestior status
  112. func (dp *PipelineService) Status() Status {
  113. // Pull config status from the config controller
  114. ingstatusHourly := dp.hourlyIngestor.Status()
  115. // Pull config status from the config controller
  116. ingstatusDaily := dp.dailyIngestor.Status()
  117. // These are the statuses
  118. return Status{
  119. CoverageDaily: ingstatusDaily.Coverage,
  120. CoverageHourly: ingstatusHourly.Coverage,
  121. RefreshRateHourly: ingstatusHourly.RefreshRate.String(),
  122. RefreshRateDaily: ingstatusDaily.RefreshRate.String(),
  123. }
  124. }
  125. // GetCustomCostRebuildHandler creates a handler from a http request which initiates a rebuild of custom cost pipeline, if a
  126. // domain is provided then it only rebuilds the specified billing domain
  127. func (s *PipelineService) GetCustomCostRebuildHandler() func(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  128. // If pipeline Service is nil, always return 501
  129. if s == nil {
  130. return func(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  131. http.Error(w, "Custom Cost Pipeline Service is nil", http.StatusNotImplemented)
  132. }
  133. }
  134. if s.dailyIngestor == nil || s.hourlyIngestor == nil {
  135. return func(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  136. http.Error(w, "Custom Cost Pipeline Service Ingestion Manager is nil", http.StatusNotImplemented)
  137. }
  138. }
  139. // Return valid handler func
  140. return func(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  141. w.Header().Set("Content-Type", "application/json")
  142. commit := r.URL.Query().Get("commit") == "true" || r.URL.Query().Get("commit") == "1"
  143. if !commit {
  144. protocol.WriteData(w, "Pass parameter 'commit=true' to confirm Custom Cost rebuild")
  145. return
  146. }
  147. domain := r.URL.Query().Get("domain")
  148. err := s.hourlyIngestor.Rebuild(domain)
  149. if err != nil {
  150. log.Errorf("error rebuilding hourly ingestor")
  151. http.Error(w, err.Error(), http.StatusBadRequest)
  152. return
  153. }
  154. err = s.dailyIngestor.Rebuild(domain)
  155. if err != nil {
  156. log.Errorf("error rebuilding daily ingestor")
  157. http.Error(w, err.Error(), http.StatusBadRequest)
  158. return
  159. }
  160. protocol.WriteData(w, fmt.Sprintf("Rebuilding Custom Cost For Domain %s", domain))
  161. }
  162. }
  163. // GetCustomCostStatusHandler creates a handler from a http request which returns the custom cost ingestor status
  164. func (s *PipelineService) GetCustomCostStatusHandler() func(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  165. // If Reporting Service is nil, always return 501
  166. if s == nil {
  167. return func(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  168. http.Error(w, "Custom cost pipeline Service is nil", http.StatusNotImplemented)
  169. }
  170. }
  171. if s.hourlyIngestor == nil || s.dailyIngestor == nil {
  172. return func(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  173. http.Error(w, "Custom Cost Pipeline Service Ingestor is nil", http.StatusNotImplemented)
  174. }
  175. }
  176. // Return valid handler func
  177. return func(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  178. w.Header().Set("Content-Type", "application/json")
  179. protocol.WriteData(w, s.Status())
  180. }
  181. }