pipeline.go 1.5 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253
  1. package kubemodel
  2. import (
  3. "fmt"
  4. "time"
  5. "github.com/opencost/opencost/core/pkg/env"
  6. ocexporter "github.com/opencost/opencost/core/pkg/opencost/exporter"
  7. "github.com/opencost/opencost/core/pkg/storage"
  8. "github.com/opencost/opencost/core/pkg/util/timeutil"
  9. )
  10. var (
  11. exportInterval = 5 * time.Minute
  12. janitorInterval = timeutil.Day
  13. )
  14. // Pipeline manages the KubeModel export controller group and the retention janitor.
  15. type Pipeline struct {
  16. controllers *ocexporter.PipelineExportControllers
  17. janitor *Janitor
  18. }
  19. // NewPipeline creates a new pipeline with preset settings
  20. func NewPipeline(appName, clusterUID string, store storage.Storage, cm ocexporter.ComputePipelineSource) (*Pipeline, error) {
  21. if store == nil {
  22. return nil, fmt.Errorf("NewPipeline: store cannot be nil")
  23. }
  24. if clusterUID == "" {
  25. return nil, fmt.Errorf("NewPipeline: clusterUID cannot be empty")
  26. }
  27. config := ocexporter.NewPipelinesExportConfig(appName, clusterUID, "", false, env.GetExportKubeModel())
  28. controllers := ocexporter.NewPipelineExportControllers(store, cm, config)
  29. return &Pipeline{
  30. controllers: controllers,
  31. janitor: NewJanitor(store, appName, clusterUID, config.KubeModelPipelineResolutions),
  32. }, nil
  33. }
  34. // Start launches the export controllers and the retention janitor.
  35. func (p *Pipeline) Start() {
  36. p.controllers.Start(exportInterval)
  37. p.janitor.Start(janitorInterval)
  38. }
  39. // Stop halts the export controllers and the retention janitor.
  40. func (p *Pipeline) Stop() {
  41. p.controllers.Stop()
  42. p.janitor.Stop()
  43. }