bigqueryintegration.go 10.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369
  1. package gcp
  2. import (
  3. "context"
  4. "encoding/json"
  5. "fmt"
  6. "regexp"
  7. "strings"
  8. "time"
  9. "cloud.google.com/go/bigquery"
  10. "github.com/opencost/opencost/pkg/kubecost"
  11. "github.com/opencost/opencost/pkg/log"
  12. "github.com/opencost/opencost/pkg/util/timeutil"
  13. "google.golang.org/api/iterator"
  14. )
// BigQueryIntegration reads GCP billing data out of BigQuery and converts it
// into CloudCost values (see GetCloudCost).
//
// It embeds BigQueryQuerier; GetCloudCost calls Query, GetBillingDataDataset and
// Key on the receiver.
// NOTE(review): those methods are presumably promoted from BigQueryQuerier,
// which is declared outside this file — confirm.
type BigQueryIntegration struct {
	BigQueryQuerier
}
// Column aliases shared by the query and the row parser: GetCloudCost uses them
// as the "as <alias>" names in its SELECT list (and in GROUP BY), and
// CloudCostLoader.Load matches them against the schema field names of each
// returned row.
//
// Alias -> selected expression in GetCloudCost:
//   - UsageDateColumnName:          TIMESTAMP_TRUNC(usage_start_time, day)
//   - BillingAccountIDColumnName:   billing_account_id
//   - ProjectIDColumnName:          project.id
//   - ServiceDescriptionColumnName: service.description
//   - SKUDescriptionColumnName:     sku.description
//   - LabelsColumnName:             TO_JSON_STRING(labels)
//   - ResourceNameColumnName:       resource.name
//   - CostColumnName:               SUM(cost)
//   - CreditsColumnName:            summed credit amounts, 0 when NULL
const (
	UsageDateColumnName          = "usage_date"
	BillingAccountIDColumnName   = "billing_id"
	ProjectIDColumnName          = "project_id"
	ServiceDescriptionColumnName = "service"
	SKUDescriptionColumnName     = "description"
	LabelsColumnName             = "labels"
	ResourceNameColumnName       = "resource"
	CostColumnName               = "cost"
	CreditsColumnName            = "credits"
)
  29. const BiqQueryWherePartitionFmt = `DATE(_PARTITIONTIME) >= "%s" AND DATE(_PARTITIONTIME) < "%s"`
  30. const BiqQueryWhereDateFmt = `usage_start_time >= "%s" AND usage_start_time < "%s"`
  31. func (bqi *BigQueryIntegration) GetCloudCost(start time.Time, end time.Time) (*kubecost.CloudCostSetRange, error) {
  32. // Build Query
  33. selectColumns := []string{
  34. fmt.Sprintf("TIMESTAMP_TRUNC(usage_start_time, day) as %s", UsageDateColumnName),
  35. fmt.Sprintf("billing_account_id as %s", BillingAccountIDColumnName),
  36. fmt.Sprintf("project.id as %s", ProjectIDColumnName),
  37. fmt.Sprintf("service.description as %s", ServiceDescriptionColumnName),
  38. fmt.Sprintf("sku.description as %s", SKUDescriptionColumnName),
  39. fmt.Sprintf("resource.name as %s", ResourceNameColumnName),
  40. fmt.Sprintf("TO_JSON_STRING(labels) as %s", LabelsColumnName),
  41. fmt.Sprintf("SUM(cost) as %s", CostColumnName),
  42. fmt.Sprintf("IFNULL(SUM((Select SUM(amount) FROM bd.credits)),0) as %s", CreditsColumnName),
  43. }
  44. groupByColumns := []string{
  45. UsageDateColumnName,
  46. BillingAccountIDColumnName,
  47. ProjectIDColumnName,
  48. ServiceDescriptionColumnName,
  49. SKUDescriptionColumnName,
  50. LabelsColumnName,
  51. ResourceNameColumnName,
  52. }
  53. partitionStart := start
  54. partitionEnd := end.AddDate(0, 0, 2)
  55. wherePartition := fmt.Sprintf(BiqQueryWherePartitionFmt, partitionStart.Format("2006-01-02"), partitionEnd.Format("2006-01-02"))
  56. whereDate := fmt.Sprintf(BiqQueryWhereDateFmt, start.Format("2006-01-02"), end.Format("2006-01-02"))
  57. whereConjuncts := []string{
  58. wherePartition,
  59. whereDate,
  60. }
  61. columnStr := strings.Join(selectColumns, ", ")
  62. table := fmt.Sprintf(" `%s` bd ", bqi.GetBillingDataDataset())
  63. whereClause := strings.Join(whereConjuncts, " AND ")
  64. groupByStr := strings.Join(groupByColumns, ", ")
  65. queryStr := `
  66. SELECT %s
  67. FROM %s
  68. WHERE %s
  69. GROUP BY %s
  70. `
  71. querystr := fmt.Sprintf(queryStr, columnStr, table, whereClause, groupByStr)
  72. // Perform Query and parse values
  73. ccsr, err := kubecost.NewCloudCostSetRange(start, end, timeutil.Day, bqi.Key())
  74. if err != nil {
  75. return ccsr, fmt.Errorf("error creating new CloudCostSetRange: %s", err)
  76. }
  77. iter, err := bqi.Query(context.Background(), querystr)
  78. if err != nil {
  79. return ccsr, fmt.Errorf("error querying: %s", err)
  80. }
  81. // Parse query into CloudCostSetRange
  82. for {
  83. var ccl CloudCostLoader
  84. err = iter.Next(&ccl)
  85. if err == iterator.Done {
  86. break
  87. }
  88. if err != nil {
  89. return ccsr, err
  90. }
  91. if ccl.CloudCost == nil {
  92. continue
  93. }
  94. ccsr.LoadCloudCost(ccl.CloudCost)
  95. }
  96. return ccsr, nil
  97. }
