repositoryquerier.go 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378
  1. package cloudcost
  2. import (
  3. "context"
  4. "errors"
  5. "fmt"
  6. "sort"
  7. "strings"
  8. "github.com/opencost/opencost/core/pkg/log"
  9. "github.com/opencost/opencost/core/pkg/opencost"
  10. )
  11. // ErrAutocompleteBadRequest indicates a client error in an autocomplete request.
  12. var ErrAutocompleteBadRequest = errors.New("autocomplete bad request")
  13. // IsAutocompleteBadRequest reports whether err is a client validation error.
  14. func IsAutocompleteBadRequest(err error) bool {
  15. return errors.Is(err, ErrAutocompleteBadRequest)
  16. }
  17. // RepositoryQuerier is an implementation of Querier and ViewQuerier which pulls directly from a Repository
  18. type RepositoryQuerier struct {
  19. repo Repository
  20. }
  21. func NewRepositoryQuerier(repo Repository) *RepositoryQuerier {
  22. return &RepositoryQuerier{repo: repo}
  23. }
  24. func (rq *RepositoryQuerier) Query(ctx context.Context, request QueryRequest) (*opencost.CloudCostSetRange, error) {
  25. repoKeys, err := rq.repo.Keys()
  26. if err != nil {
  27. return nil, fmt.Errorf("RepositoryQuerier: Query: failed to get list of keys from repository: %w", err)
  28. }
  29. // create filter
  30. compiler := opencost.NewCloudCostMatchCompiler()
  31. matcher, err := compiler.Compile(request.Filter)
  32. if err != nil {
  33. return nil, fmt.Errorf("RepositoryQuerier: Query: failed to compile filters: %w", err)
  34. }
  35. // Create a Cloud Cost Set Range in the resolution of the repository
  36. ccsr, err := opencost.NewCloudCostSetRange(request.Start, request.End, opencost.AccumulateOptionDay, "")
  37. if err != nil {
  38. return nil, fmt.Errorf("RepositoryQuerier: Query: failed to create Cloud Cost Set Range: %w", err)
  39. }
  40. for _, cloudCostSet := range ccsr.CloudCostSets {
  41. // Setting this values creates
  42. cloudCostSet.AggregationProperties = request.AggregateBy
  43. for _, key := range repoKeys {
  44. ccs, err := rq.repo.Get(cloudCostSet.Window.Start().UTC(), key)
  45. if err != nil {
  46. log.Errorf("RepositoryQuerier: Query: %s", err.Error())
  47. continue
  48. }
  49. if ccs == nil {
  50. continue
  51. }
  52. for _, cc := range ccs.CloudCosts {
  53. if matcher.Matches(cc) {
  54. cloudCostSet.Insert(cc)
  55. }
  56. }
  57. }
  58. }
  59. if request.Accumulate != opencost.AccumulateOptionNone {
  60. ccsr, err = ccsr.Accumulate(request.Accumulate)
  61. if err != nil {
  62. return nil, fmt.Errorf("RepositoryQuerier: Query: error accumulating: %w", err)
  63. }
  64. }
  65. return ccsr, nil
  66. }
  67. func (rq *RepositoryQuerier) QueryCloudCostAutocomplete(ctx context.Context, request CloudCostAutocompleteRequest) (*CloudCostAutocompleteResponse, error) {
  68. if request.Window.IsOpen() {
  69. return nil, fmt.Errorf("%w: invalid window for autocomplete query: %s", ErrAutocompleteBadRequest, request.Window.String())
  70. }
  71. limit := request.Limit
  72. if limit <= 0 {
  73. limit = DefaultAutocompleteResultLimit
  74. }
  75. if limit > MaxAutocompleteResultLimit {
  76. return nil, fmt.Errorf("%w: exceeded maximum autocomplete result limit of %d", ErrAutocompleteBadRequest, MaxAutocompleteResultLimit)
  77. }
  78. field, err := validateCloudCostAutocompleteField(request.Field)
  79. if err != nil {
  80. return nil, fmt.Errorf("%w: invalid field: %w", ErrAutocompleteBadRequest, err)
  81. }
  82. ccsr, err := rq.Query(ctx, QueryRequest{
  83. Start: *request.Window.Start(),
  84. End: *request.Window.End(),
  85. Accumulate: opencost.AccumulateOptionNone,
  86. Filter: request.Filter,
  87. })
  88. if err != nil {
  89. return nil, fmt.Errorf("QueryCloudCostAutocomplete: query failed: %w", err)
  90. }
  91. search := strings.ToLower(request.Search)
  92. results := map[string]struct{}{}
  93. for _, ccs := range ccsr.CloudCostSets {
  94. for _, cc := range ccs.CloudCosts {
  95. if cc == nil || cc.Properties == nil {
  96. continue
  97. }
  98. values := cloudCostAutocompleteValues(cc, field)
  99. for _, value := range values {
  100. if value == "" {
  101. continue
  102. }
  103. if search != "" && !strings.Contains(strings.ToLower(value), search) {
  104. continue
  105. }
  106. results[value] = struct{}{}
  107. }
  108. }
  109. }
  110. data := make([]string, 0, len(results))
  111. for result := range results {
  112. data = append(data, result)
  113. }
  114. sort.Strings(data)
  115. if len(data) > limit {
  116. data = data[:limit]
  117. }
  118. return &CloudCostAutocompleteResponse{Data: data}, nil
  119. }
  120. func validateCloudCostAutocompleteField(field string) (string, error) {
  121. if field == "" {
  122. return "", fmt.Errorf("field is required")
  123. }
  124. f := strings.ToLower(field)
  125. if f == "label" {
  126. return f, nil
  127. }
  128. if strings.HasPrefix(f, "label:") {
  129. _, labelKey, _ := strings.Cut(f, ":")
  130. return "label:" + labelKey, nil
  131. }
  132. property, err := opencost.ParseCloudCostProperty(field)
  133. if err != nil {
  134. return "", err
  135. }
  136. return string(property), nil
  137. }
  138. func cloudCostAutocompleteValues(cc *opencost.CloudCost, field string) []string {
  139. if field == "label" {
  140. keys := make([]string, 0, len(cc.Properties.Labels))
  141. for label := range cc.Properties.Labels {
  142. keys = append(keys, label)
  143. }
  144. return keys
  145. }
  146. if strings.HasPrefix(field, "label:") {
  147. labelName := strings.TrimPrefix(field, "label:")
  148. if value, ok := cloudCostLabelValueFold(cc.Properties.Labels, labelName); ok {
  149. return []string{value}
  150. }
  151. return nil
  152. }
  153. property, err := opencost.ParseCloudCostProperty(field)
  154. if err != nil {
  155. return nil
  156. }
  157. value, err := cc.StringProperty(string(property))
  158. if err != nil {
  159. return nil
  160. }
  161. return []string{value}
  162. }
  163. func cloudCostLabelValueFold(labels map[string]string, key string) (string, bool) {
  164. if v, ok := labels[key]; ok {
  165. return v, true
  166. }
  167. for k, v := range labels {
  168. if strings.EqualFold(k, key) {
  169. return v, true
  170. }
  171. }
  172. return "", false
  173. }
  174. func (rq *RepositoryQuerier) QueryViewGraph(ctx context.Context, request ViewQueryRequest) (ViewGraphData, error) {
  175. ccasr, err := rq.Query(ctx, request.QueryRequest)
  176. if err != nil {
  177. return nil, fmt.Errorf("QueryViewGraph: query failed: %w", err)
  178. }
  179. if ccasr.IsEmpty() {
  180. return make([]*ViewGraphDataSet, 0), nil
  181. }
  182. var sets ViewGraphData
  183. for _, ccas := range ccasr.CloudCostSets {
  184. items := make([]ViewGraphDataSetItem, 0)
  185. for key, cc := range ccas.CloudCosts {
  186. costMetric, err := cc.GetCostMetric(request.CostMetricName)
  187. if err != nil {
  188. return nil, fmt.Errorf("QueryViewGraph: failed to get cost metric: %w", err)
  189. }
  190. items = append(items, ViewGraphDataSetItem{
  191. Name: key,
  192. Value: costMetric.Cost,
  193. })
  194. }
  195. sort.SliceStable(items, func(i, j int) bool {
  196. return items[i].Value > items[j].Value
  197. })
  198. if len(items) > request.ChartItemsLength {
  199. otherItems := items[request.ChartItemsLength:]
  200. newItems := items[:request.ChartItemsLength]
  201. // Rename last item other and add all other values into it
  202. newItems[request.ChartItemsLength-1].Name = "Other"
  203. for _, item := range otherItems {
  204. newItems[request.ChartItemsLength-1].Value += item.Value
  205. }
  206. items = newItems
  207. }
  208. sets = append(sets, &ViewGraphDataSet{
  209. Start: *ccas.Window.Start(),
  210. End: *ccas.Window.End(),
  211. Items: items,
  212. })
  213. }
  214. return sets, nil
  215. }
  216. func (rq *RepositoryQuerier) QueryViewTotals(ctx context.Context, request ViewQueryRequest) (*ViewTotals, error) {
  217. ccasr, err := rq.Query(ctx, request.QueryRequest)
  218. if err != nil {
  219. return nil, fmt.Errorf("QueryViewTotals: query failed: %w", err)
  220. }
  221. acc, err := ccasr.AccumulateAll()
  222. if err != nil {
  223. return nil, fmt.Errorf("QueryViewTotals: accumulate failed: %w", err)
  224. }
  225. if acc.IsEmpty() {
  226. return nil, nil
  227. }
  228. count := len(acc.CloudCosts)
  229. total, err := acc.Aggregate([]string{})
  230. if err != nil {
  231. return nil, fmt.Errorf("QueryViewTotals: aggregate total failed: %w", err)
  232. }
  233. if total.IsEmpty() {
  234. return nil, fmt.Errorf("QueryViewTotals: missing total: %w", err)
  235. }
  236. if len(total.CloudCosts) != 1 {
  237. return nil, fmt.Errorf("QueryViewTotals: total did not aggregate: %w", err)
  238. }
  239. cm, err := total.CloudCosts[""].GetCostMetric(request.CostMetricName)
  240. if err != nil {
  241. return nil, fmt.Errorf("QueryViewTotals: failed to retrieve cost metric: %w", err)
  242. }
  243. return &ViewTotals{
  244. NumResults: count,
  245. Combined: &ViewTableRow{
  246. Name: "Totals",
  247. KubernetesPercent: cm.KubernetesPercent,
  248. Cost: cm.Cost,
  249. },
  250. }, nil
  251. }
  252. func (rq *RepositoryQuerier) QueryViewTable(ctx context.Context, request ViewQueryRequest) (ViewTableRows, error) {
  253. ccasr, err := rq.Query(ctx, request.QueryRequest)
  254. if err != nil {
  255. return nil, fmt.Errorf("QueryViewTable: query failed: %w", err)
  256. }
  257. acc, err := ccasr.AccumulateAll()
  258. if err != nil {
  259. return nil, fmt.Errorf("QueryViewTable: accumulate failed: %w", err)
  260. }
  261. var rows ViewTableRows
  262. for key, cloudCost := range acc.CloudCosts {
  263. costMetric, err2 := cloudCost.GetCostMetric(request.CostMetricName)
  264. if err2 != nil {
  265. return nil, fmt.Errorf("QueryViewTable: failed to retrieve cost metric: %w", err)
  266. }
  267. var labels map[string]string
  268. if cloudCost.Properties != nil {
  269. labels = cloudCost.Properties.Labels
  270. }
  271. vtr := &ViewTableRow{
  272. Name: key,
  273. Labels: labels,
  274. KubernetesPercent: costMetric.KubernetesPercent,
  275. Cost: costMetric.Cost,
  276. }
  277. rows = append(rows, vtr)
  278. }
  279. // Sort Results
  280. // Sort by Name to ensure consistent return
  281. sort.SliceStable(rows, func(i, j int) bool {
  282. return rows[i].Name > rows[j].Name
  283. })
  284. switch request.SortColumn {
  285. case SortFieldName:
  286. if request.SortDirection == SortDirectionAscending {
  287. sort.SliceStable(rows, func(i, j int) bool {
  288. return rows[i].Name < rows[j].Name
  289. })
  290. }
  291. case SortFieldCost:
  292. if request.SortDirection == SortDirectionAscending {
  293. sort.SliceStable(rows, func(i, j int) bool {
  294. return rows[i].Cost < rows[j].Cost
  295. })
  296. } else {
  297. sort.SliceStable(rows, func(i, j int) bool {
  298. return rows[i].Cost > rows[j].Cost
  299. })
  300. }
  301. case SortFieldKubernetesPercent:
  302. if request.SortDirection == SortDirectionAscending {
  303. sort.SliceStable(rows, func(i, j int) bool {
  304. return rows[i].KubernetesPercent < rows[j].KubernetesPercent
  305. })
  306. } else {
  307. sort.SliceStable(rows, func(i, j int) bool {
  308. return rows[i].KubernetesPercent > rows[j].KubernetesPercent
  309. })
  310. }
  311. default:
  312. return nil, fmt.Errorf("invalid sort field '%s'", string(request.SortColumn))
  313. }
  314. // paginate sorted results
  315. if request.Offset > len(rows) {
  316. return make([]*ViewTableRow, 0), nil
  317. }
  318. if request.Limit > 0 {
  319. limit := request.Offset + request.Limit
  320. if limit > len(rows) {
  321. return rows[request.Offset:], nil
  322. }
  323. return rows[request.Offset:limit], nil
  324. }
  325. if request.Offset > 0 {
  326. return rows[request.Offset:], nil
  327. }
  328. return rows, nil
  329. }