pipelineservice.go 7.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236
  1. package customcost
  2. import (
  3. "fmt"
  4. "net/http"
  5. "os"
  6. "os/exec"
  7. "path"
  8. "runtime"
  9. "strings"
  10. "time"
  11. "github.com/hashicorp/go-hclog"
  12. "github.com/hashicorp/go-plugin"
  13. "github.com/julienschmidt/httprouter"
  14. "github.com/opencost/opencost/core/pkg/log"
  15. ocplugin "github.com/opencost/opencost/core/pkg/plugin"
  16. proto "github.com/opencost/opencost/core/pkg/protocol"
  17. "github.com/opencost/opencost/core/pkg/util/timeutil"
  18. )
  19. var protocol = proto.HTTP()
  20. const execFmt = `%s/%s.ocplugin.%s.%s`
  21. // PipelineService exposes CustomCost pipeline controls and diagnostics endpoints
  22. type PipelineService struct {
  23. hourlyIngestor, dailyIngestor *CustomCostIngestor
  24. hourlyStore, dailyStore Repository
  25. domains []string
  26. }
  27. func getRegisteredPlugins(configDir string, execDir string) (map[string]*plugin.Client, error) {
  28. pluginNames := map[string]string{}
  29. // scan plugin config directory for all file names
  30. configFiles, err := os.ReadDir(configDir)
  31. if err != nil {
  32. log.Errorf("error reading files in directory %s: %v", configDir, err)
  33. }
  34. // list of plugins that we must run are the strings before _
  35. for _, file := range configFiles {
  36. // skip hidden files and directories
  37. if strings.HasPrefix(file.Name(), ".") || file.IsDir() {
  38. continue
  39. }
  40. log.Tracef("parsing config file name: %s", file.Name())
  41. fileParts := strings.Split(file.Name(), "_")
  42. if len(fileParts) != 2 || fileParts[1] == "_config.json" {
  43. return nil, fmt.Errorf("plugin config file name %s invalid. Config files must have the form <plugin name>_config.json", file.Name())
  44. }
  45. pluginNames[fileParts[0]] = path.Join(configDir, file.Name())
  46. }
  47. if len(pluginNames) == 0 {
  48. log.Infof("no plugins detected.")
  49. return nil, nil
  50. }
  51. log.Infof("requiring plugins matching your architecture: " + runtime.GOARCH)
  52. configs := map[string]*plugin.ClientConfig{}
  53. // set up the client config
  54. for name, config := range pluginNames {
  55. file := fmt.Sprintf(execFmt, execDir, name, runtime.GOOS, runtime.GOARCH)
  56. log.Debugf("looking for file: %s", file)
  57. if _, err := os.Stat(file); err != nil {
  58. msg := fmt.Sprintf("error reading executable for %s plugin. Plugin executables must be in %s and have name format <plugin name>.ocplugin.<os>.<opencost binary archtecture (arm64 or amd64)>", name, execDir)
  59. log.Errorf(msg)
  60. return nil, fmt.Errorf(msg)
  61. }
  62. var handshakeConfig = plugin.HandshakeConfig{
  63. ProtocolVersion: 1,
  64. MagicCookieKey: "PLUGIN_NAME",
  65. MagicCookieValue: name,
  66. }
  67. logger := hclog.New(&hclog.LoggerOptions{
  68. Name: "plugin[" + name + "]",
  69. Output: os.Stdout,
  70. Level: hclog.Debug,
  71. })
  72. // pluginMap is the map of plugins we can dispense.
  73. var pluginMap = map[string]plugin.Plugin{
  74. "CustomCostSource": &ocplugin.CustomCostPlugin{},
  75. }
  76. configs[name] = &plugin.ClientConfig{
  77. HandshakeConfig: handshakeConfig,
  78. Plugins: pluginMap,
  79. Cmd: exec.Command(fmt.Sprintf(execFmt, execDir, name, runtime.GOOS, runtime.GOARCH), config),
  80. Logger: logger,
  81. AllowedProtocols: []plugin.Protocol{plugin.ProtocolGRPC},
  82. }
  83. }
  84. plugins := map[string]*plugin.Client{}
  85. for name, config := range configs {
  86. client := plugin.NewClient(config)
  87. // add the connected, initialized client to the ma
  88. plugins[name] = client
  89. }
  90. return plugins, nil
  91. }
  92. // NewPipelineService is a constructor for a PipelineService
  93. func NewPipelineService(hourlyrepo, dailyrepo Repository, ingConf CustomCostIngestorConfig) (*PipelineService, error) {
  94. registeredPlugins, err := getRegisteredPlugins(ingConf.PluginConfigDir, ingConf.PluginExecutableDir)
  95. if err != nil {
  96. log.Errorf("error getting registered plugins: %v", err)
  97. return nil, fmt.Errorf("error getting registered plugins: %v", err)
  98. }
  99. hourlyIngestor, err := NewCustomCostIngestor(&ingConf, hourlyrepo, registeredPlugins, time.Hour)
  100. if err != nil {
  101. return nil, err
  102. }
  103. hourlyIngestor.Start(false)
  104. dailyIngestor, err := NewCustomCostIngestor(&ingConf, dailyrepo, registeredPlugins, timeutil.Day)
  105. if err != nil {
  106. return nil, err
  107. }
  108. dailyIngestor.Start(false)
  109. var domains []string
  110. for domain, _ := range registeredPlugins {
  111. domains = append(domains, domain)
  112. }
  113. return &PipelineService{
  114. hourlyIngestor: hourlyIngestor,
  115. hourlyStore: hourlyrepo,
  116. dailyStore: dailyrepo,
  117. dailyIngestor: dailyIngestor,
  118. domains: domains,
  119. }, nil
  120. }
  121. // Status gives a combined view of the state of configs and the ingestor status
  122. func (dp *PipelineService) Status() Status {
  123. // Pull config status from the config controller
  124. ingstatusHourly := dp.hourlyIngestor.Status()
  125. // Pull config status from the config controller
  126. ingstatusDaily := dp.dailyIngestor.Status()
  127. // These are the statuses
  128. return Status{
  129. CoverageDaily: ingstatusDaily.Coverage,
  130. CoverageHourly: ingstatusHourly.Coverage,
  131. RefreshRateHourly: ingstatusHourly.RefreshRate.String(),
  132. RefreshRateDaily: ingstatusDaily.RefreshRate.String(),
  133. Domains: dp.domains,
  134. }
  135. }
  136. // GetCustomCostRebuildHandler creates a handler from a http request which initiates a rebuild of custom cost pipeline, if a
  137. // domain is provided then it only rebuilds the specified billing domain
  138. func (s *PipelineService) GetCustomCostRebuildHandler() func(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  139. // If pipeline Service is nil, always return 501
  140. if s == nil {
  141. return func(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  142. http.Error(w, "Custom Cost Pipeline Service is nil", http.StatusNotImplemented)
  143. }
  144. }
  145. if s.dailyIngestor == nil || s.hourlyIngestor == nil {
  146. return func(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  147. http.Error(w, "Custom Cost Pipeline Service Ingestion Manager is nil", http.StatusNotImplemented)
  148. }
  149. }
  150. // Return valid handler func
  151. return func(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  152. w.Header().Set("Content-Type", "application/json")
  153. commit := r.URL.Query().Get("commit") == "true" || r.URL.Query().Get("commit") == "1"
  154. if !commit {
  155. protocol.WriteData(w, "Pass parameter 'commit=true' to confirm Custom Cost rebuild")
  156. return
  157. }
  158. domain := r.URL.Query().Get("domain")
  159. err := s.hourlyIngestor.Rebuild(domain)
  160. if err != nil {
  161. log.Errorf("error rebuilding hourly ingestor")
  162. http.Error(w, err.Error(), http.StatusBadRequest)
  163. return
  164. }
  165. err = s.dailyIngestor.Rebuild(domain)
  166. if err != nil {
  167. log.Errorf("error rebuilding daily ingestor")
  168. http.Error(w, err.Error(), http.StatusBadRequest)
  169. return
  170. }
  171. protocol.WriteData(w, fmt.Sprintf("Rebuilding Custom Cost For Domain %s", domain))
  172. }
  173. }
  174. // GetCustomCostStatusHandler creates a handler from a http request which returns the custom cost ingestor status
  175. func (s *PipelineService) GetCustomCostStatusHandler() func(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  176. if s == nil {
  177. resultStatus := Status{
  178. Enabled: false,
  179. }
  180. return func(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  181. protocol.WriteData(w, resultStatus)
  182. }
  183. }
  184. if s.hourlyIngestor == nil || s.dailyIngestor == nil {
  185. return func(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  186. http.Error(w, "Custom Cost Pipeline Service Ingestor is nil", http.StatusNotImplemented)
  187. }
  188. }
  189. // Return valid handler func
  190. return func(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  191. w.Header().Set("Content-Type", "application/json")
  192. stat := s.Status()
  193. stat.Enabled = true
  194. protocol.WriteData(w, stat)
  195. }
  196. }