bigqueryintegration.go 7.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238
  1. package gcp
  2. import (
  3. "context"
  4. "errors"
  5. "fmt"
  6. "strings"
  7. "time"
  8. "github.com/opencost/opencost/core/pkg/log"
  9. "github.com/opencost/opencost/core/pkg/opencost"
  10. "google.golang.org/api/iterator"
  11. )
  12. type BigQueryIntegration struct {
  13. BigQueryQuerier
  14. }
  15. const (
  16. UsageDateColumnName = "usage_date"
  17. BillingAccountIDColumnName = "billing_id"
  18. ProjectIDColumnName = "project_id"
  19. ServiceDescriptionColumnName = "service"
  20. SKUDescriptionColumnName = "description"
  21. LabelsColumnName = "labels"
  22. ResourceNameColumnName = "resource"
  23. CostColumnName = "cost"
  24. ListCostColumnName = "list_cost"
  25. CreditsColumnName = "credits"
  26. )
  27. const BiqQueryWherePartitionFmt = `DATE(_PARTITIONTIME) >= "%s" AND DATE(_PARTITIONTIME) < "%s"`
  28. const BiqQueryWhereDateFmt = `usage_start_time >= "%s" AND usage_start_time < "%s"`
  29. func (bqi *BigQueryIntegration) GetCloudCost(start time.Time, end time.Time) (*opencost.CloudCostSetRange, error) {
  30. cudRates, err := bqi.GetFlexibleCUDRates(start, end)
  31. if err != nil {
  32. return nil, fmt.Errorf("error retrieving CUD rates: %w", err)
  33. }
  34. // Build Query
  35. selectColumns := []string{
  36. fmt.Sprintf("TIMESTAMP_TRUNC(usage_start_time, day) as %s", UsageDateColumnName),
  37. fmt.Sprintf("billing_account_id as %s", BillingAccountIDColumnName),
  38. fmt.Sprintf("project.id as %s", ProjectIDColumnName),
  39. fmt.Sprintf("service.description as %s", ServiceDescriptionColumnName),
  40. fmt.Sprintf("sku.description as %s", SKUDescriptionColumnName),
  41. fmt.Sprintf("resource.name as %s", ResourceNameColumnName),
  42. fmt.Sprintf("TO_JSON_STRING(labels) as %s", LabelsColumnName),
  43. fmt.Sprintf("SUM(cost) as %s", CostColumnName),
  44. fmt.Sprintf("SUM(cost_at_list) as %s", ListCostColumnName),
  45. fmt.Sprintf("ARRAY_CONCAT_AGG(credits) as %s", CreditsColumnName),
  46. }
  47. groupByColumns := []string{
  48. UsageDateColumnName,
  49. BillingAccountIDColumnName,
  50. ProjectIDColumnName,
  51. ServiceDescriptionColumnName,
  52. SKUDescriptionColumnName,
  53. LabelsColumnName,
  54. ResourceNameColumnName,
  55. }
  56. whereConjuncts := GetWhereConjuncts(start, end)
  57. columnStr := strings.Join(selectColumns, ", ")
  58. table := fmt.Sprintf(" `%s` bd ", bqi.GetBillingDataDataset())
  59. whereClause := strings.Join(whereConjuncts, " AND ")
  60. groupByStr := strings.Join(groupByColumns, ", ")
  61. queryStr := `
  62. SELECT %s
  63. FROM %s
  64. WHERE %s
  65. GROUP BY %s
  66. `
  67. querystr := fmt.Sprintf(queryStr, columnStr, table, whereClause, groupByStr)
  68. // Perform Query and parse values
  69. ccsr, err := opencost.NewCloudCostSetRange(start, end, opencost.AccumulateOptionDay, bqi.Key())
  70. if err != nil {
  71. return ccsr, fmt.Errorf("error creating new CloudCostSetRange: %s", err)
  72. }
  73. iter, err := bqi.Query(context.Background(), querystr)
  74. if err != nil {
  75. return ccsr, fmt.Errorf("error querying: %s", err)
  76. }
  77. // Parse query into CloudCostSetRange
  78. for {
  79. ccl := CloudCostLoader{
  80. FlexibleCUDRates: cudRates,
  81. }
  82. err = iter.Next(&ccl)
  83. if err == iterator.Done {
  84. break
  85. }
  86. if err != nil {
  87. return ccsr, err
  88. }
  89. if ccl.CloudCost == nil {
  90. continue
  91. }
  92. ccsr.LoadCloudCost(ccl.CloudCost)
  93. }
  94. return ccsr, nil
  95. }
  96. // GetWhereConjuncts creates a list of Where filter statements that filter for usage start date and partition time
  97. // additional filters can be added before combining into the final where clause
  98. func GetWhereConjuncts(start time.Time, end time.Time) []string {
  99. partitionStart := start
  100. partitionEnd := end.AddDate(0, 0, 2)
  101. wherePartition := fmt.Sprintf(BiqQueryWherePartitionFmt, partitionStart.Format("2006-01-02"), partitionEnd.Format("2006-01-02"))
  102. whereDate := fmt.Sprintf(BiqQueryWhereDateFmt, start.Format("2006-01-02"), end.Format("2006-01-02"))
  103. return []string{wherePartition, whereDate}
  104. }
  105. // FlexibleCUDRates are the total amount paid / total amount credited per day for all Flexible CUDs. Since credited will be a negative value
  106. // this will be a negative ratio. This can then be multiplied with the credits from Flexible CUDs on specific line items to determine
  107. // the amount paid for the credit it received. This allows us to amortize the Flexible CUD costs which are not associated with resources
  108. // in the billing export. AmountPayed itself may have some credits on it so a Rate and a NetRate are created.
  109. // Having both allow us to populate AmortizedCost and AmortizedNetCost respectively.
  110. type FlexibleCUDRates struct {
  111. NetRate float64
  112. Rate float64
  113. }
  114. // GetFlexibleCUDRates returns a map of FlexibleCUDRates keyed on the start time of the day which those
  115. // FlexibleCUDRates were derived from.
  116. func (bqi *BigQueryIntegration) GetFlexibleCUDRates(start time.Time, end time.Time) (map[time.Time]FlexibleCUDRates, error) {
  117. costsByDate, err := bqi.queryFlexibleCUDTotalCosts(start, end)
  118. if err != nil {
  119. return nil, fmt.Errorf("GetFlexibleCUDRates: %w", err)
  120. }
  121. creditsByDate, err := bqi.queryFlexibleCUDTotalCredits(start, end)
  122. if err != nil {
  123. return nil, fmt.Errorf("GetFlexibleCUDRates: %w", err)
  124. }
  125. results := map[time.Time]FlexibleCUDRates{}
  126. for date, amountCredited := range creditsByDate {
  127. // Protection against divide by zero
  128. if amountCredited == 0 {
  129. log.Warnf("GetFlexibleCUDRates: 0 value total credit for Flexible CUDs for date %s", date.Format(time.RFC3339))
  130. continue
  131. }
  132. amountPayed, ok := costsByDate[date]
  133. if !ok {
  134. log.Warnf("GetFlexibleCUDRates: could not find Flexible CUD payments for date %s", date.Format(time.RFC3339))
  135. continue
  136. }
  137. // amountPayed itself may have some credits on it so a Rate and a NetRate are created.
  138. // Having both allow us to populate AmortizedCost and AmortizedNetCost respectively.
  139. results[date] = FlexibleCUDRates{
  140. NetRate: (amountPayed.cost + amountPayed.credits) / amountCredited,
  141. Rate: amountPayed.cost / amountCredited,
  142. }
  143. }
  144. return results, nil
  145. }
  146. func (bqi *BigQueryIntegration) queryFlexibleCUDTotalCosts(start time.Time, end time.Time) (map[time.Time]flexibleCUDCostTotals, error) {
  147. queryFmt := `
  148. SELECT
  149. TIMESTAMP_TRUNC(usage_start_time, day) as usage_date,
  150. sum(cost),
  151. IFNULL(SUM((Select SUM(amount) FROM bd.credits)),0),
  152. FROM %s
  153. WHERE %s
  154. GROUP BY usage_date, sku.description
  155. `
  156. table := fmt.Sprintf(" `%s` bd ", bqi.GetBillingDataDataset())
  157. whereConjuncts := GetWhereConjuncts(start, end)
  158. whereConjuncts = append(whereConjuncts, "sku.description like 'Commitment - dollar based v1:%'")
  159. whereClause := strings.Join(whereConjuncts, " AND ")
  160. query := fmt.Sprintf(queryFmt, table, whereClause)
  161. iter, err := bqi.Query(context.Background(), query)
  162. if err != nil {
  163. return nil, fmt.Errorf("queryCUDAmountPayed: query error %w", err)
  164. }
  165. var loader FlexibleCUDCostTotalsLoader
  166. for {
  167. err = iter.Next(&loader)
  168. if errors.Is(err, iterator.Done) {
  169. break
  170. }
  171. if err != nil {
  172. return nil, fmt.Errorf("queryCUDAmountPayed: load error %w", err)
  173. }
  174. }
  175. return loader.values, nil
  176. }
  177. func (bqi *BigQueryIntegration) queryFlexibleCUDTotalCredits(start time.Time, end time.Time) (map[time.Time]float64, error) {
  178. queryFmt := `SELECT
  179. TIMESTAMP_TRUNC(usage_start_time, day) as usage_date,
  180. sum(credits.amount)
  181. FROM %s
  182. CROSS JOIN UNNEST(bd.credits) AS credits
  183. WHERE %s
  184. GROUP BY usage_date, credits.id
  185. `
  186. table := fmt.Sprintf(" `%s` bd ", bqi.GetBillingDataDataset())
  187. whereConjuncts := GetWhereConjuncts(start, end)
  188. whereConjuncts = append(whereConjuncts, "credits.type = 'COMMITTED_USAGE_DISCOUNT_DOLLAR_BASE'")
  189. whereClause := strings.Join(whereConjuncts, " AND ")
  190. query := fmt.Sprintf(queryFmt, table, whereClause)
  191. iter, err := bqi.Query(context.Background(), query)
  192. if err != nil {
  193. return nil, fmt.Errorf("queryFlexibleCUDTotalCredits: query error %w", err)
  194. }
  195. var loader FlexibleCUDCreditTotalsLoader
  196. for {
  197. err = iter.Next(&loader)
  198. if errors.Is(err, iterator.Done) {
  199. break
  200. }
  201. if err != nil {
  202. return nil, fmt.Errorf("queryFlexibleCUDTotalCredits: load error %w", err)
  203. }
  204. }
  205. return loader.values, nil
  206. }