querier.go 3.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100
  1. package kubemodel
  2. import (
  3. "fmt"
  4. "time"
  5. "github.com/opencost/opencost/core/pkg/exporter"
  6. "github.com/opencost/opencost/core/pkg/exporter/pathing"
  7. coremodel "github.com/opencost/opencost/core/pkg/model/kubemodel"
  8. "github.com/opencost/opencost/core/pkg/opencost"
  9. "github.com/opencost/opencost/core/pkg/storage"
  10. "github.com/opencost/opencost/core/pkg/util/timeutil"
  11. )
  12. // supportedResolutions lists the resolutions written by the pipeline, in ascending order.
  13. var supportedResolutions = []time.Duration{time.Hour, timeutil.Day}
  14. // Querier is the query interface used by KubeModelHandler, allowing the
  15. // concrete *km.Querier to be swapped for a test double.
  16. type Querier interface {
  17. Query(opencost.Window) ([]*coremodel.KubeModelSet, error)
  18. }
  19. // Querier reads KubeModelSets written by the pipeline from storage.
  20. type querier struct {
  21. appName string
  22. clusterId string
  23. store storage.Storage
  24. }
  25. // NewQuerier creates a Querier backed by the given storage and cluster.
  26. func NewQuerier(appName, clusterId string, store storage.Storage) Querier {
  27. return &querier{store: store, appName: appName, clusterId: clusterId}
  28. }
  29. // Query returns KubeModelSets covering the given window split into resolution-sized sub-windows.
  30. // resolution is snapped to the nearest supported pipeline resolution (1h or 1d).
  31. // Sub-windows with no file in storage are silently skipped.
  32. func (q *querier) Query(window opencost.Window) ([]*coremodel.KubeModelSet, error) {
  33. if window.IsOpen() {
  34. return nil, fmt.Errorf("kubemodel querier: window must be closed")
  35. }
  36. res := snapResolution(window)
  37. resStr := timeutil.FormatStoreResolution(res)
  38. formatter, err := pathing.NewKubeModelStoragePathFormatter(q.appName, q.clusterId, resStr)
  39. if err != nil {
  40. return nil, fmt.Errorf("kubemodel querier: %w", err)
  41. }
  42. start := window.Start().Truncate(res)
  43. end := window.End().Truncate(res)
  44. subWindows, err := opencost.GetWindowsForQueryWindow(start, end, res)
  45. if err != nil {
  46. return nil, fmt.Errorf("kubemodel querier: splitting window: %w", err)
  47. }
  48. results := make([]*coremodel.KubeModelSet, 0, len(subWindows))
  49. for _, w := range subWindows {
  50. kms, err := q.readWindow(formatter, w)
  51. if err != nil {
  52. if storage.IsNotExist(err) {
  53. continue
  54. }
  55. return nil, fmt.Errorf("kubemodel querier: reading window %s: %w", w, err)
  56. }
  57. results = append(results, kms)
  58. }
  59. return results, nil
  60. }
  61. func (q *querier) readWindow(formatter pathing.StoragePathFormatter[opencost.Window], window opencost.Window) (*coremodel.KubeModelSet, error) {
  62. path := formatter.ToFullPath("", window, exporter.BingenExt)
  63. data, err := q.store.Read(path)
  64. if err != nil {
  65. return nil, err
  66. }
  67. kms := new(coremodel.KubeModelSet)
  68. if err := kms.UnmarshalBinary(data); err != nil {
  69. return nil, fmt.Errorf("decoding KubeModelSet: %w", err)
  70. }
  71. return kms, nil
  72. }
  73. // snapResolution returns the largest supported resolution that evenly divides
  74. // the window duration. Falls back to the smallest supported resolution if none
  75. // divides evenly.
  76. func snapResolution(window opencost.Window) time.Duration {
  77. dur := window.Duration()
  78. for i := len(supportedResolutions) - 1; i >= 0; i-- {
  79. if dur%supportedResolutions[i] == 0 {
  80. return supportedResolutions[i]
  81. }
  82. }
  83. return supportedResolutions[0]
  84. }