bigqueryintegration.go 9.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281
  1. package gcp
  2. import (
  3. "context"
  4. "errors"
  5. "fmt"
  6. "strings"
  7. "time"
  8. "github.com/opencost/opencost/core/pkg/log"
  9. "github.com/opencost/opencost/core/pkg/opencost"
  10. "github.com/opencost/opencost/core/pkg/util/timeutil"
  11. "github.com/opencost/opencost/pkg/cloud"
  12. "google.golang.org/api/iterator"
  13. )
// BigQueryIntegration produces cloud cost data from a GCP billing export held in BigQuery.
// Its only member is the embedded BigQueryQuerier, so the fields the methods in this file
// read or write on the receiver (ConnectionStatus, ExcludePartitionTime) are promoted
// through that embedding.
type BigQueryIntegration struct {
	BigQueryQuerier
}
// Aliases assigned to the columns selected from the BigQuery billing export.
// getCloudCost uses them both in its SELECT list ("<expr> as <alias>") and in its
// GROUP BY clause.
const (
	UsageDateColumnName          = "usage_date"
	BillingAccountIDColumnName   = "billing_id"
	ProjectIDColumnName          = "project_id"
	ProjectNameColumnName        = "project_name"
	RegionColumnName             = "region"
	ZoneColumnName               = "zone"
	ServiceDescriptionColumnName = "service"
	SKUDescriptionColumnName     = "description"
	LabelsColumnName             = "labels"
	ProjectLabelsColumnName      = "project_labels"
	ResourceNameColumnName       = "resource"
	ResourceGlobalNameColumnName = "global_resource"
	CostColumnName               = "cost"
	ListCostColumnName           = "list_cost"
	CreditsColumnName            = "credits"
)
// BiqQueryWherePartitionFmt is the WHERE fragment that bounds the partition date. It takes two
// string verbs: the inclusive lower bound and the exclusive upper bound.
// NOTE: "Biq" is a misspelling of "Big"; the name is exported, so it is left unchanged.
const BiqQueryWherePartitionFmt = `DATE(_PARTITIONTIME) >= "%s" AND DATE(_PARTITIONTIME) < "%s"`

