bigqueryintegration.go 7.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241
  1. package gcp
  2. import (
  3. "context"
  4. "errors"
  5. "fmt"
  6. "strings"
  7. "time"
  8. "github.com/opencost/opencost/core/pkg/log"
  9. "github.com/opencost/opencost/core/pkg/opencost"
  10. "google.golang.org/api/iterator"
  11. )
// BigQueryIntegration embeds a BigQueryQuerier and builds on it to retrieve cloud cost
// data and Flexible CUD rates from a BigQuery billing data table.
type BigQueryIntegration struct {
	BigQueryQuerier
}
// Column aliases assigned to the fields selected by the query that GetCloudCost builds.
// All of them except the cost, list cost and credits aliases are also used as that
// query's GROUP BY columns.
const (
	UsageDateColumnName          = "usage_date"
	BillingAccountIDColumnName   = "billing_id"
	ProjectIDColumnName          = "project_id"
	ServiceDescriptionColumnName = "service"
	SKUDescriptionColumnName     = "description"
	LabelsColumnName             = "labels"
	ResourceNameColumnName       = "resource"
	ResourceGlobalNameColumnName = "global_resource"
	CostColumnName               = "cost"
	ListCostColumnName           = "list_cost"
	CreditsColumnName            = "credits"
)
  28. const BiqQueryWherePartitionFmt = `DATE(_PARTITIONTIME) >= "%s" AND DATE(_PARTITIONTIME) < "%s"`
  29. const BiqQueryWhereDateFmt = `usage_start_time >= "%s" AND usage_start_time < "%s"`
  30. func (bqi *BigQueryIntegration) GetCloudCost(start time.Time, end time.Time) (*opencost.CloudCostSetRange, error) {
  31. cudRates, err := bqi.GetFlexibleCUDRates(start, end)
  32. if err != nil {
  33. return nil, fmt.Errorf("error retrieving CUD rates: %w", err)
  34. }
  35. // Build Query
  36. selectColumns := []string{
  37. fmt.Sprintf("TIMESTAMP_TRUNC(usage_start_time, day) as %s", UsageDateColumnName),
  38. fmt.Sprintf("billing_account_id as %s", BillingAccountIDColumnName),
  39. fmt.Sprintf("project.id as %s", ProjectIDColumnName),
  40. fmt.Sprintf("service.description as %s", ServiceDescriptionColumnName),
  41. fmt.Sprintf("sku.description as %s", SKUDescriptionColumnName),
  42. fmt.Sprintf("resource.name as %s", ResourceNameColumnName),
  43. fmt.Sprintf("resource.global_name as %s", ResourceGlobalNameColumnName),
  44. fmt.Sprintf("TO_JSON_STRING(labels) as %s", LabelsColumnName),
  45. fmt.Sprintf("SUM(cost) as %s", CostColumnName),
  46. fmt.Sprintf("SUM(cost_at_list) as %s", ListCostColumnName),
  47. fmt.Sprintf("ARRAY_CONCAT_AGG(credits) as %s", CreditsColumnName),
  48. }
  49. groupByColumns := []string{
  50. UsageDateColumnName,
  51. BillingAccountIDColumnName,
  52. ProjectIDColumnName,
  53. ServiceDescriptionColumnName,
  54. SKUDescriptionColumnName,
  55. LabelsColumnName,
  56. ResourceNameColumnName,
  57. ResourceGlobalNameColumnName,
  58. }
  59. whereConjuncts := GetWhereConjuncts(start, end)
  60. columnStr := strings.Join(selectColumns, ", ")
  61. table := fmt.Sprintf(" `%s` bd ", bqi.GetBillingDataDataset())
  62. whereClause := strings.Join(whereConjuncts, " AND ")
  63. groupByStr := strings.Join(groupByColumns, ", ")
  64. queryStr := `
  65. SELECT %s
  66. FROM %s
  67. WHERE %s
  68. GROUP BY %s
  69. `
  70. querystr := fmt.Sprintf(queryStr, columnStr, table, whereClause, groupByStr)
  71. // Perform Query and parse values
  72. ccsr, err := opencost.NewCloudCostSetRange(start, end, opencost.AccumulateOptionDay, bqi.Key())
  73. if err != nil {
  74. return ccsr, fmt.Errorf("error creating new CloudCostSetRange: %s", err)
  75. }
  76. iter, err := bqi.Query(context.Background(), querystr)
  77. if err != nil {
  78. return ccsr, fmt.Errorf("error querying: %s", err)
  79. }
  80. // Parse query into CloudCostSetRange
  81. for {
  82. ccl := CloudCostLoader{
  83. FlexibleCUDRates: cudRates,
  84. }
  85. err = iter.Next(&ccl)
  86. if err == iterator.Done {
  87. break
  88. }
  89. if err != nil {
  90. return ccsr, err
  91. }
  92. if ccl.CloudCost == nil {
  93. continue
  94. }
  95. ccsr.LoadCloudCost(ccl.CloudCost)
  96. }
  97. return ccsr, nil
  98. }
  99. // GetWhereConjuncts creates a list of Where filter statements that filter for usage start date and partition time
  100. // additional filters can be added before combining into the final where clause
  101. func GetWhereConjuncts(start time.Time, end time.Time) []string {
  102. partitionStart := start
  103. partitionEnd := end.AddDate(0, 0, 2)
  104. wherePartition := fmt.Sprintf(BiqQueryWherePartitionFmt, partitionStart.Format("2006-01-02"), partitionEnd.Format("2006-01-02"))
  105. whereDate := fmt.Sprintf(BiqQueryWhereDateFmt, start.Format("2006-01-02"), end.Format("2006-01-02"))
  106. return []string{wherePartition, whereDate}
  107. }