// CloudCostLoader is the per-row destination GetCloudCost hands to the query
// iterator: its Load method decodes one result row and stores the outcome in
// CloudCost.
type CloudCostLoader struct {
	// CloudCost is set by Load on success; GetCloudCost skips the row when it
	// is still nil.
	CloudCost *kubecost.CloudCost
}
  101. // Load populates the fields of a CloudCostValues with bigquery.Value from provided slice
  102. func (ccl *CloudCostLoader) Load(values []bigquery.Value, schema bigquery.Schema) error {
  103. // Create Cloud Cost Properties
  104. properties := kubecost.CloudCostProperties{
  105. Provider: kubecost.GCPProvider,
  106. }
  107. var window kubecost.Window
  108. var description string
  109. var listCost float64
  110. var credits float64
  111. for i, field := range schema {
  112. if field == nil {
  113. log.DedupedErrorf(5, "GCP: BigQuery: found nil field in schema")
  114. continue
  115. }
  116. switch field.Name {
  117. case UsageDateColumnName:
  118. usageDate, ok := values[i].(time.Time)
  119. if !ok {
  120. // It would be very surprising if an unparsable time came back from the API, so it should be ok to return here.
  121. return fmt.Errorf("error parsing usage date: %v", values[0])
  122. }
  123. // start and end will be the day that the usage occurred on
  124. s := usageDate
  125. e := s.Add(timeutil.Day)
  126. window = kubecost.NewWindow(&s, &e)
  127. case BillingAccountIDColumnName:
  128. invoiceEntityID, ok := values[i].(string)
  129. if !ok {
  130. log.DedupedErrorf(5, "error parsing GCP CloudCost %s: %v", BillingAccountIDColumnName, values[i])
  131. invoiceEntityID = ""
  132. }
  133. properties.InvoiceEntityID = invoiceEntityID
  134. case ProjectIDColumnName:
  135. accountID, ok := values[i].(string)
  136. if !ok {
  137. log.DedupedErrorf(5, "error parsing GCP CloudCost %s: %v", ProjectIDColumnName, values[i])
  138. accountID = ""
  139. }
  140. properties.AccountID = accountID
  141. case ServiceDescriptionColumnName:
  142. service, ok := values[i].(string)
  143. if !ok {
  144. log.DedupedErrorf(5, "error parsing GCP CloudCost %s: %v", ServiceDescriptionColumnName, values[i])
  145. service = ""
  146. }
  147. properties.Service = service
  148. case SKUDescriptionColumnName:
  149. d, ok := values[i].(string)
  150. if !ok {
  151. log.DedupedErrorf(5, "error parsing GCP CloudCost %s: %v", SKUDescriptionColumnName, values[i])
  152. d = ""
  153. }
  154. description = d
  155. case LabelsColumnName:
  156. labelJSON, ok := values[i].(string)
  157. if !ok {
  158. log.DedupedErrorf(5, "error parsing GCP CloudCost %s: %v", LabelsColumnName, values[i])
  159. }
  160. labelList := []map[string]string{}
  161. err := json.Unmarshal([]byte(labelJSON), &labelList)
  162. if err != nil {
  163. log.Warnf("GCP Cloud Assets: error unmarshaling GCP CloudCost labels: %s", err)
  164. }
  165. labels := map[string]string{}
  166. for _, pair := range labelList {
  167. key := pair["key"]
  168. value := pair["value"]
  169. labels[key] = value
  170. }
  171. properties.Labels = labels
  172. case ResourceNameColumnName:
  173. resouceNameValue := values[i]
  174. if resouceNameValue == nil {
  175. properties.ProviderID = ""
  176. continue
  177. }
  178. resource, ok := resouceNameValue.(string)
  179. if !ok {
  180. log.DedupedErrorf(5, "error parsing GCP CloudCost %s: %v", ResourceNameColumnName, values[i])
  181. properties.ProviderID = ""
  182. continue
  183. }
  184. properties.ProviderID = ParseProviderID(resource)
  185. case CostColumnName:
  186. cost, ok := values[i].(float64)
  187. if !ok {
  188. log.DedupedErrorf(5, "error parsing GCP CloudCost %s: %v", CostColumnName, values[i])
  189. cost = 0.0
  190. }
  191. listCost = cost
  192. case CreditsColumnName:
  193. creditSum, ok := values[i].(float64)
  194. if !ok {
  195. log.DedupedErrorf(5, "error parsing GCP CloudCost %s: %v", CreditsColumnName, values[i])
  196. creditSum = 0.0
  197. }
  198. credits = creditSum
  199. default:
  200. log.DedupedErrorf(5, "GCP: BigQuery: found unrecognized column name %s", field.Name)
  201. }
  202. }
  203. // Check required Fields
  204. if window.IsOpen() {
  205. return fmt.Errorf("GCP: BigQuery: error parsing, item had invalid window")
  206. }
  207. // Determine Category
  208. properties.Category = SelectCategory(properties.Service, description)
  209. // sum credit and cost for NetCost
  210. netCost := listCost + credits
  211. // Using the NetCost as a 'placeholder' for these costs now, until we can revisit and spend the time to do
  212. // the calculations correctly
  213. amortizedCost := netCost
  214. amortizedNetCost := netCost
  215. invoicedCost := netCost
  216. // percent k8s is determined by the presence of labels
  217. k8sPercent := 0.0
  218. if IsK8s(properties.Labels) {
  219. k8sPercent = 1.0
  220. }
  221. ccl.CloudCost = &kubecost.CloudCost{
  222. Properties: &properties,
  223. Window: window,
  224. ListCost: kubecost.CostMetric{
  225. Cost: listCost,
  226. KubernetesPercent: k8sPercent,
  227. },
  228. AmortizedCost: kubecost.CostMetric{
  229. Cost: amortizedCost,
  230. KubernetesPercent: k8sPercent,
  231. },
  232. AmortizedNetCost: kubecost.CostMetric{
  233. Cost: amortizedNetCost,
  234. KubernetesPercent: k8sPercent,
  235. },
  236. InvoicedCost: kubecost.CostMetric{
  237. Cost: invoicedCost,
  238. KubernetesPercent: k8sPercent,
  239. },
  240. NetCost: kubecost.CostMetric{
  241. Cost: netCost,
  242. KubernetesPercent: k8sPercent,
  243. },
  244. }
  245. return nil
  246. }
  247. func IsK8s(labels map[string]string) bool {
  248. if _, ok := labels["goog-gke-volume"]; ok {
  249. return true
  250. }
  251. if _, ok := labels["goog-gke-node"]; ok {
  252. return true
  253. }
  254. if _, ok := labels["goog-k8s-cluster-name"]; ok {
  255. return true
  256. }
  257. return false
  258. }
  259. var parseProviderIDRx = regexp.MustCompile("^.+\\/(.+)?") // Capture "gke-cluster-3-default-pool-xxxx-yy" from "projects/###/instances/gke-cluster-3-default-pool-xxxx-yy"
  260. func ParseProviderID(id string) string {
  261. match := parseProviderIDRx.FindStringSubmatch(id)
  262. if len(match) == 0 {
  263. return id
  264. }
  265. return match[len(match)-1]
  266. }
  267. func SelectCategory(service, description string) string {
  268. s := strings.ToLower(service)
  269. d := strings.ToLower(description)
  270. // Network descriptions
  271. if strings.Contains(d, "download") {
  272. return kubecost.NetworkCategory
  273. }
  274. if strings.Contains(d, "network") {
  275. return kubecost.NetworkCategory
  276. }
  277. if strings.Contains(d, "ingress") {
  278. return kubecost.NetworkCategory
  279. }
  280. if strings.Contains(d, "egress") {
  281. return kubecost.NetworkCategory
  282. }
  283. if strings.Contains(d, "static ip") {
  284. return kubecost.NetworkCategory
  285. }
  286. if strings.Contains(d, "external ip") {
  287. return kubecost.NetworkCategory
  288. }
  289. if strings.Contains(d, "load balanced") {
  290. return kubecost.NetworkCategory
  291. }
  292. if strings.Contains(d, "licensing fee") {
  293. return kubecost.OtherCategory
  294. }
  295. // Storage Descriptions
  296. if strings.Contains(d, "storage") {
  297. return kubecost.StorageCategory
  298. }
  299. if strings.Contains(d, "pd capacity") {
  300. return kubecost.StorageCategory
  301. }
  302. if strings.Contains(d, "pd iops") {
  303. return kubecost.StorageCategory
  304. }
  305. if strings.Contains(d, "pd snapshot") {
  306. return kubecost.StorageCategory
  307. }
  308. // Service Defaults
  309. if strings.Contains(s, "storage") {
  310. return kubecost.StorageCategory
  311. }
  312. if strings.Contains(s, "compute") {
  313. return kubecost.ComputeCategory
  314. }
  315. if strings.Contains(s, "sql") {
  316. return kubecost.StorageCategory
  317. }
  318. if strings.Contains(s, "bigquery") {
  319. return kubecost.StorageCategory
  320. }
  321. if strings.Contains(s, "kubernetes") {
  322. return kubecost.ManagementCategory
  323. } else if strings.Contains(s, "pub/sub") {
  324. return kubecost.NetworkCategory
  325. }
  326. return kubecost.OtherCategory
  327. }