pipelineservice.go 7.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222
  1. package customcost
  2. import (
  3. "fmt"
  4. "net/http"
  5. "os"
  6. "os/exec"
  7. "runtime"
  8. "strings"
  9. "time"
  10. "github.com/hashicorp/go-hclog"
  11. "github.com/hashicorp/go-plugin"
  12. "github.com/julienschmidt/httprouter"
  13. "github.com/opencost/opencost/core/pkg/log"
  14. ocplugin "github.com/opencost/opencost/core/pkg/plugin"
  15. proto "github.com/opencost/opencost/core/pkg/protocol"
  16. "github.com/opencost/opencost/core/pkg/util/timeutil"
  17. )
  18. var protocol = proto.HTTP()
  19. const execFmt = `%s/%s.ocplugin.%s.%s`
  20. // PipelineService exposes CustomCost pipeline controls and diagnostics endpoints
  21. type PipelineService struct {
  22. hourlyIngestor, dailyIngestor *CustomCostIngestor
  23. hourlyStore, dailyStore Repository
  24. }
  25. func getRegisteredPlugins(configDir string, execDir string) (map[string]*plugin.Client, error) {
  26. pluginNames := map[string]string{}
  27. // scan plugin config directory for all file names
  28. configFiles, err := os.ReadDir(configDir)
  29. if err != nil {
  30. log.Errorf("error reading files in directory %s: %v", configDir, err)
  31. }
  32. // list of plugins that we must run are the strings before _
  33. for _, file := range configFiles {
  34. log.Tracef("parsing config file name: %s", file.Name())
  35. fileParts := strings.Split(file.Name(), "_")
  36. if len(fileParts) != 2 || fileParts[1] == "_config.json" {
  37. return nil, fmt.Errorf("plugin config file name %s invalid. Config files must have the form <plugin name>_config.json", file.Name())
  38. }
  39. pluginNames[fileParts[0]] = configDir + "/" + file.Name()
  40. }
  41. if len(pluginNames) == 0 {
  42. log.Infof("no plugins detected.")
  43. return nil, nil
  44. }
  45. log.Infof("requiring plugins matching your architecture: " + runtime.GOARCH)
  46. configs := map[string]*plugin.ClientConfig{}
  47. // set up the client config
  48. for name, config := range pluginNames {
  49. file := fmt.Sprintf(execFmt, execDir, name, runtime.GOOS, runtime.GOARCH)
  50. log.Debugf("looking for file: %s", file)
  51. if _, err := os.Stat(file); err != nil {
  52. msg := fmt.Sprintf("error reading executable for %s plugin. Plugin executables must be in %s and have name format <plugin name>.ocplugin.<os>.<opencost binary archtecture (arm64 or amd64)>", name, execDir)
  53. log.Errorf(msg)
  54. return nil, fmt.Errorf(msg)
  55. }
  56. var handshakeConfig = plugin.HandshakeConfig{
  57. ProtocolVersion: 1,
  58. MagicCookieKey: "PLUGIN_NAME",
  59. MagicCookieValue: name,
  60. }
  61. logger := hclog.New(&hclog.LoggerOptions{
  62. Name: "plugin[" + name + "]",
  63. Output: os.Stdout,
  64. Level: hclog.Debug,
  65. })
  66. // pluginMap is the map of plugins we can dispense.
  67. var pluginMap = map[string]plugin.Plugin{
  68. "CustomCostSource": &ocplugin.CustomCostPlugin{},
  69. }
  70. configs[name] = &plugin.ClientConfig{
  71. HandshakeConfig: handshakeConfig,
  72. Plugins: pluginMap,
  73. Cmd: exec.Command(fmt.Sprintf(execFmt, execDir, name, runtime.GOOS, runtime.GOARCH), config),
  74. Logger: logger,
  75. AllowedProtocols: []plugin.Protocol{plugin.ProtocolGRPC},
  76. }
  77. }
  78. plugins := map[string]*plugin.Client{}
  79. for name, config := range configs {
  80. client := plugin.NewClient(config)
  81. // add the connected, initialized client to the ma
  82. plugins[name] = client
  83. }
  84. return plugins, nil
  85. }
  86. // NewPipelineService is a constructor for a PipelineService
  87. func NewPipelineService(hourlyrepo, dailyrepo Repository, ingConf CustomCostIngestorConfig) (*PipelineService, error) {
  88. registeredPlugins, err := getRegisteredPlugins(ingConf.PluginConfigDir, ingConf.PluginExecutableDir)
  89. if err != nil {
  90. log.Errorf("error getting registered plugins: %v", err)
  91. return nil, fmt.Errorf("error getting registered plugins: %v", err)
  92. }
  93. hourlyIngestor, err := NewCustomCostIngestor(&ingConf, hourlyrepo, registeredPlugins, time.Hour)
  94. if err != nil {
  95. return nil, err
  96. }
  97. hourlyIngestor.Start(false)
  98. dailyIngestor, err := NewCustomCostIngestor(&ingConf, dailyrepo, registeredPlugins, timeutil.Day)
  99. if err != nil {
  100. return nil, err
  101. }
  102. dailyIngestor.Start(false)
  103. return &PipelineService{
  104. hourlyIngestor: hourlyIngestor,
  105. hourlyStore: hourlyrepo,
  106. dailyStore: dailyrepo,
  107. dailyIngestor: dailyIngestor,
  108. }, nil
  109. }
  110. // Status gives a combined view of the state of configs and the ingestior status
  111. func (dp *PipelineService) Status() Status {
  112. // Pull config status from the config controller
  113. ingstatusHourly := dp.hourlyIngestor.Status()
  114. // Pull config status from the config controller
  115. ingstatusDaily := dp.dailyIngestor.Status()
  116. // These are the statuses
  117. return Status{
  118. CoverageDaily: ingstatusDaily.Coverage,
  119. CoverageHourly: ingstatusHourly.Coverage,
  120. RefreshRateHourly: ingstatusHourly.RefreshRate.String(),
  121. RefreshRateDaily: ingstatusDaily.RefreshRate.String(),
  122. }
  123. }
  124. // GetCustomCostRebuildHandler creates a handler from a http request which initiates a rebuild of custom cost pipeline, if a
  125. // domain is provided then it only rebuilds the specified billing domain
  126. func (s *PipelineService) GetCustomCostRebuildHandler() func(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  127. // If pipeline Service is nil, always return 501
  128. if s == nil {
  129. return func(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  130. http.Error(w, "Custom Cost Pipeline Service is nil", http.StatusNotImplemented)
  131. }
  132. }
  133. if s.dailyIngestor == nil || s.hourlyIngestor == nil {
  134. return func(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  135. http.Error(w, "Custom Cost Pipeline Service Ingestion Manager is nil", http.StatusNotImplemented)
  136. }
  137. }
  138. // Return valid handler func
  139. return func(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  140. w.Header().Set("Content-Type", "application/json")
  141. commit := r.URL.Query().Get("commit") == "true" || r.URL.Query().Get("commit") == "1"
  142. if !commit {
  143. protocol.WriteData(w, "Pass parameter 'commit=true' to confirm Custom Cost rebuild")
  144. return
  145. }
  146. domain := r.URL.Query().Get("domain")
  147. err := s.hourlyIngestor.Rebuild(domain)
  148. if err != nil {
  149. log.Errorf("error rebuilding hourly ingestor")
  150. http.Error(w, err.Error(), http.StatusBadRequest)
  151. return
  152. }
  153. err = s.dailyIngestor.Rebuild(domain)
  154. if err != nil {
  155. log.Errorf("error rebuilding daily ingestor")
  156. http.Error(w, err.Error(), http.StatusBadRequest)
  157. return
  158. }
  159. protocol.WriteData(w, fmt.Sprintf("Rebuilding Custom Cost For Domain %s", domain))
  160. }
  161. }
  162. // GetCustomCostStatusHandler creates a handler from a http request which returns the custom cost ingestor status
  163. func (s *PipelineService) GetCustomCostStatusHandler() func(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  164. if s == nil {
  165. resultStatus := Status{
  166. Enabled: false,
  167. }
  168. return func(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  169. protocol.WriteData(w, resultStatus)
  170. }
  171. }
  172. if s.hourlyIngestor == nil || s.dailyIngestor == nil {
  173. return func(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  174. http.Error(w, "Custom Cost Pipeline Service Ingestor is nil", http.StatusNotImplemented)
  175. }
  176. }
  177. // Return valid handler func
  178. return func(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  179. w.Header().Set("Content-Type", "application/json")
  180. stat := s.Status()
  181. stat.Enabled = true
  182. protocol.WriteData(w, stat)
  183. }
  184. }