// BiqQueryWhereDateFmt is the WHERE fragment that bounds usage_start_time. It takes two string
// verbs: the inclusive lower bound and the exclusive upper bound.
const BiqQueryWhereDateFmt = `usage_start_time >= "%s" AND usage_start_time < "%s"`
  36. func (bqi *BigQueryIntegration) GetCloudCost(start time.Time, end time.Time) (*opencost.CloudCostSetRange, error) {
  37. return bqi.getCloudCost(start, end, 0)
  38. }
  39. func (bqi *BigQueryIntegration) RefreshStatus() cloud.ConnectionStatus {
  40. end := time.Now().UTC().Truncate(timeutil.Day)
  41. start := end.Add(-7 * timeutil.Day)
  42. // the call to Query within getCloudCost already sets ConnectionStatus in the event there is no error, so we don't
  43. // need to handle the positive case here
  44. _, err := bqi.getCloudCost(start, end, 1)
  45. if err != nil {
  46. bqi.ConnectionStatus = cloud.FailedConnection
  47. }
  48. return bqi.ConnectionStatus
  49. }
  50. func (bqi *BigQueryIntegration) getCloudCost(start time.Time, end time.Time, limit int) (*opencost.CloudCostSetRange, error) {
  51. cudRates, err := bqi.GetFlexibleCUDRates(start, end)
  52. if err != nil {
  53. return nil, fmt.Errorf("error retrieving CUD rates: %w", err)
  54. }
  55. // Build Query
  56. selectColumns := []string{
  57. fmt.Sprintf("TIMESTAMP_TRUNC(usage_start_time, day) as %s", UsageDateColumnName),
  58. fmt.Sprintf("billing_account_id as %s", BillingAccountIDColumnName),
  59. fmt.Sprintf("project.id as %s", ProjectIDColumnName),
  60. fmt.Sprintf("project.name as %s", ProjectNameColumnName),
  61. fmt.Sprintf("location.region as %s", RegionColumnName),
  62. fmt.Sprintf("location.zone as %s", ZoneColumnName),
  63. fmt.Sprintf("service.description as %s", ServiceDescriptionColumnName),
  64. fmt.Sprintf("sku.description as %s", SKUDescriptionColumnName),
  65. fmt.Sprintf("resource.name as %s", ResourceNameColumnName),
  66. fmt.Sprintf("resource.global_name as %s", ResourceGlobalNameColumnName),
  67. fmt.Sprintf("TO_JSON_STRING(labels) as %s", LabelsColumnName),
  68. fmt.Sprintf("SUM(cost) as %s", CostColumnName),
  69. fmt.Sprintf("SUM(cost_at_list) as %s", ListCostColumnName),
  70. fmt.Sprintf("ARRAY_CONCAT_AGG(credits) as %s", CreditsColumnName),
  71. }
  72. groupByColumns := []string{
  73. UsageDateColumnName,
  74. BillingAccountIDColumnName,
  75. ProjectIDColumnName,
  76. ProjectNameColumnName,
  77. RegionColumnName,
  78. ZoneColumnName,
  79. ServiceDescriptionColumnName,
  80. SKUDescriptionColumnName,
  81. LabelsColumnName,
  82. ResourceNameColumnName,
  83. ResourceGlobalNameColumnName,
  84. }
  85. whereConjuncts := GetWhereConjuncts(start, end, !bqi.ExcludePartitionTime)
  86. columnStr := strings.Join(selectColumns, ", ")
  87. table := fmt.Sprintf(" `%s` bd ", bqi.GetBillingDataDataset())
  88. whereClause := strings.Join(whereConjuncts, " AND ")
  89. groupByStr := strings.Join(groupByColumns, ", ")
  90. queryStr := `
  91. SELECT %s
  92. FROM %s
  93. WHERE %s
  94. GROUP BY %s
  95. `
  96. if limit > 0 {
  97. queryStr = fmt.Sprintf("%s LIMIT %d", queryStr, limit)
  98. }
  99. querystr := fmt.Sprintf(queryStr, columnStr, table, whereClause, groupByStr)
  100. // Perform Query and parse values
  101. ccsr, err := opencost.NewCloudCostSetRange(start, end, opencost.AccumulateOptionDay, bqi.Key())
  102. if err != nil {
  103. return ccsr, fmt.Errorf("error creating new CloudCostSetRange: %s", err)
  104. }
  105. iter, err := bqi.Query(context.Background(), querystr)
  106. if err != nil {
  107. return ccsr, fmt.Errorf("error querying: %s", err)
  108. }
  109. // Parse query into CloudCostSetRange
  110. for {
  111. ccl := CloudCostLoader{
  112. FlexibleCUDRates: cudRates,
  113. }
  114. err = iter.Next(&ccl)
  115. if err == iterator.Done {
  116. break
  117. }
  118. if err != nil {
  119. return ccsr, err
  120. }
  121. if ccl.CloudCost == nil {
  122. continue
  123. }
  124. ccsr.LoadCloudCost(ccl.CloudCost)
  125. }
  126. return ccsr, nil
  127. }
  128. // GetWhereConjuncts creates a list of Where filter statements that filter for usage start date and partition time
  129. // additional filters can be added before combining into the final where clause
  130. func GetWhereConjuncts(start time.Time, end time.Time, includePartition bool) []string {
  131. partitionStart := start
  132. partitionEnd := end.AddDate(0, 0, 2)
  133. conjuncts := []string{}
  134. if includePartition {
  135. wherePartition := fmt.Sprintf(BiqQueryWherePartitionFmt, partitionStart.Format("2006-01-02"), partitionEnd.Format("2006-01-02"))
  136. conjuncts = append(conjuncts, wherePartition)
  137. }
  138. whereDate := fmt.Sprintf(BiqQueryWhereDateFmt, start.Format("2006-01-02"), end.Format("2006-01-02"))
  139. conjuncts = append(conjuncts, whereDate)
  140. return conjuncts
  141. }
