repositoryquerier.go 8.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332
  1. package cloudcost
  2. import (
  3. "context"
  4. "fmt"
  5. "sort"
  6. "strings"
  7. "github.com/opencost/opencost/core/pkg/log"
  8. "github.com/opencost/opencost/core/pkg/opencost"
  9. )
  10. // RepositoryQuerier is an implementation of Querier and ViewQuerier which pulls directly from a Repository
  11. type RepositoryQuerier struct {
  12. repo Repository
  13. }
  14. func NewRepositoryQuerier(repo Repository) *RepositoryQuerier {
  15. return &RepositoryQuerier{repo: repo}
  16. }
  17. func (rq *RepositoryQuerier) Query(ctx context.Context, request QueryRequest) (*opencost.CloudCostSetRange, error) {
  18. repoKeys, err := rq.repo.Keys()
  19. if err != nil {
  20. return nil, fmt.Errorf("RepositoryQuerier: Query: failed to get list of keys from repository: %w", err)
  21. }
  22. // create filter
  23. compiler := opencost.NewCloudCostMatchCompiler()
  24. matcher, err := compiler.Compile(request.Filter)
  25. if err != nil {
  26. return nil, fmt.Errorf("RepositoryQuerier: Query: failed to compile filters: %w", err)
  27. }
  28. // Create a Cloud Cost Set Range in the resolution of the repository
  29. ccsr, err := opencost.NewCloudCostSetRange(request.Start, request.End, opencost.AccumulateOptionDay, "")
  30. if err != nil {
  31. return nil, fmt.Errorf("RepositoryQuerier: Query: failed to create Cloud Cost Set Range: %w", err)
  32. }
  33. for _, cloudCostSet := range ccsr.CloudCostSets {
  34. // Setting this values creates
  35. cloudCostSet.AggregationProperties = request.AggregateBy
  36. for _, key := range repoKeys {
  37. ccs, err := rq.repo.Get(cloudCostSet.Window.Start().UTC(), key)
  38. if err != nil {
  39. log.Errorf("RepositoryQuerier: Query: %s", err.Error())
  40. continue
  41. }
  42. if ccs == nil {
  43. continue
  44. }
  45. for _, cc := range ccs.CloudCosts {
  46. if matcher.Matches(cc) {
  47. cloudCostSet.Insert(cc)
  48. }
  49. }
  50. }
  51. }
  52. if request.Accumulate != opencost.AccumulateOptionNone {
  53. ccsr, err = ccsr.Accumulate(request.Accumulate)
  54. if err != nil {
  55. return nil, fmt.Errorf("RepositoryQuerier: Query: error accumulating: %w", err)
  56. }
  57. }
  58. return ccsr, nil
  59. }
  60. func (rq *RepositoryQuerier) QueryCloudCostAutocomplete(ctx context.Context, request CloudCostAutocompleteRequest) (*CloudCostAutocompleteResponse, error) {
  61. if request.Window.IsOpen() {
  62. return nil, fmt.Errorf("invalid window for autocomplete query: %s", request.Window.String())
  63. }
  64. limit := request.Limit
  65. if limit <= 0 {
  66. limit = DefaultAutocompleteResultLimit
  67. }
  68. if limit > MaxAutocompleteResultLimit {
  69. return nil, fmt.Errorf("exceeded maxiumum autocomplete result limit of %d", MaxAutocompleteResultLimit)
  70. }
  71. ccsr, err := rq.Query(ctx, QueryRequest{
  72. Start: *request.Window.Start(),
  73. End: *request.Window.End(),
  74. Accumulate: opencost.AccumulateOptionNone,
  75. Filter: request.Filter,
  76. })
  77. if err != nil {
  78. return nil, fmt.Errorf("QueryCloudCostAutocomplete: query failed: %w", err)
  79. }
  80. prop := strings.ToLower(request.Field)
  81. search := strings.ToLower(request.Search)
  82. results := map[string]struct{}{}
  83. for _, ccs := range ccsr.CloudCostSets {
  84. for _, cc := range ccs.CloudCosts {
  85. if cc == nil || cc.Properties == nil {
  86. continue
  87. }
  88. values := cloudCostAutocompleteValues(cc, prop)
  89. for _, value := range values {
  90. if value == "" {
  91. continue
  92. }
  93. if search != "" && !strings.Contains(strings.ToLower(value), search) {
  94. continue
  95. }
  96. results[value] = struct{}{}
  97. }
  98. }
  99. }
  100. data := make([]string, 0, len(results))
  101. for result := range results {
  102. data = append(data, result)
  103. }
  104. sort.Strings(data)
  105. if len(data) > limit {
  106. data = data[:limit]
  107. }
  108. return &CloudCostAutocompleteResponse{Data: data}, nil
  109. }
  110. func cloudCostAutocompleteValues(cc *opencost.CloudCost, field string) []string {
  111. if field == "label" {
  112. keys := make([]string, 0, len(cc.Properties.Labels))
  113. for label := range cc.Properties.Labels {
  114. keys = append(keys, label)
  115. }
  116. return keys
  117. }
  118. if strings.HasPrefix(field, "label:") {
  119. labelName := strings.TrimPrefix(field, "label:")
  120. if value, ok := cc.Properties.Labels[labelName]; ok {
  121. return []string{value}
  122. }
  123. return nil
  124. }
  125. property, err := opencost.ParseCloudCostProperty(field)
  126. if err != nil {
  127. return nil
  128. }
  129. value, err := cc.StringProperty(string(property))
  130. if err != nil {
  131. return nil
  132. }
  133. return []string{value}
  134. }
  135. func (rq *RepositoryQuerier) QueryViewGraph(ctx context.Context, request ViewQueryRequest) (ViewGraphData, error) {
  136. ccasr, err := rq.Query(ctx, request.QueryRequest)
  137. if err != nil {
  138. return nil, fmt.Errorf("QueryViewGraph: query failed: %w", err)
  139. }
  140. if ccasr.IsEmpty() {
  141. return make([]*ViewGraphDataSet, 0), nil
  142. }
  143. var sets ViewGraphData
  144. for _, ccas := range ccasr.CloudCostSets {
  145. items := make([]ViewGraphDataSetItem, 0)
  146. for key, cc := range ccas.CloudCosts {
  147. costMetric, err := cc.GetCostMetric(request.CostMetricName)
  148. if err != nil {
  149. return nil, fmt.Errorf("QueryViewGraph: failed to get cost metric: %w", err)
  150. }
  151. items = append(items, ViewGraphDataSetItem{
  152. Name: key,
  153. Value: costMetric.Cost,
  154. })
  155. }
  156. sort.SliceStable(items, func(i, j int) bool {
  157. return items[i].Value > items[j].Value
  158. })
  159. if len(items) > request.ChartItemsLength {
  160. otherItems := items[request.ChartItemsLength:]
  161. newItems := items[:request.ChartItemsLength]
  162. // Rename last item other and add all other values into it
  163. newItems[request.ChartItemsLength-1].Name = "Other"
  164. for _, item := range otherItems {
  165. newItems[request.ChartItemsLength-1].Value += item.Value
  166. }
  167. items = newItems
  168. }
  169. sets = append(sets, &ViewGraphDataSet{
  170. Start: *ccas.Window.Start(),
  171. End: *ccas.Window.End(),
  172. Items: items,
  173. })
  174. }
  175. return sets, nil
  176. }
  177. func (rq *RepositoryQuerier) QueryViewTotals(ctx context.Context, request ViewQueryRequest) (*ViewTotals, error) {
  178. ccasr, err := rq.Query(ctx, request.QueryRequest)
  179. if err != nil {
  180. return nil, fmt.Errorf("QueryViewTotals: query failed: %w", err)
  181. }
  182. acc, err := ccasr.AccumulateAll()
  183. if err != nil {
  184. return nil, fmt.Errorf("QueryViewTotals: accumulate failed: %w", err)
  185. }
  186. if acc.IsEmpty() {
  187. return nil, nil
  188. }
  189. count := len(acc.CloudCosts)
  190. total, err := acc.Aggregate([]string{})
  191. if err != nil {
  192. return nil, fmt.Errorf("QueryViewTotals: aggregate total failed: %w", err)
  193. }
  194. if total.IsEmpty() {
  195. return nil, fmt.Errorf("QueryViewTotals: missing total: %w", err)
  196. }
  197. if len(total.CloudCosts) != 1 {
  198. return nil, fmt.Errorf("QueryViewTotals: total did not aggregate: %w", err)
  199. }
  200. cm, err := total.CloudCosts[""].GetCostMetric(request.CostMetricName)
  201. if err != nil {
  202. return nil, fmt.Errorf("QueryViewTotals: failed to retrieve cost metric: %w", err)
  203. }
  204. return &ViewTotals{
  205. NumResults: count,
  206. Combined: &ViewTableRow{
  207. Name: "Totals",
  208. KubernetesPercent: cm.KubernetesPercent,
  209. Cost: cm.Cost,
  210. },
  211. }, nil
  212. }
  213. func (rq *RepositoryQuerier) QueryViewTable(ctx context.Context, request ViewQueryRequest) (ViewTableRows, error) {
  214. ccasr, err := rq.Query(ctx, request.QueryRequest)
  215. if err != nil {
  216. return nil, fmt.Errorf("QueryViewTable: query failed: %w", err)
  217. }
  218. acc, err := ccasr.AccumulateAll()
  219. if err != nil {
  220. return nil, fmt.Errorf("QueryViewTable: accumulate failed: %w", err)
  221. }
  222. var rows ViewTableRows
  223. for key, cloudCost := range acc.CloudCosts {
  224. costMetric, err2 := cloudCost.GetCostMetric(request.CostMetricName)
  225. if err2 != nil {
  226. return nil, fmt.Errorf("QueryViewTable: failed to retrieve cost metric: %w", err)
  227. }
  228. var labels map[string]string
  229. if cloudCost.Properties != nil {
  230. labels = cloudCost.Properties.Labels
  231. }
  232. vtr := &ViewTableRow{
  233. Name: key,
  234. Labels: labels,
  235. KubernetesPercent: costMetric.KubernetesPercent,
  236. Cost: costMetric.Cost,
  237. }
  238. rows = append(rows, vtr)
  239. }
  240. // Sort Results
  241. // Sort by Name to ensure consistent return
  242. sort.SliceStable(rows, func(i, j int) bool {
  243. return rows[i].Name > rows[j].Name
  244. })
  245. switch request.SortColumn {
  246. case SortFieldName:
  247. if request.SortDirection == SortDirectionAscending {
  248. sort.SliceStable(rows, func(i, j int) bool {
  249. return rows[i].Name < rows[j].Name
  250. })
  251. }
  252. case SortFieldCost:
  253. if request.SortDirection == SortDirectionAscending {
  254. sort.SliceStable(rows, func(i, j int) bool {
  255. return rows[i].Cost < rows[j].Cost
  256. })
  257. } else {
  258. sort.SliceStable(rows, func(i, j int) bool {
  259. return rows[i].Cost > rows[j].Cost
  260. })
  261. }
  262. case SortFieldKubernetesPercent:
  263. if request.SortDirection == SortDirectionAscending {
  264. sort.SliceStable(rows, func(i, j int) bool {
  265. return rows[i].KubernetesPercent < rows[j].KubernetesPercent
  266. })
  267. } else {
  268. sort.SliceStable(rows, func(i, j int) bool {
  269. return rows[i].KubernetesPercent > rows[j].KubernetesPercent
  270. })
  271. }
  272. default:
  273. return nil, fmt.Errorf("invalid sort field '%s'", string(request.SortColumn))
  274. }
  275. // paginate sorted results
  276. if request.Offset > len(rows) {
  277. return make([]*ViewTableRow, 0), nil
  278. }
  279. if request.Limit > 0 {
  280. limit := request.Offset + request.Limit
  281. if limit > len(rows) {
  282. return rows[request.Offset:], nil
  283. }
  284. return rows[request.Offset:limit], nil
  285. }
  286. if request.Offset > 0 {
  287. return rows[request.Offset:], nil
  288. }
  289. return rows, nil
  290. }