| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100 |
- package kubemodel
- import (
- "fmt"
- "time"
- "github.com/opencost/opencost/core/pkg/exporter"
- "github.com/opencost/opencost/core/pkg/exporter/pathing"
- coremodel "github.com/opencost/opencost/core/pkg/model/kubemodel"
- "github.com/opencost/opencost/core/pkg/opencost"
- "github.com/opencost/opencost/core/pkg/storage"
- "github.com/opencost/opencost/core/pkg/util/timeutil"
- )
- // supportedResolutions lists the resolutions written by the pipeline, in ascending order.
- var supportedResolutions = []time.Duration{time.Hour, timeutil.Day}
- // Querier is the query interface used by KubeModelHandler, allowing the
- // concrete *km.Querier to be swapped for a test double.
- type Querier interface {
- Query(opencost.Window) ([]*coremodel.KubeModelSet, error)
- }
- // Querier reads KubeModelSets written by the pipeline from storage.
- type querier struct {
- appName string
- clusterId string
- store storage.Storage
- }
- // NewQuerier creates a Querier backed by the given storage and cluster.
- func NewQuerier(appName, clusterId string, store storage.Storage) Querier {
- return &querier{store: store, appName: appName, clusterId: clusterId}
- }
- // Query returns KubeModelSets covering the given window split into resolution-sized sub-windows.
- // resolution is snapped to the nearest supported pipeline resolution (1h or 1d).
- // Sub-windows with no file in storage are silently skipped.
- func (q *querier) Query(window opencost.Window) ([]*coremodel.KubeModelSet, error) {
- if window.IsOpen() {
- return nil, fmt.Errorf("kubemodel querier: window must be closed")
- }
- res := snapResolution(window)
- resStr := timeutil.FormatStoreResolution(res)
- formatter, err := pathing.NewKubeModelStoragePathFormatter(q.appName, q.clusterId, resStr)
- if err != nil {
- return nil, fmt.Errorf("kubemodel querier: %w", err)
- }
- start := window.Start().Truncate(res)
- end := window.End().Truncate(res)
- subWindows, err := opencost.GetWindowsForQueryWindow(start, end, res)
- if err != nil {
- return nil, fmt.Errorf("kubemodel querier: splitting window: %w", err)
- }
- results := make([]*coremodel.KubeModelSet, 0, len(subWindows))
- for _, w := range subWindows {
- kms, err := q.readWindow(formatter, w)
- if err != nil {
- if storage.IsNotExist(err) {
- continue
- }
- return nil, fmt.Errorf("kubemodel querier: reading window %s: %w", w, err)
- }
- results = append(results, kms)
- }
- return results, nil
- }
- func (q *querier) readWindow(formatter pathing.StoragePathFormatter[opencost.Window], window opencost.Window) (*coremodel.KubeModelSet, error) {
- path := formatter.ToFullPath("", window, exporter.BingenExt)
- data, err := q.store.Read(path)
- if err != nil {
- return nil, err
- }
- kms := new(coremodel.KubeModelSet)
- if err := kms.UnmarshalBinary(data); err != nil {
- return nil, fmt.Errorf("decoding KubeModelSet: %w", err)
- }
- return kms, nil
- }
- // snapResolution returns the largest supported resolution that evenly divides
- // the window duration. Falls back to the smallest supported resolution if none
- // divides evenly.
- func snapResolution(window opencost.Window) time.Duration {
- dur := window.Duration()
- for i := len(supportedResolutions) - 1; i >= 0; i-- {
- if dur%supportedResolutions[i] == 0 {
- return supportedResolutions[i]
- }
- }
- return supportedResolutions[0]
- }
|