janitor.go 5.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174
  1. package kubemodel
  import (
	"path"
	"strings"
	"sync"
	"sync/atomic"
	"time"

	coreenv "github.com/opencost/opencost/core/pkg/env"
	"github.com/opencost/opencost/core/pkg/exporter/pathing"
	"github.com/opencost/opencost/core/pkg/log"
	"github.com/opencost/opencost/core/pkg/pipelines"
	"github.com/opencost/opencost/core/pkg/storage"
	"github.com/opencost/opencost/core/pkg/util/timeutil"
)
  // Fallback retention counts passed to coreenv.GetInt by retentionFor when resolving the
// RESOLUTION_*_RETENTION env vars. Each value counts units of its own resolution.
const (
	janitorDefault1dRetention  = 30          // days
	janitorDefault1hRetention  = 49          // hours
	janitorDefault10mRetention = 6 * 60 / 10 // 10-minute segments covering 6 hours (= 36)
)
  19. // Janitor removes KubeModelSet files from storage that exceed the retention period for each resolution.
  20. // Retention durations are read from the standard RESOLUTION_*_RETENTION env vars.
  21. type Janitor struct {
  22. store storage.Storage
  23. appName string
  24. clusterId string
  25. resolutions []time.Duration
  26. isRunning atomic.Bool
  27. isStopping atomic.Bool
  28. exitCh chan struct{}
  29. }
  30. // NewJanitor creates a Janitor for the given storage backend, cluster, and active resolutions.
  31. func NewJanitor(store storage.Storage, appName, clusterId string, resolutions []time.Duration) *Janitor {
  32. return &Janitor{
  33. store: store,
  34. appName: appName,
  35. clusterId: clusterId,
  36. resolutions: resolutions,
  37. }
  38. }
  39. // Start launches the background retention cleanup loop. No-op if already running.
  40. func (j *Janitor) Start(interval time.Duration) {
  41. if !j.isRunning.CompareAndSwap(false, true) {
  42. return
  43. }
  44. j.exitCh = make(chan struct{})
  45. go func() {
  46. ticker := time.NewTicker(interval)
  47. defer ticker.Stop()
  48. for {
  49. select {
  50. case <-j.exitCh:
  51. j.isRunning.Store(false)
  52. j.isStopping.Store(false)
  53. return
  54. case <-ticker.C:
  55. j.cleanup()
  56. }
  57. }
  58. }()
  59. }
  60. // Stop halts the cleanup loop.
  61. func (j *Janitor) Stop() {
  62. if !j.isStopping.CompareAndSwap(false, true) {
  63. return
  64. }
  65. close(j.exitCh)
  66. }
  67. func (j *Janitor) cleanup() {
  68. for _, res := range j.resolutions {
  69. retention := retentionFor(res)
  70. if retention == 0 {
  71. continue
  72. }
  73. resStr := timeutil.FormatStoreResolution(res)
  74. baseDir := path.Join(j.appName, j.clusterId, pipelines.KubeModelPipelineName, resStr)
  75. cutoff := time.Now().UTC().Truncate(res).Add(-retention)
  76. j.pruneResolution(baseDir, cutoff)
  77. }
  78. }
  79. // retentionFor returns the total retention duration for a given export resolution, using the
  80. // standard RESOLUTION_*_RETENTION env vars (without a prefix, so the same values apply across pipelines).
  81. func retentionFor(res time.Duration) time.Duration {
  82. switch {
  83. case res >= timeutil.Day:
  84. n := coreenv.GetInt(coreenv.Resolution1dRetentionEnvVar, janitorDefault1dRetention)
  85. return timeutil.Day * time.Duration(n)
  86. case res >= time.Hour:
  87. n := coreenv.GetInt(coreenv.Resolution1hRetentionEnvVar, janitorDefault1hRetention)
  88. return time.Hour * time.Duration(n)
  89. default:
  90. n := coreenv.GetInt(coreenv.Resolution10mRetentionEnvVar, janitorDefault10mRetention)
  91. return 10 * time.Minute * time.Duration(n)
  92. }
  93. }
  94. func (j *Janitor) pruneResolution(baseDir string, cutoff time.Time) {
  95. cutoffDay := cutoff.Truncate(timeutil.Day)
  96. years, err := j.store.ListDirectories(baseDir)
  97. if err != nil {
  98. if !storage.IsNotExist(err) {
  99. log.Errorf("KubeModel janitor: listing %s: %v", baseDir, err)
  100. }
  101. return
  102. }
  103. for _, yearInfo := range years {
  104. // ListDirectories returns the full path in Name with a trailing slash.
  105. yearDir := strings.TrimSuffix(yearInfo.Name, "/")
  106. months, err := j.store.ListDirectories(yearDir)
  107. if err != nil {
  108. log.Warnf("KubeModel janitor: listing %s: %v", yearDir, err)
  109. continue
  110. }
  111. for _, monthInfo := range months {
  112. monthDir := strings.TrimSuffix(monthInfo.Name, "/")
  113. days, err := j.store.ListDirectories(monthDir)
  114. if err != nil {
  115. log.Warnf("KubeModel janitor: listing %s: %v", monthDir, err)
  116. continue
  117. }
  118. for _, dayInfo := range days {
  119. dayDir := strings.TrimSuffix(dayInfo.Name, "/")
  120. dateStr := path.Base(yearDir) + "/" + path.Base(monthDir) + "/" + path.Base(dayDir)
  121. date, err := time.Parse("2006/01/02", dateStr)
  122. if err != nil {
  123. log.Warnf("KubeModel janitor: cannot parse date from %s: %v", dateStr, err)
  124. continue
  125. }
  126. if !date.After(cutoffDay) {
  127. j.cleanDay(dayDir, cutoff)
  128. }
  129. }
  130. }
  131. }
  132. }
  133. // cleanDay removes files in dayDir whose embedded timestamp is before cutoff.
  134. func (j *Janitor) cleanDay(dayDir string, cutoff time.Time) {
  135. files, err := j.store.List(dayDir)
  136. if err != nil {
  137. log.Warnf("KubeModel janitor: listing files in %s: %v", dayDir, err)
  138. return
  139. }
  140. for _, f := range files {
  141. ts, err := parseKubeModelFileTimestamp(f.Name)
  142. if err != nil {
  143. log.Warnf("KubeModel janitor: cannot parse timestamp from %s: %v", f.Name, err)
  144. continue
  145. }
  146. if ts.Before(cutoff) {
  147. filePath := path.Join(dayDir, f.Name)
  148. if err := j.store.Remove(filePath); err != nil {
  149. log.Warnf("KubeModel janitor: removing %s: %v", filePath, err)
  150. } else {
  151. log.Infof("KubeModel janitor: removed expired file %s", filePath)
  152. }
  153. }
  154. }
  155. }
  156. // parseKubeModelFileTimestamp extracts the start time from a KubeModel file name.
  157. // File names have the format <YYYYMMDDHHmmSS>.<ext> (or <prefix>.<YYYYMMDDHHmmSS>.<ext>
  158. // when a prefix is present, but the pipeline writes files without a prefix).
  159. func parseKubeModelFileTimestamp(name string) (time.Time, error) {
  160. fileParts := strings.Split(name, ".")
  161. return time.ParseInLocation(pathing.KubeModelStorageTimeFormat, fileParts[0], time.UTC)
  162. }