bigqueryintegration.go 8.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256
  1. package gcp
  2. import (
  3. "context"
  4. "errors"
  5. "fmt"
  6. "strings"
  7. "time"
  8. "github.com/opencost/opencost/core/pkg/log"
  9. "github.com/opencost/opencost/core/pkg/opencost"
  10. "google.golang.org/api/iterator"
  11. )
  12. type BigQueryIntegration struct {
  13. BigQueryQuerier
  14. }
  // Aliases assigned to the columns selected from the billing export. The
  // queries built in this file reference them in their SELECT ... AS and
  // GROUP BY clauses.
  const (
  	// Day and ownership of the usage.
  	UsageDateColumnName        = "usage_date"
  	BillingAccountIDColumnName = "billing_id"
  	ProjectIDColumnName        = "project_id"
  	ProjectNameColumnName      = "project_name"

  	// Location of the usage.
  	RegionColumnName = "region"
  	ZoneColumnName   = "zone"

  	// Service and SKU descriptions.
  	ServiceDescriptionColumnName = "service"
  	SKUDescriptionColumnName     = "description"

  	// Labels and resource identifiers.
  	LabelsColumnName             = "labels"
  	ProjectLabelsColumnName      = "project_labels"
  	ResourceNameColumnName       = "resource"
  	ResourceGlobalNameColumnName = "global_resource"

  	// Monetary aggregates.
  	CostColumnName     = "cost"
  	ListCostColumnName = "list_cost"
  	CreditsColumnName  = "credits"
  )
  // WHERE-clause templates. Each takes two date strings (lower bound, upper
  // bound) and yields a half-open filter: lower bound inclusive, upper bound
  // exclusive.
  //
  // NOTE(review): the "Biq" spelling in these exported names looks like a typo
  // for "Big"; it is kept because other code may reference the names.
  const (
  	// BiqQueryWherePartitionFmt filters on the table's partition date.
  	BiqQueryWherePartitionFmt = `DATE(_PARTITIONTIME) >= "%s" AND DATE(_PARTITIONTIME) < "%s"`

  	// BiqQueryWhereDateFmt filters on the usage start time of a row.
  	BiqQueryWhereDateFmt = `usage_start_time >= "%s" AND usage_start_time < "%s"`
  )
  34. func (bqi *BigQueryIntegration) GetCloudCost(start time.Time, end time.Time) (*opencost.CloudCostSetRange, error) {
  35. cudRates, err := bqi.GetFlexibleCUDRates(start, end)
  36. if err != nil {
  37. return nil, fmt.Errorf("error retrieving CUD rates: %w", err)
  38. }
  39. // Build Query
  40. selectColumns := []string{
  41. fmt.Sprintf("TIMESTAMP_TRUNC(usage_start_time, day) as %s", UsageDateColumnName),
  42. fmt.Sprintf("billing_account_id as %s", BillingAccountIDColumnName),
  43. fmt.Sprintf("project.id as %s", ProjectIDColumnName),
  44. fmt.Sprintf("project.name as %s", ProjectNameColumnName),
  45. fmt.Sprintf("location.region as %s", RegionColumnName),
  46. fmt.Sprintf("location.zone as %s", ZoneColumnName),
  47. fmt.Sprintf("service.description as %s", ServiceDescriptionColumnName),
  48. fmt.Sprintf("sku.description as %s", SKUDescriptionColumnName),
  49. fmt.Sprintf("resource.name as %s", ResourceNameColumnName),
  50. fmt.Sprintf("resource.global_name as %s", ResourceGlobalNameColumnName),
  51. fmt.Sprintf("TO_JSON_STRING(labels) as %s", LabelsColumnName),
  52. fmt.Sprintf("SUM(cost) as %s", CostColumnName),
  53. fmt.Sprintf("SUM(cost_at_list) as %s", ListCostColumnName),
  54. fmt.Sprintf("ARRAY_CONCAT_AGG(credits) as %s", CreditsColumnName),
  55. }
  56. groupByColumns := []string{
  57. UsageDateColumnName,
  58. BillingAccountIDColumnName,
  59. ProjectIDColumnName,
  60. ProjectNameColumnName,
  61. RegionColumnName,
  62. ZoneColumnName,
  63. ServiceDescriptionColumnName,
  64. SKUDescriptionColumnName,
  65. LabelsColumnName,
  66. ResourceNameColumnName,
  67. ResourceGlobalNameColumnName,
  68. }
  69. whereConjuncts := GetWhereConjuncts(start, end, bqi.ExcludePartitionTime)
  70. columnStr := strings.Join(selectColumns, ", ")
  71. table := fmt.Sprintf(" `%s` bd ", bqi.GetBillingDataDataset())
  72. whereClause := strings.Join(whereConjuncts, " AND ")
  73. groupByStr := strings.Join(groupByColumns, ", ")
  74. queryStr := `
  75. SELECT %s
  76. FROM %s
  77. WHERE %s
  78. GROUP BY %s
  79. `
  80. querystr := fmt.Sprintf(queryStr, columnStr, table, whereClause, groupByStr)
  81. // Perform Query and parse values
  82. ccsr, err := opencost.NewCloudCostSetRange(start, end, opencost.AccumulateOptionDay, bqi.Key())
  83. if err != nil {
  84. return ccsr, fmt.Errorf("error creating new CloudCostSetRange: %s", err)
  85. }
  86. iter, err := bqi.Query(context.Background(), querystr)
  87. if err != nil {
  88. return ccsr, fmt.Errorf("error querying: %s", err)
  89. }
  90. // Parse query into CloudCostSetRange
  91. for {
  92. ccl := CloudCostLoader{
  93. FlexibleCUDRates: cudRates,
  94. }
  95. err = iter.Next(&ccl)
  96. if err == iterator.Done {
  97. break
  98. }
  99. if err != nil {
  100. return ccsr, err
  101. }
  102. if ccl.CloudCost == nil {
  103. continue
  104. }
  105. ccsr.LoadCloudCost(ccl.CloudCost)
  106. }
  107. return ccsr, nil
  108. }
  109. // GetWhereConjuncts creates a list of Where filter statements that filter for usage start date and partition time
  110. // additional filters can be added before combining into the final where clause
  111. func GetWhereConjuncts(start time.Time, end time.Time, excludePartitions bool) []string {
  112. var conjuncts []string
  113. if !excludePartitions {
  114. partitionStart := start
  115. partitionEnd := end.AddDate(0, 0, 2)
  116. wherePartition := fmt.Sprintf(BiqQueryWherePartitionFmt, partitionStart.Format("2006-01-02"), partitionEnd.Format("2006-01-02"))
  117. conjuncts = append(conjuncts, wherePartition)
  118. }
  119. whereDate := fmt.Sprintf(BiqQueryWhereDateFmt, start.Format("2006-01-02"), end.Format("2006-01-02"))
  120. conjuncts = append(conjuncts, whereDate)
  121. return conjuncts
  122. }
  // FlexibleCUDRates holds, for a single day, the ratio of the total amount paid
  // for Flexible CUDs to the total amount credited by them. Credits are negative
  // values, so the ratios are negative as well. Multiplying a line item's
  // Flexible CUD credit by a rate gives the amount that was paid for that
  // credit, which lets Flexible CUD costs (not tied to any resource in the
  // billing export) be amortized across the items that benefited from them.
  //
  // The amount paid may itself carry credits, hence two rates: Rate feeds
  // AmortizedCost and NetRate feeds AmortizedNetCost.
  type FlexibleCUDRates struct {
  	// NetRate is (amount paid + credits on that amount) / total amount credited.
  	NetRate float64
  	// Rate is amount paid / total amount credited.
  	Rate float64
  }
  132. // GetFlexibleCUDRates returns a map of FlexibleCUDRates keyed on the start time of the day which those
  133. // FlexibleCUDRates were derived from.
  134. func (bqi *BigQueryIntegration) GetFlexibleCUDRates(start time.Time, end time.Time) (map[time.Time]FlexibleCUDRates, error) {
  135. costsByDate, err := bqi.queryFlexibleCUDTotalCosts(start, end)
  136. if err != nil {
  137. return nil, fmt.Errorf("GetFlexibleCUDRates: %w", err)
  138. }
  139. creditsByDate, err := bqi.queryFlexibleCUDTotalCredits(start, end)
  140. if err != nil {
  141. return nil, fmt.Errorf("GetFlexibleCUDRates: %w", err)
  142. }
  143. results := map[time.Time]FlexibleCUDRates{}
  144. for date, amountCredited := range creditsByDate {
  145. // Protection against divide by zero
  146. if amountCredited == 0 {
  147. log.Warnf("GetFlexibleCUDRates: 0 value total credit for Flexible CUDs for date %s", date.Format(time.RFC3339))
  148. continue
  149. }
  150. amountPayed, ok := costsByDate[date]
  151. if !ok {
  152. log.Warnf("GetFlexibleCUDRates: could not find Flexible CUD payments for date %s", date.Format(time.RFC3339))
  153. continue
  154. }
  155. // amountPayed itself may have some credits on it so a Rate and a NetRate are created.
  156. // Having both allow us to populate AmortizedCost and AmortizedNetCost respectively.
  157. results[date] = FlexibleCUDRates{
  158. NetRate: (amountPayed.cost + amountPayed.credits) / amountCredited,
  159. Rate: amountPayed.cost / amountCredited,
  160. }
  161. }
  162. return results, nil
  163. }
  164. func (bqi *BigQueryIntegration) queryFlexibleCUDTotalCosts(start time.Time, end time.Time) (map[time.Time]flexibleCUDCostTotals, error) {
  165. queryFmt := `
  166. SELECT
  167. TIMESTAMP_TRUNC(usage_start_time, day) as usage_date,
  168. sum(cost),
  169. IFNULL(SUM((Select SUM(amount) FROM bd.credits)),0),
  170. FROM %s
  171. WHERE %s
  172. GROUP BY usage_date
  173. `
  174. table := fmt.Sprintf(" `%s` bd ", bqi.GetBillingDataDataset())
  175. whereConjuncts := GetWhereConjuncts(start, end, bqi.ExcludePartitionTime)
  176. whereConjuncts = append(whereConjuncts, "sku.description like 'Commitment - dollar based v1:%'")
  177. whereClause := strings.Join(whereConjuncts, " AND ")
  178. query := fmt.Sprintf(queryFmt, table, whereClause)
  179. iter, err := bqi.Query(context.Background(), query)
  180. if err != nil {
  181. return nil, fmt.Errorf("queryCUDAmountPayed: query error %w", err)
  182. }
  183. var loader FlexibleCUDCostTotalsLoader
  184. for {
  185. err = iter.Next(&loader)
  186. if errors.Is(err, iterator.Done) {
  187. break
  188. }
  189. if err != nil {
  190. return nil, fmt.Errorf("queryCUDAmountPayed: load error %w", err)
  191. }
  192. }
  193. return loader.values, nil
  194. }
  195. func (bqi *BigQueryIntegration) queryFlexibleCUDTotalCredits(start time.Time, end time.Time) (map[time.Time]float64, error) {
  196. queryFmt := `SELECT
  197. TIMESTAMP_TRUNC(usage_start_time, day) as usage_date,
  198. sum(credits.amount)
  199. FROM %s
  200. CROSS JOIN UNNEST(bd.credits) AS credits
  201. WHERE %s
  202. GROUP BY usage_date
  203. `
  204. table := fmt.Sprintf(" `%s` bd ", bqi.GetBillingDataDataset())
  205. whereConjuncts := GetWhereConjuncts(start, end, bqi.ExcludePartitionTime)
  206. whereConjuncts = append(whereConjuncts, "credits.type = 'COMMITTED_USAGE_DISCOUNT_DOLLAR_BASE'")
  207. whereClause := strings.Join(whereConjuncts, " AND ")
  208. query := fmt.Sprintf(queryFmt, table, whereClause)
  209. iter, err := bqi.Query(context.Background(), query)
  210. if err != nil {
  211. return nil, fmt.Errorf("queryFlexibleCUDTotalCredits: query error %w", err)
  212. }
  213. var loader FlexibleCUDCreditTotalsLoader
  214. for {
  215. err = iter.Next(&loader)
  216. if errors.Is(err, iterator.Done) {
  217. break
  218. }
  219. if err != nil {
  220. return nil, fmt.Errorf("queryFlexibleCUDTotalCredits: load error %w", err)
  221. }
  222. }
  223. return loader.values, nil
  224. }