bigqueryintegration.go 8.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250
  1. package gcp
  2. import (
  3. "context"
  4. "errors"
  5. "fmt"
  6. "strings"
  7. "time"
  8. "github.com/opencost/opencost/core/pkg/log"
  9. "github.com/opencost/opencost/core/pkg/opencost"
  10. "google.golang.org/api/iterator"
  11. )
  12. type BigQueryIntegration struct {
  13. BigQueryQuerier
  14. }
  15. const (
  16. UsageDateColumnName = "usage_date"
  17. BillingAccountIDColumnName = "billing_id"
  18. ProjectIDColumnName = "project_id"
  19. ProjectNameColumnName = "project_name"
  20. RegionColumnName = "region"
  21. ZoneColumnName = "zone"
  22. ServiceDescriptionColumnName = "service"
  23. SKUDescriptionColumnName = "description"
  24. LabelsColumnName = "labels"
  25. ResourceNameColumnName = "resource"
  26. ResourceGlobalNameColumnName = "global_resource"
  27. CostColumnName = "cost"
  28. ListCostColumnName = "list_cost"
  29. CreditsColumnName = "credits"
  30. )
  31. const BiqQueryWherePartitionFmt = `DATE(_PARTITIONTIME) >= "%s" AND DATE(_PARTITIONTIME) < "%s"`
  32. const BiqQueryWhereDateFmt = `usage_start_time >= "%s" AND usage_start_time < "%s"`
  33. func (bqi *BigQueryIntegration) GetCloudCost(start time.Time, end time.Time) (*opencost.CloudCostSetRange, error) {
  34. cudRates, err := bqi.GetFlexibleCUDRates(start, end)
  35. if err != nil {
  36. return nil, fmt.Errorf("error retrieving CUD rates: %w", err)
  37. }
  38. // Build Query
  39. selectColumns := []string{
  40. fmt.Sprintf("TIMESTAMP_TRUNC(usage_start_time, day) as %s", UsageDateColumnName),
  41. fmt.Sprintf("billing_account_id as %s", BillingAccountIDColumnName),
  42. fmt.Sprintf("project.id as %s", ProjectIDColumnName),
  43. fmt.Sprintf("project.name as %s", ProjectNameColumnName),
  44. fmt.Sprintf("location.region as %s", RegionColumnName),
  45. fmt.Sprintf("location.zone as %s", ZoneColumnName),
  46. fmt.Sprintf("service.description as %s", ServiceDescriptionColumnName),
  47. fmt.Sprintf("sku.description as %s", SKUDescriptionColumnName),
  48. fmt.Sprintf("resource.name as %s", ResourceNameColumnName),
  49. fmt.Sprintf("resource.global_name as %s", ResourceGlobalNameColumnName),
  50. fmt.Sprintf("TO_JSON_STRING(labels) as %s", LabelsColumnName),
  51. fmt.Sprintf("SUM(cost) as %s", CostColumnName),
  52. fmt.Sprintf("SUM(cost_at_list) as %s", ListCostColumnName),
  53. fmt.Sprintf("ARRAY_CONCAT_AGG(credits) as %s", CreditsColumnName),
  54. }
  55. groupByColumns := []string{
  56. UsageDateColumnName,
  57. BillingAccountIDColumnName,
  58. ProjectIDColumnName,
  59. ProjectNameColumnName,
  60. RegionColumnName,
  61. ZoneColumnName,
  62. ServiceDescriptionColumnName,
  63. SKUDescriptionColumnName,
  64. LabelsColumnName,
  65. ResourceNameColumnName,
  66. ResourceGlobalNameColumnName,
  67. }
  68. whereConjuncts := GetWhereConjuncts(start, end)
  69. columnStr := strings.Join(selectColumns, ", ")
  70. table := fmt.Sprintf(" `%s` bd ", bqi.GetBillingDataDataset())
  71. whereClause := strings.Join(whereConjuncts, " AND ")
  72. groupByStr := strings.Join(groupByColumns, ", ")
  73. queryStr := `
  74. SELECT %s
  75. FROM %s
  76. WHERE %s
  77. GROUP BY %s
  78. `
  79. querystr := fmt.Sprintf(queryStr, columnStr, table, whereClause, groupByStr)
  80. // Perform Query and parse values
  81. ccsr, err := opencost.NewCloudCostSetRange(start, end, opencost.AccumulateOptionDay, bqi.Key())
  82. if err != nil {
  83. return ccsr, fmt.Errorf("error creating new CloudCostSetRange: %s", err)
  84. }
  85. iter, err := bqi.Query(context.Background(), querystr)
  86. if err != nil {
  87. return ccsr, fmt.Errorf("error querying: %s", err)
  88. }
  89. // Parse query into CloudCostSetRange
  90. for {
  91. ccl := CloudCostLoader{
  92. FlexibleCUDRates: cudRates,
  93. }
  94. err = iter.Next(&ccl)
  95. if err == iterator.Done {
  96. break
  97. }
  98. if err != nil {
  99. return ccsr, err
  100. }
  101. if ccl.CloudCost == nil {
  102. continue
  103. }
  104. ccsr.LoadCloudCost(ccl.CloudCost)
  105. }
  106. return ccsr, nil
  107. }
  108. // GetWhereConjuncts creates a list of Where filter statements that filter for usage start date and partition time
  109. // additional filters can be added before combining into the final where clause
  110. func GetWhereConjuncts(start time.Time, end time.Time) []string {
  111. partitionStart := start
  112. partitionEnd := end.AddDate(0, 0, 2)
  113. wherePartition := fmt.Sprintf(BiqQueryWherePartitionFmt, partitionStart.Format("2006-01-02"), partitionEnd.Format("2006-01-02"))
  114. whereDate := fmt.Sprintf(BiqQueryWhereDateFmt, start.Format("2006-01-02"), end.Format("2006-01-02"))
  115. return []string{wherePartition, whereDate}
  116. }
  117. // FlexibleCUDRates are the total amount paid / total amount credited per day for all Flexible CUDs. Since credited will be a negative value
  118. // this will be a negative ratio. This can then be multiplied with the credits from Flexible CUDs on specific line items to determine
  119. // the amount paid for the credit it received. This allows us to amortize the Flexible CUD costs which are not associated with resources
  120. // in the billing export. AmountPayed itself may have some credits on it so a Rate and a NetRate are created.
  121. // Having both allow us to populate AmortizedCost and AmortizedNetCost respectively.
  122. type FlexibleCUDRates struct {
  123. NetRate float64
  124. Rate float64
  125. }
  126. // GetFlexibleCUDRates returns a map of FlexibleCUDRates keyed on the start time of the day which those
  127. // FlexibleCUDRates were derived from.
  128. func (bqi *BigQueryIntegration) GetFlexibleCUDRates(start time.Time, end time.Time) (map[time.Time]FlexibleCUDRates, error) {
  129. costsByDate, err := bqi.queryFlexibleCUDTotalCosts(start, end)
  130. if err != nil {
  131. return nil, fmt.Errorf("GetFlexibleCUDRates: %w", err)
  132. }
  133. creditsByDate, err := bqi.queryFlexibleCUDTotalCredits(start, end)
  134. if err != nil {
  135. return nil, fmt.Errorf("GetFlexibleCUDRates: %w", err)
  136. }
  137. results := map[time.Time]FlexibleCUDRates{}
  138. for date, amountCredited := range creditsByDate {
  139. // Protection against divide by zero
  140. if amountCredited == 0 {
  141. log.Warnf("GetFlexibleCUDRates: 0 value total credit for Flexible CUDs for date %s", date.Format(time.RFC3339))
  142. continue
  143. }
  144. amountPayed, ok := costsByDate[date]
  145. if !ok {
  146. log.Warnf("GetFlexibleCUDRates: could not find Flexible CUD payments for date %s", date.Format(time.RFC3339))
  147. continue
  148. }
  149. // amountPayed itself may have some credits on it so a Rate and a NetRate are created.
  150. // Having both allow us to populate AmortizedCost and AmortizedNetCost respectively.
  151. results[date] = FlexibleCUDRates{
  152. NetRate: (amountPayed.cost + amountPayed.credits) / amountCredited,
  153. Rate: amountPayed.cost / amountCredited,
  154. }
  155. }
  156. return results, nil
  157. }
  158. func (bqi *BigQueryIntegration) queryFlexibleCUDTotalCosts(start time.Time, end time.Time) (map[time.Time]flexibleCUDCostTotals, error) {
  159. queryFmt := `
  160. SELECT
  161. TIMESTAMP_TRUNC(usage_start_time, day) as usage_date,
  162. sum(cost),
  163. IFNULL(SUM((Select SUM(amount) FROM bd.credits)),0),
  164. FROM %s
  165. WHERE %s
  166. GROUP BY usage_date
  167. `
  168. table := fmt.Sprintf(" `%s` bd ", bqi.GetBillingDataDataset())
  169. whereConjuncts := GetWhereConjuncts(start, end)
  170. whereConjuncts = append(whereConjuncts, "sku.description like 'Commitment - dollar based v1:%'")
  171. whereClause := strings.Join(whereConjuncts, " AND ")
  172. query := fmt.Sprintf(queryFmt, table, whereClause)
  173. iter, err := bqi.Query(context.Background(), query)
  174. if err != nil {
  175. return nil, fmt.Errorf("queryCUDAmountPayed: query error %w", err)
  176. }
  177. var loader FlexibleCUDCostTotalsLoader
  178. for {
  179. err = iter.Next(&loader)
  180. if errors.Is(err, iterator.Done) {
  181. break
  182. }
  183. if err != nil {
  184. return nil, fmt.Errorf("queryCUDAmountPayed: load error %w", err)
  185. }
  186. }
  187. return loader.values, nil
  188. }
  189. func (bqi *BigQueryIntegration) queryFlexibleCUDTotalCredits(start time.Time, end time.Time) (map[time.Time]float64, error) {
  190. queryFmt := `SELECT
  191. TIMESTAMP_TRUNC(usage_start_time, day) as usage_date,
  192. sum(credits.amount)
  193. FROM %s
  194. CROSS JOIN UNNEST(bd.credits) AS credits
  195. WHERE %s
  196. GROUP BY usage_date
  197. `
  198. table := fmt.Sprintf(" `%s` bd ", bqi.GetBillingDataDataset())
  199. whereConjuncts := GetWhereConjuncts(start, end)
  200. whereConjuncts = append(whereConjuncts, "credits.type = 'COMMITTED_USAGE_DISCOUNT_DOLLAR_BASE'")
  201. whereClause := strings.Join(whereConjuncts, " AND ")
  202. query := fmt.Sprintf(queryFmt, table, whereClause)
  203. iter, err := bqi.Query(context.Background(), query)
  204. if err != nil {
  205. return nil, fmt.Errorf("queryFlexibleCUDTotalCredits: query error %w", err)
  206. }
  207. var loader FlexibleCUDCreditTotalsLoader
  208. for {
  209. err = iter.Next(&loader)
  210. if errors.Is(err, iterator.Done) {
  211. break
  212. }
  213. if err != nil {
  214. return nil, fmt.Errorf("queryFlexibleCUDTotalCredits: load error %w", err)
  215. }
  216. }
  217. return loader.values, nil
  218. }