repositoryquerier.go 8.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330
  1. package cloudcost
  2. import (
  3. "context"
  4. "fmt"
  5. "sort"
  6. "strings"
  7. "github.com/opencost/opencost/core/pkg/autocomplete"
  8. corecloudcost "github.com/opencost/opencost/core/pkg/autocomplete/cloudcost"
  9. "github.com/opencost/opencost/core/pkg/log"
  10. "github.com/opencost/opencost/core/pkg/opencost"
  11. )
  12. // RepositoryQuerier is an implementation of Querier and ViewQuerier which pulls directly from a Repository
  13. type RepositoryQuerier struct {
  14. repo Repository
  15. }
  16. func NewRepositoryQuerier(repo Repository) *RepositoryQuerier {
  17. return &RepositoryQuerier{repo: repo}
  18. }
  19. func (rq *RepositoryQuerier) Query(ctx context.Context, request QueryRequest) (*opencost.CloudCostSetRange, error) {
  20. repoKeys, err := rq.repo.Keys()
  21. if err != nil {
  22. return nil, fmt.Errorf("RepositoryQuerier: Query: failed to get list of keys from repository: %w", err)
  23. }
  24. // create filter
  25. compiler := opencost.NewCloudCostMatchCompiler()
  26. matcher, err := compiler.Compile(request.Filter)
  27. if err != nil {
  28. return nil, fmt.Errorf("RepositoryQuerier: Query: failed to compile filters: %w", err)
  29. }
  30. // Create a Cloud Cost Set Range in the resolution of the repository
  31. ccsr, err := opencost.NewCloudCostSetRange(request.Start, request.End, opencost.AccumulateOptionDay, "")
  32. if err != nil {
  33. return nil, fmt.Errorf("RepositoryQuerier: Query: failed to create Cloud Cost Set Range: %w", err)
  34. }
  35. for _, cloudCostSet := range ccsr.CloudCostSets {
  36. // Setting this values creates
  37. cloudCostSet.AggregationProperties = request.AggregateBy
  38. for _, key := range repoKeys {
  39. ccs, err := rq.repo.Get(cloudCostSet.Window.Start().UTC(), key)
  40. if err != nil {
  41. log.Errorf("RepositoryQuerier: Query: %s", err.Error())
  42. continue
  43. }
  44. if ccs == nil {
  45. continue
  46. }
  47. for _, cc := range ccs.CloudCosts {
  48. if matcher.Matches(cc) {
  49. cloudCostSet.Insert(cc)
  50. }
  51. }
  52. }
  53. }
  54. if request.Accumulate != opencost.AccumulateOptionNone {
  55. ccsr, err = ccsr.Accumulate(request.Accumulate)
  56. if err != nil {
  57. return nil, fmt.Errorf("RepositoryQuerier: Query: error accumulating: %w", err)
  58. }
  59. }
  60. return ccsr, nil
  61. }
  62. func (rq *RepositoryQuerier) QueryCloudCostAutocomplete(ctx context.Context, request autocomplete.Request) (*autocomplete.Response, error) {
  63. field, err := autocomplete.NormalizeRequest(&request, corecloudcost.ValidateField, autocomplete.NormalizeOptions{})
  64. if err != nil {
  65. return nil, err
  66. }
  67. limit := request.Limit
  68. ccsr, err := rq.Query(ctx, QueryRequest{
  69. Start: *request.Window.Start(),
  70. End: *request.Window.End(),
  71. Accumulate: opencost.AccumulateOptionNone,
  72. Filter: request.Filter,
  73. })
  74. if err != nil {
  75. return nil, fmt.Errorf("QueryCloudCostAutocomplete: query failed: %w", err)
  76. }
  77. search := strings.ToLower(request.Search)
  78. results := map[string]struct{}{}
  79. for _, ccs := range ccsr.CloudCostSets {
  80. for _, cc := range ccs.CloudCosts {
  81. if cc == nil || cc.Properties == nil {
  82. continue
  83. }
  84. values := cloudCostAutocompleteValues(cc, field)
  85. for _, value := range values {
  86. if value == "" {
  87. continue
  88. }
  89. if search != "" && !strings.Contains(strings.ToLower(value), search) {
  90. continue
  91. }
  92. results[value] = struct{}{}
  93. }
  94. }
  95. }
  96. return &autocomplete.Response{Data: autocomplete.UniqueSortedLimited(results, limit)}, nil
  97. }
  98. func cloudCostAutocompleteValues(cc *opencost.CloudCost, field string) []string {
  99. if field == "label" {
  100. keys := make([]string, 0, len(cc.Properties.Labels))
  101. for label := range cc.Properties.Labels {
  102. keys = append(keys, label)
  103. }
  104. return keys
  105. }
  106. if strings.HasPrefix(field, "label:") {
  107. labelName := strings.TrimPrefix(field, "label:")
  108. if value, ok := cloudCostLabelValueFold(cc.Properties.Labels, labelName); ok {
  109. return []string{value}
  110. }
  111. return nil
  112. }
  113. property, err := opencost.ParseCloudCostProperty(field)
  114. if err != nil {
  115. return nil
  116. }
  117. value, err := cc.StringProperty(string(property))
  118. if err != nil {
  119. return nil
  120. }
  121. return []string{value}
  122. }
  // cloudCostLabelValueFold looks up key in labels, preferring an exact match
  // and falling back to a case-insensitive one.
  //
  // It returns the label value and true when a match exists, or "" and false
  // otherwise (including for a nil or empty map).
  //
  // When several keys differ from key only by case, the lexicographically
  // smallest matching key wins. Map iteration order is unspecified, so taking
  // the first match encountered would make the result vary between calls.
  func cloudCostLabelValueFold(labels map[string]string, key string) (string, bool) {
  	if v, ok := labels[key]; ok {
  		return v, true
  	}

  	var (
  		bestKey string
  		bestVal string
  		found   bool
  	)
  	for k, v := range labels {
  		if !strings.EqualFold(k, key) {
  			continue
  		}
  		if !found || k < bestKey {
  			bestKey, bestVal, found = k, v, true
  		}
  	}
  	return bestVal, found
  }
  134. func (rq *RepositoryQuerier) QueryViewGraph(ctx context.Context, request ViewQueryRequest) (ViewGraphData, error) {
  135. ccasr, err := rq.Query(ctx, request.QueryRequest)
  136. if err != nil {
  137. return nil, fmt.Errorf("QueryViewGraph: query failed: %w", err)
  138. }
  139. if ccasr.IsEmpty() {
  140. return make([]*ViewGraphDataSet, 0), nil
  141. }
  142. var sets ViewGraphData
  143. for _, ccas := range ccasr.CloudCostSets {
  144. items := make([]ViewGraphDataSetItem, 0)
  145. for key, cc := range ccas.CloudCosts {
  146. costMetric, err := cc.GetCostMetric(request.CostMetricName)
  147. if err != nil {
  148. return nil, fmt.Errorf("QueryViewGraph: failed to get cost metric: %w", err)
  149. }
  150. items = append(items, ViewGraphDataSetItem{
  151. Name: key,
  152. Value: costMetric.Cost,
  153. })
  154. }
  155. sort.SliceStable(items, func(i, j int) bool {
  156. return items[i].Value > items[j].Value
  157. })
  158. if len(items) > request.ChartItemsLength {
  159. otherItems := items[request.ChartItemsLength:]
  160. newItems := items[:request.ChartItemsLength]
  161. // Rename last item other and add all other values into it
  162. newItems[request.ChartItemsLength-1].Name = "Other"
  163. for _, item := range otherItems {
  164. newItems[request.ChartItemsLength-1].Value += item.Value
  165. }
  166. items = newItems
  167. }
  168. sets = append(sets, &ViewGraphDataSet{
  169. Start: *ccas.Window.Start(),
  170. End: *ccas.Window.End(),
  171. Items: items,
  172. })
  173. }
  174. return sets, nil
  175. }
  176. func (rq *RepositoryQuerier) QueryViewTotals(ctx context.Context, request ViewQueryRequest) (*ViewTotals, error) {
  177. ccasr, err := rq.Query(ctx, request.QueryRequest)
  178. if err != nil {
  179. return nil, fmt.Errorf("QueryViewTotals: query failed: %w", err)
  180. }
  181. acc, err := ccasr.AccumulateAll()
  182. if err != nil {
  183. return nil, fmt.Errorf("QueryViewTotals: accumulate failed: %w", err)
  184. }
  185. if acc.IsEmpty() {
  186. return nil, nil
  187. }
  188. count := len(acc.CloudCosts)
  189. total, err := acc.Aggregate([]string{})
  190. if err != nil {
  191. return nil, fmt.Errorf("QueryViewTotals: aggregate total failed: %w", err)
  192. }
  193. if total.IsEmpty() {
  194. return nil, fmt.Errorf("QueryViewTotals: missing total: %w", err)
  195. }
  196. if len(total.CloudCosts) != 1 {
  197. return nil, fmt.Errorf("QueryViewTotals: total did not aggregate: %w", err)
  198. }
  199. cm, err := total.CloudCosts[""].GetCostMetric(request.CostMetricName)
  200. if err != nil {
  201. return nil, fmt.Errorf("QueryViewTotals: failed to retrieve cost metric: %w", err)
  202. }
  203. return &ViewTotals{
  204. NumResults: count,
  205. Combined: &ViewTableRow{
  206. Name: "Totals",
  207. KubernetesPercent: cm.KubernetesPercent,
  208. Cost: cm.Cost,
  209. },
  210. }, nil
  211. }
  212. func (rq *RepositoryQuerier) QueryViewTable(ctx context.Context, request ViewQueryRequest) (ViewTableRows, error) {
  213. ccasr, err := rq.Query(ctx, request.QueryRequest)
  214. if err != nil {
  215. return nil, fmt.Errorf("QueryViewTable: query failed: %w", err)
  216. }
  217. acc, err := ccasr.AccumulateAll()
  218. if err != nil {
  219. return nil, fmt.Errorf("QueryViewTable: accumulate failed: %w", err)
  220. }
  221. var rows ViewTableRows
  222. for key, cloudCost := range acc.CloudCosts {
  223. costMetric, err2 := cloudCost.GetCostMetric(request.CostMetricName)
  224. if err2 != nil {
  225. return nil, fmt.Errorf("QueryViewTable: failed to retrieve cost metric: %w", err)
  226. }
  227. var labels map[string]string
  228. if cloudCost.Properties != nil {
  229. labels = cloudCost.Properties.Labels
  230. }
  231. vtr := &ViewTableRow{
  232. Name: key,
  233. Labels: labels,
  234. KubernetesPercent: costMetric.KubernetesPercent,
  235. Cost: costMetric.Cost,
  236. }
  237. rows = append(rows, vtr)
  238. }
  239. // Sort Results
  240. // Sort by Name to ensure consistent return
  241. sort.SliceStable(rows, func(i, j int) bool {
  242. return rows[i].Name > rows[j].Name
  243. })
  244. switch request.SortColumn {
  245. case SortFieldName:
  246. if request.SortDirection == SortDirectionAscending {
  247. sort.SliceStable(rows, func(i, j int) bool {
  248. return rows[i].Name < rows[j].Name
  249. })
  250. }
  251. case SortFieldCost:
  252. if request.SortDirection == SortDirectionAscending {
  253. sort.SliceStable(rows, func(i, j int) bool {
  254. return rows[i].Cost < rows[j].Cost
  255. })
  256. } else {
  257. sort.SliceStable(rows, func(i, j int) bool {
  258. return rows[i].Cost > rows[j].Cost
  259. })
  260. }
  261. case SortFieldKubernetesPercent:
  262. if request.SortDirection == SortDirectionAscending {
  263. sort.SliceStable(rows, func(i, j int) bool {
  264. return rows[i].KubernetesPercent < rows[j].KubernetesPercent
  265. })
  266. } else {
  267. sort.SliceStable(rows, func(i, j int) bool {
  268. return rows[i].KubernetesPercent > rows[j].KubernetesPercent
  269. })
  270. }
  271. default:
  272. return nil, fmt.Errorf("invalid sort field '%s'", string(request.SortColumn))
  273. }
  274. // paginate sorted results
  275. if request.Offset > len(rows) {
  276. return make([]*ViewTableRow, 0), nil
  277. }
  278. if request.Limit > 0 {
  279. limit := request.Offset + request.Limit
  280. if limit > len(rows) {
  281. return rows[request.Offset:], nil
  282. }
  283. return rows[request.Offset:limit], nil
  284. }
  285. if request.Offset > 0 {
  286. return rows[request.Offset:], nil
  287. }
  288. return rows, nil
  289. }