pipeline.go 1.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657
  1. package kubemodel
  2. import (
  3. "fmt"
  4. "time"
  5. ocexporter "github.com/opencost/opencost/core/pkg/opencost/exporter"
  6. "github.com/opencost/opencost/core/pkg/storage"
  7. "github.com/opencost/opencost/core/pkg/util/timeutil"
  8. )
  9. var (
  10. exportInterval = 5 * time.Minute
  11. janitorInterval = timeutil.Day
  12. defaultResolutions = []time.Duration{time.Hour, timeutil.Day}
  13. )
  14. // Pipeline manages the KubeModel export controller group and the retention janitor.
  15. type Pipeline struct {
  16. controllers *ocexporter.PipelineExportControllers
  17. janitor *Janitor
  18. }
  19. // NewPipeline creates a new pipeline with preset settings
  20. func NewPipeline(appName, clusterUID string, store storage.Storage, cm ocexporter.ComputePipelineSource) (*Pipeline, error) {
  21. if store == nil {
  22. return nil, fmt.Errorf("NewPipeline: store cannot be nil")
  23. }
  24. if clusterUID == "" {
  25. return nil, fmt.Errorf("NewPipeline: clusterUID cannot be empty")
  26. }
  27. config := ocexporter.PipelinesExportConfig{
  28. AppName: appName,
  29. ClusterUID: clusterUID,
  30. KubeModelPipelineResolutions: defaultResolutions,
  31. }
  32. controllers := ocexporter.NewPipelineExportControllers(store, cm, config)
  33. return &Pipeline{
  34. controllers: controllers,
  35. janitor: NewJanitor(store, appName, clusterUID, config.KubeModelPipelineResolutions),
  36. }, nil
  37. }
  38. // Start launches the export controllers and the retention janitor.
  39. func (p *Pipeline) Start() {
  40. p.controllers.Start(exportInterval)
  41. p.janitor.Start(janitorInterval)
  42. }
  43. // Stop halts the export controllers and the retention janitor.
  44. func (p *Pipeline) Stop() {
  45. p.controllers.Stop()
  46. p.janitor.Stop()
  47. }