2
0

pipelineservice.go 8.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257
  1. package customcost
  2. import (
  3. "fmt"
  4. "net/http"
  5. "os"
  6. "os/exec"
  7. "path"
  8. "runtime"
  9. "strings"
  10. "time"
  11. "github.com/hashicorp/go-hclog"
  12. "github.com/hashicorp/go-plugin"
  13. "github.com/julienschmidt/httprouter"
  14. "github.com/opencost/opencost/core/pkg/log"
  15. ocplugin "github.com/opencost/opencost/core/pkg/plugin"
  16. proto "github.com/opencost/opencost/core/pkg/protocol"
  17. "github.com/opencost/opencost/core/pkg/util/timeutil"
  18. )
  19. var protocol = proto.HTTP()
  20. const execFmt = `%s/%s.ocplugin.%s.%s`
  21. // PipelineService exposes CustomCost pipeline controls and diagnostics endpoints
  22. type PipelineService struct {
  23. hourlyIngestor, dailyIngestor *CustomCostIngestor
  24. hourlyStore, dailyStore Repository
  25. domains []string
  26. }
  27. func getRegisteredPlugins(configDir string, execDir string) (map[string]*plugin.Client, error) {
  28. pluginNames := map[string]string{}
  29. // scan plugin config directory for all file names
  30. configFiles, err := os.ReadDir(configDir)
  31. if err != nil {
  32. log.Errorf("error reading files in directory %s: %v", configDir, err)
  33. return nil, fmt.Errorf("failed to read plugin config directory: %w", err)
  34. }
  35. // list of plugins that we must run are the strings before _
  36. for _, file := range configFiles {
  37. // skip hidden files and directories
  38. if strings.HasPrefix(file.Name(), ".") || file.IsDir() {
  39. continue
  40. }
  41. log.Tracef("parsing config file name: %s", file.Name())
  42. fileParts := strings.Split(file.Name(), "_")
  43. if len(fileParts) != 2 || fileParts[1] == "_config.json" {
  44. return nil, fmt.Errorf("plugin config file name %s invalid. Config files must have the form <plugin name>_config.json", file.Name())
  45. }
  46. pluginNames[fileParts[0]] = path.Join(configDir, file.Name())
  47. }
  48. if len(pluginNames) == 0 {
  49. log.Infof("no plugins detected.")
  50. return nil, nil
  51. }
  52. log.Infof("requiring plugins matching your architecture: " + runtime.GOARCH)
  53. configs := map[string]*plugin.ClientConfig{}
  54. // set up the client config
  55. for name, config := range pluginNames {
  56. file := fmt.Sprintf(execFmt, execDir, name, runtime.GOOS, runtime.GOARCH)
  57. log.Debugf("looking for file: %s", file)
  58. if _, err := os.Stat(file); err != nil {
  59. err := fmt.Errorf("error reading executable for %s plugin. Plugin executables must be in %s and have name format <plugin name>.ocplugin.<os>.<opencost binary archtecture (arm64 or amd64)>", name, execDir)
  60. log.Errorf("err: %v", err)
  61. return nil, err
  62. }
  63. var handshakeConfig = plugin.HandshakeConfig{
  64. ProtocolVersion: 1,
  65. MagicCookieKey: "PLUGIN_NAME",
  66. MagicCookieValue: name,
  67. }
  68. logger := hclog.New(&hclog.LoggerOptions{
  69. Name: "plugin[" + name + "]",
  70. Output: os.Stdout,
  71. Level: hclog.Debug,
  72. })
  73. // pluginMap is the map of plugins we can dispense.
  74. var pluginMap = map[string]plugin.Plugin{
  75. "CustomCostSource": &ocplugin.CustomCostPlugin{},
  76. }
  77. configs[name] = &plugin.ClientConfig{
  78. HandshakeConfig: handshakeConfig,
  79. Plugins: pluginMap,
  80. Cmd: exec.Command(fmt.Sprintf(execFmt, execDir, name, runtime.GOOS, runtime.GOARCH), config),
  81. Logger: logger,
  82. AllowedProtocols: []plugin.Protocol{plugin.ProtocolGRPC},
  83. }
  84. }
  85. plugins := map[string]*plugin.Client{}
  86. for name, config := range configs {
  87. client := plugin.NewClient(config)
  88. // add the connected, initialized client to the ma
  89. plugins[name] = client
  90. }
  91. return plugins, nil
  92. }
  93. // NewPipelineService is a constructor for a PipelineService
  94. func NewPipelineService(hourlyrepo, dailyrepo Repository, ingConf CustomCostIngestorConfig) (*PipelineService, error) {
  95. registeredPlugins, err := getRegisteredPlugins(ingConf.PluginConfigDir, ingConf.PluginExecutableDir)
  96. if err != nil {
  97. log.Errorf("error getting registered plugins: %v", err)
  98. return nil, fmt.Errorf("error getting registered plugins: %v", err)
  99. }
  100. hourlyIngestor, err := NewCustomCostIngestor(&ingConf, hourlyrepo, registeredPlugins, time.Hour)
  101. if err != nil {
  102. return nil, err
  103. }
  104. hourlyIngestor.Start(false)
  105. dailyIngestor, err := NewCustomCostIngestor(&ingConf, dailyrepo, registeredPlugins, timeutil.Day)
  106. if err != nil {
  107. return nil, err
  108. }
  109. dailyIngestor.Start(false)
  110. var domains []string
  111. for domain := range registeredPlugins {
  112. domains = append(domains, domain)
  113. }
  114. return &PipelineService{
  115. hourlyIngestor: hourlyIngestor,
  116. hourlyStore: hourlyrepo,
  117. dailyStore: dailyrepo,
  118. dailyIngestor: dailyIngestor,
  119. domains: domains,
  120. }, nil
  121. }
  122. // Stop gracefully shuts down both hourly and daily ingestors.
  123. // Both ingestors may reference the same plugin clients, so Kill() may be invoked
  124. // multiple times per plugin, which is safe per the go-plugin library.
  125. func (ps *PipelineService) Stop() {
  126. if ps == nil {
  127. return
  128. }
  129. log.Infof("Shutting down CustomCost Pipeline Service")
  130. if ps.hourlyIngestor != nil {
  131. ps.hourlyIngestor.Stop()
  132. }
  133. if ps.dailyIngestor != nil {
  134. ps.dailyIngestor.Stop()
  135. }
  136. log.Infof("CustomCost Pipeline Service stopped successfully")
  137. }
  138. // Status gives a combined view of the state of configs and the ingestor status
  139. func (dp *PipelineService) Status() Status {
  140. // Pull config status from the config controller
  141. ingstatusHourly := dp.hourlyIngestor.Status()
  142. // Pull config status from the config controller
  143. ingstatusDaily := dp.dailyIngestor.Status()
  144. // These are the statuses
  145. return Status{
  146. CoverageDaily: ingstatusDaily.Coverage,
  147. CoverageHourly: ingstatusHourly.Coverage,
  148. RefreshRateHourly: ingstatusHourly.RefreshRate.String(),
  149. RefreshRateDaily: ingstatusDaily.RefreshRate.String(),
  150. Domains: dp.domains,
  151. }
  152. }
  153. // GetCustomCostRebuildHandler creates a handler from a http request which initiates a rebuild of custom cost pipeline, if a
  154. // domain is provided then it only rebuilds the specified billing domain
  155. func (s *PipelineService) GetCustomCostRebuildHandler() func(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  156. // If pipeline Service is nil, always return 501
  157. if s == nil {
  158. return func(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  159. http.Error(w, "Custom Cost Pipeline Service is nil", http.StatusNotImplemented)
  160. }
  161. }
  162. if s.dailyIngestor == nil || s.hourlyIngestor == nil {
  163. return func(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  164. http.Error(w, "Custom Cost Pipeline Service Ingestion Manager is nil", http.StatusNotImplemented)
  165. }
  166. }
  167. // Return valid handler func
  168. return func(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  169. w.Header().Set("Content-Type", "application/json")
  170. commit := r.URL.Query().Get("commit") == "true" || r.URL.Query().Get("commit") == "1"
  171. if !commit {
  172. protocol.WriteData(w, "Pass parameter 'commit=true' to confirm Custom Cost rebuild")
  173. return
  174. }
  175. domain := r.URL.Query().Get("domain")
  176. err := s.hourlyIngestor.Rebuild(domain)
  177. if err != nil {
  178. log.Errorf("error rebuilding hourly ingestor")
  179. http.Error(w, err.Error(), http.StatusBadRequest)
  180. return
  181. }
  182. err = s.dailyIngestor.Rebuild(domain)
  183. if err != nil {
  184. log.Errorf("error rebuilding daily ingestor")
  185. http.Error(w, err.Error(), http.StatusBadRequest)
  186. return
  187. }
  188. protocol.WriteData(w, fmt.Sprintf("Rebuilding Custom Cost For Domain %s", domain))
  189. }
  190. }
  191. // GetCustomCostStatusHandler creates a handler from a http request which returns the custom cost ingestor status
  192. func (s *PipelineService) GetCustomCostStatusHandler() func(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  193. if s == nil {
  194. resultStatus := Status{
  195. Enabled: false,
  196. }
  197. return func(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  198. protocol.WriteData(w, resultStatus)
  199. }
  200. }
  201. if s.hourlyIngestor == nil || s.dailyIngestor == nil {
  202. return func(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  203. http.Error(w, "Custom Cost Pipeline Service Ingestor is nil", http.StatusNotImplemented)
  204. }
  205. }
  206. // Return valid handler func
  207. return func(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  208. w.Header().Set("Content-Type", "application/json")
  209. stat := s.Status()
  210. stat.Enabled = true
  211. protocol.WriteData(w, stat)
  212. }
  213. }