// FlexibleCUDRates are the total amount paid / total amount credited per day for all Flexible CUDs. Since the
// credited amount will be a negative value, this will be a negative ratio. The ratio can then be multiplied with
// the credits from Flexible CUDs on specific line items to determine the amount paid for the credit each item
// received. This allows us to amortize the Flexible CUD costs, which are not associated with resources in the
// billing export. The amount paid may itself have some credits on it, so both a Rate and a NetRate are created.
// Having both allows us to populate AmortizedCost and AmortizedNetCost respectively.
type FlexibleCUDRates struct {
	// NetRate is (amount paid + credits on the amount paid) / total amount credited.
	NetRate float64
	// Rate is amount paid / total amount credited.
	Rate float64
}
  117. // GetFlexibleCUDRates returns a map of FlexibleCUDRates keyed on the start time of the day which those
  118. // FlexibleCUDRates were derived from.
  119. func (bqi *BigQueryIntegration) GetFlexibleCUDRates(start time.Time, end time.Time) (map[time.Time]FlexibleCUDRates, error) {
  120. costsByDate, err := bqi.queryFlexibleCUDTotalCosts(start, end)
  121. if err != nil {
  122. return nil, fmt.Errorf("GetFlexibleCUDRates: %w", err)
  123. }
  124. creditsByDate, err := bqi.queryFlexibleCUDTotalCredits(start, end)
  125. if err != nil {
  126. return nil, fmt.Errorf("GetFlexibleCUDRates: %w", err)
  127. }
  128. results := map[time.Time]FlexibleCUDRates{}
  129. for date, amountCredited := range creditsByDate {
  130. // Protection against divide by zero
  131. if amountCredited == 0 {
  132. log.Warnf("GetFlexibleCUDRates: 0 value total credit for Flexible CUDs for date %s", date.Format(time.RFC3339))
  133. continue
  134. }
  135. amountPayed, ok := costsByDate[date]
  136. if !ok {
  137. log.Warnf("GetFlexibleCUDRates: could not find Flexible CUD payments for date %s", date.Format(time.RFC3339))
  138. continue
  139. }
  140. // amountPayed itself may have some credits on it so a Rate and a NetRate are created.
  141. // Having both allow us to populate AmortizedCost and AmortizedNetCost respectively.
  142. results[date] = FlexibleCUDRates{
  143. NetRate: (amountPayed.cost + amountPayed.credits) / amountCredited,
  144. Rate: amountPayed.cost / amountCredited,
  145. }
  146. }
  147. return results, nil
  148. }
  149. func (bqi *BigQueryIntegration) queryFlexibleCUDTotalCosts(start time.Time, end time.Time) (map[time.Time]flexibleCUDCostTotals, error) {
  150. queryFmt := `
  151. SELECT
  152. TIMESTAMP_TRUNC(usage_start_time, day) as usage_date,
  153. sum(cost),
  154. IFNULL(SUM((Select SUM(amount) FROM bd.credits)),0),
  155. FROM %s
  156. WHERE %s
  157. GROUP BY usage_date, sku.description
  158. `
  159. table := fmt.Sprintf(" `%s` bd ", bqi.GetBillingDataDataset())
  160. whereConjuncts := GetWhereConjuncts(start, end)
  161. whereConjuncts = append(whereConjuncts, "sku.description like 'Commitment - dollar based v1:%'")
  162. whereClause := strings.Join(whereConjuncts, " AND ")
  163. query := fmt.Sprintf(queryFmt, table, whereClause)
  164. iter, err := bqi.Query(context.Background(), query)
  165. if err != nil {
  166. return nil, fmt.Errorf("queryCUDAmountPayed: query error %w", err)
  167. }
  168. var loader FlexibleCUDCostTotalsLoader
  169. for {
  170. err = iter.Next(&loader)
  171. if errors.Is(err, iterator.Done) {
  172. break
  173. }
  174. if err != nil {
  175. return nil, fmt.Errorf("queryCUDAmountPayed: load error %w", err)
  176. }
  177. }
  178. return loader.values, nil
  179. }
  180. func (bqi *BigQueryIntegration) queryFlexibleCUDTotalCredits(start time.Time, end time.Time) (map[time.Time]float64, error) {
  181. queryFmt := `SELECT
  182. TIMESTAMP_TRUNC(usage_start_time, day) as usage_date,
  183. sum(credits.amount)
  184. FROM %s
  185. CROSS JOIN UNNEST(bd.credits) AS credits
  186. WHERE %s
  187. GROUP BY usage_date, credits.id
  188. `
  189. table := fmt.Sprintf(" `%s` bd ", bqi.GetBillingDataDataset())
  190. whereConjuncts := GetWhereConjuncts(start, end)
  191. whereConjuncts = append(whereConjuncts, "credits.type = 'COMMITTED_USAGE_DISCOUNT_DOLLAR_BASE'")
  192. whereClause := strings.Join(whereConjuncts, " AND ")
  193. query := fmt.Sprintf(queryFmt, table, whereClause)
  194. iter, err := bqi.Query(context.Background(), query)
  195. if err != nil {
  196. return nil, fmt.Errorf("queryFlexibleCUDTotalCredits: query error %w", err)
  197. }
  198. var loader FlexibleCUDCreditTotalsLoader
  199. for {
  200. err = iter.Next(&loader)
  201. if errors.Is(err, iterator.Done) {
  202. break
  203. }
  204. if err != nil {
  205. return nil, fmt.Errorf("queryFlexibleCUDTotalCredits: load error %w", err)
  206. }
  207. }
  208. return loader.values, nil
  209. }