// FlexibleCUDRates holds, for a single day, the ratio of the total amount paid for all Flexible
// CUDs to the total amount credited by them. Since the credited amount is a negative value, the
// ratio is negative as well. Multiplying it with the Flexible CUD credits on a specific line
// item gives the amount paid for the credit that item received. This allows the Flexible CUD
// costs, which are not associated with resources in the billing export, to be amortized.
// The amount paid may itself carry credits, so both a Rate and a NetRate are kept; having both
// allows AmortizedCost and AmortizedNetCost to be populated respectively.
type FlexibleCUDRates struct {
	// NetRate is (amount paid + credits on the amount paid) / amount credited.
	NetRate float64
	// Rate is amount paid / amount credited.
	Rate float64
}
  151. // GetFlexibleCUDRates returns a map of FlexibleCUDRates keyed on the start time of the day which those
  152. // FlexibleCUDRates were derived from.
  153. func (bqi *BigQueryIntegration) GetFlexibleCUDRates(start time.Time, end time.Time) (map[time.Time]FlexibleCUDRates, error) {
  154. costsByDate, err := bqi.queryFlexibleCUDTotalCosts(start, end)
  155. if err != nil {
  156. return nil, fmt.Errorf("GetFlexibleCUDRates: %w", err)
  157. }
  158. creditsByDate, err := bqi.queryFlexibleCUDTotalCredits(start, end)
  159. if err != nil {
  160. return nil, fmt.Errorf("GetFlexibleCUDRates: %w", err)
  161. }
  162. results := map[time.Time]FlexibleCUDRates{}
  163. for date, amountCredited := range creditsByDate {
  164. // Protection against divide by zero
  165. if amountCredited == 0 {
  166. log.Warnf("GetFlexibleCUDRates: 0 value total credit for Flexible CUDs for date %s", date.Format(time.RFC3339))
  167. continue
  168. }
  169. amountPayed, ok := costsByDate[date]
  170. if !ok {
  171. log.Warnf("GetFlexibleCUDRates: could not find Flexible CUD payments for date %s", date.Format(time.RFC3339))
  172. continue
  173. }
  174. // amountPayed itself may have some credits on it so a Rate and a NetRate are created.
  175. // Having both allow us to populate AmortizedCost and AmortizedNetCost respectively.
  176. results[date] = FlexibleCUDRates{
  177. NetRate: (amountPayed.cost + amountPayed.credits) / amountCredited,
  178. Rate: amountPayed.cost / amountCredited,
  179. }
  180. }
  181. return results, nil
  182. }
  183. func (bqi *BigQueryIntegration) queryFlexibleCUDTotalCosts(start time.Time, end time.Time) (map[time.Time]flexibleCUDCostTotals, error) {
  184. queryFmt := `
  185. SELECT
  186. TIMESTAMP_TRUNC(usage_start_time, day) as usage_date,
  187. sum(cost),
  188. IFNULL(SUM((Select SUM(amount) FROM bd.credits)),0),
  189. FROM %s
  190. WHERE %s
  191. GROUP BY usage_date
  192. `
  193. table := fmt.Sprintf(" `%s` bd ", bqi.GetBillingDataDataset())
  194. whereConjuncts := GetWhereConjuncts(start, end, !bqi.ExcludePartitionTime)
  195. whereConjuncts = append(whereConjuncts, "sku.description like 'Commitment - dollar based v1:%'")
  196. whereClause := strings.Join(whereConjuncts, " AND ")
  197. query := fmt.Sprintf(queryFmt, table, whereClause)
  198. iter, err := bqi.Query(context.Background(), query)
  199. if err != nil {
  200. return nil, fmt.Errorf("queryCUDAmountPayed: query error %w", err)
  201. }
  202. var loader FlexibleCUDCostTotalsLoader
  203. for {
  204. err = iter.Next(&loader)
  205. if errors.Is(err, iterator.Done) {
  206. break
  207. }
  208. if err != nil {
  209. return nil, fmt.Errorf("queryCUDAmountPayed: load error %w", err)
  210. }
  211. }
  212. return loader.values, nil
  213. }
  214. func (bqi *BigQueryIntegration) queryFlexibleCUDTotalCredits(start time.Time, end time.Time) (map[time.Time]float64, error) {
  215. queryFmt := `SELECT
  216. TIMESTAMP_TRUNC(usage_start_time, day) as usage_date,
  217. sum(credits.amount)
  218. FROM %s
  219. CROSS JOIN UNNEST(bd.credits) AS credits
  220. WHERE %s
  221. GROUP BY usage_date
  222. `
  223. table := fmt.Sprintf(" `%s` bd ", bqi.GetBillingDataDataset())
  224. whereConjuncts := GetWhereConjuncts(start, end, !bqi.ExcludePartitionTime)
  225. whereConjuncts = append(whereConjuncts, "credits.type = 'COMMITTED_USAGE_DISCOUNT_DOLLAR_BASE'")
  226. whereClause := strings.Join(whereConjuncts, " AND ")
  227. query := fmt.Sprintf(queryFmt, table, whereClause)
  228. iter, err := bqi.Query(context.Background(), query)
  229. if err != nil {
  230. return nil, fmt.Errorf("queryFlexibleCUDTotalCredits: query error %w", err)
  231. }
  232. var loader FlexibleCUDCreditTotalsLoader
  233. for {
  234. err = iter.Next(&loader)
  235. if errors.Is(err, iterator.Done) {
  236. break
  237. }
  238. if err != nil {
  239. return nil, fmt.Errorf("queryFlexibleCUDTotalCredits: load error %w", err)
  240. }
  241. }
  242. return loader.values, nil
  243. }