athenaintegration.go 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471
  1. package aws
  2. import (
  3. "context"
  4. "fmt"
  5. "strconv"
  6. "strings"
  7. "time"
  8. "github.com/aws/aws-sdk-go-v2/service/athena/types"
  9. "github.com/opencost/opencost/core/pkg/log"
  10. "github.com/opencost/opencost/core/pkg/opencost"
  11. "github.com/opencost/opencost/pkg/cloud"
  12. )
  13. const LabelColumnPrefix = "resource_tags_user_"
  14. const AWSLabelColumnPrefix = "resource_tags_aws_"
  15. const AthenaResourceTagPrefix = "resource_tags_"
  16. // athenaDateLayout is the default AWS date format
  17. const AthenaDateLayout = "2006-01-02 15:04:05.000"
  18. // Cost Columns
  19. const AthenaPricingColumn = "line_item_unblended_cost"
  20. // Amortized Cost Columns
  21. const AthenaRIPricingColumn = "reservation_effective_cost"
  22. const AthenaSPPricingColumn = "savings_plan_savings_plan_effective_cost"
  23. // Net Cost Columns
  24. const AthenaNetPricingColumn = "line_item_net_unblended_cost"
  25. var AthenaNetPricingCoalesce = fmt.Sprintf("COALESCE(%s, %s, 0)", AthenaNetPricingColumn, AthenaPricingColumn)
  26. // Amortized Net Cost Columns
  27. const AthenaNetRIPricingColumn = "reservation_net_effective_cost"
  28. var AthenaNetRIPricingCoalesce = fmt.Sprintf("COALESCE(%s, %s, 0)", AthenaNetRIPricingColumn, AthenaRIPricingColumn)
  29. const AthenaNetSPPricingColumn = "savings_plan_net_savings_plan_effective_cost"
  30. var AthenaNetSPPricingCoalesce = fmt.Sprintf("COALESCE(%s, %s, 0)", AthenaNetSPPricingColumn, AthenaSPPricingColumn)
  31. // athenaDateTruncColumn Aggregates line items from the hourly level to daily. "line_item_usage_start_date" is used because at
  32. // all time values 00:00-23:00 it will truncate to the correct date.
  33. const AthenaDateColumn = "line_item_usage_start_date"
  34. const AthenaDateTruncColumn = "DATE_TRUNC('day'," + AthenaDateColumn + ") as usage_date"
  35. const AthenaWhereDateFmt = `line_item_usage_start_date >= date '%s' AND line_item_usage_start_date < date '%s'`
  36. const AthenaWhereUsage = "(line_item_line_item_type = 'Usage' OR line_item_line_item_type = 'DiscountedUsage' OR line_item_line_item_type = 'SavingsPlanCoveredUsage' OR line_item_line_item_type = 'EdpDiscount' OR line_item_line_item_type = 'PrivateRateDiscount')"
  37. // AthenaQueryIndexes is a struct for holding the context of a query
  38. type AthenaQueryIndexes struct {
  39. Query string
  40. ColumnIndexes map[string]int
  41. TagColumns []string
  42. AWSTagColumns []string
  43. ListCostColumn string
  44. NetCostColumn string
  45. AmortizedNetCostColumn string
  46. AmortizedCostColumn string
  47. IsK8sColumn string
  48. }
  49. type AthenaIntegration struct {
  50. AthenaQuerier
  51. }
  52. // Query Athena for CUR data and build a new CloudCostSetRange containing the info
  53. func (ai *AthenaIntegration) GetCloudCost(start, end time.Time) (*opencost.CloudCostSetRange, error) {
  54. log.Infof("AthenaIntegration[%s]: GetCloudCost: %s", ai.Key(), opencost.NewWindow(&start, &end).String())
  55. // Query for all column names
  56. allColumns, err := ai.GetColumns()
  57. if err != nil {
  58. return nil, fmt.Errorf("GetCloudCost: error getting Athena columns: %w", err)
  59. }
  60. // List known, hard-coded columns to query
  61. groupByColumns := []string{
  62. AthenaDateTruncColumn,
  63. "line_item_resource_id",
  64. "bill_payer_account_id",
  65. "line_item_usage_account_id",
  66. "line_item_product_code",
  67. "line_item_usage_type",
  68. "product_region_code",
  69. "line_item_availability_zone",
  70. }
  71. // Create query indices
  72. aqi := AthenaQueryIndexes{}
  73. // Add is k8s column
  74. isK8sColumn := ai.GetIsKubernetesColumn(allColumns)
  75. groupByColumns = append(groupByColumns, isK8sColumn)
  76. aqi.IsK8sColumn = isK8sColumn
  77. // Determine which columns are user-defined tags and add those to the list
  78. // of columns to query.
  79. for column := range allColumns {
  80. if strings.HasPrefix(column, LabelColumnPrefix) {
  81. quotedTag := fmt.Sprintf(`"%s"`, column)
  82. groupByColumns = append(groupByColumns, quotedTag)
  83. aqi.TagColumns = append(aqi.TagColumns, quotedTag)
  84. }
  85. if strings.HasPrefix(column, AWSLabelColumnPrefix) {
  86. groupByColumns = append(groupByColumns, column)
  87. aqi.AWSTagColumns = append(aqi.AWSTagColumns, column)
  88. }
  89. }
  90. var selectColumns []string
  91. // Duplicate GroupBy Columns into select columns
  92. selectColumns = append(selectColumns, groupByColumns...)
  93. // Clean Up group by columns
  94. ai.RemoveColumnAliases(groupByColumns)
  95. // Build list cost column and add it to the select columns
  96. listCostColumn := ai.GetListCostColumn()
  97. selectColumns = append(selectColumns, listCostColumn)
  98. aqi.ListCostColumn = listCostColumn
  99. // Build net cost column and add it to select columns
  100. netCostColumn := ai.GetNetCostColumn(allColumns)
  101. selectColumns = append(selectColumns, netCostColumn)
  102. aqi.NetCostColumn = netCostColumn
  103. // Build amortized net cost column and add it to select columns
  104. amortizedNetCostColumn := ai.GetAmortizedNetCostColumn(allColumns)
  105. selectColumns = append(selectColumns, amortizedNetCostColumn)
  106. aqi.AmortizedNetCostColumn = amortizedNetCostColumn
  107. // Build Amortized cost column and add it to select columns
  108. amortizedCostColumn := ai.GetAmortizedCostColumn(allColumns)
  109. selectColumns = append(selectColumns, amortizedCostColumn)
  110. aqi.AmortizedCostColumn = amortizedCostColumn
  111. // Build map of query columns to use for parsing query
  112. aqi.ColumnIndexes = map[string]int{}
  113. for i, column := range selectColumns {
  114. aqi.ColumnIndexes[column] = i
  115. }
  116. whereDate := fmt.Sprintf(AthenaWhereDateFmt, start.Format("2006-01-02"), end.Format("2006-01-02"))
  117. wherePartitions := ai.GetPartitionWhere(start, end)
  118. // Query for all line items with a resource_id or from AWS Marketplace, which did not end before
  119. // the range or start after it. This captures all costs with any amount of
  120. // overlap with the range, for which we will only extract the relevant costs
  121. whereConjuncts := []string{
  122. wherePartitions,
  123. whereDate,
  124. AthenaWhereUsage,
  125. }
  126. columnStr := strings.Join(selectColumns, ", ")
  127. whereClause := strings.Join(whereConjuncts, " AND ")
  128. groupByStr := strings.Join(groupByColumns, ", ")
  129. queryStr := `
  130. SELECT %s
  131. FROM "%s"
  132. WHERE %s
  133. GROUP BY %s
  134. `
  135. aqi.Query = fmt.Sprintf(queryStr, columnStr, ai.Table, whereClause, groupByStr)
  136. ccsr, err := opencost.NewCloudCostSetRange(start, end, opencost.AccumulateOptionDay, ai.Key())
  137. if err != nil {
  138. return nil, err
  139. }
  140. // Generate row handling function.
  141. rowHandler := func(row types.Row) {
  142. cc, err2 := athenaRowToCloudCost(row, aqi)
  143. if err2 != nil {
  144. log.Errorf("AthenaIntegration: GetCloudCost: error while parsing row: %s", err2.Error())
  145. return
  146. }
  147. ccsr.LoadCloudCost(cc)
  148. }
  149. log.Debugf("AthenaIntegration[%s]: GetCloudCost: querying: %s", ai.Key(), aqi.Query)
  150. // Query CUR data and fill out CCSR
  151. err = ai.Query(context.TODO(), aqi.Query, GetAthenaQueryFunc(rowHandler))
  152. if err != nil {
  153. return nil, err
  154. }
  155. ai.ConnectionStatus = ai.GetConnectionStatusFromResult(ccsr, ai.ConnectionStatus)
  156. return ccsr, nil
  157. }
  158. func (ai *AthenaIntegration) GetListCostColumn() string {
  159. var listCostBuilder strings.Builder
  160. listCostBuilder.WriteString("CASE line_item_line_item_type")
  161. listCostBuilder.WriteString(" WHEN 'EdpDiscount' THEN 0")
  162. listCostBuilder.WriteString(" WHEN 'PrivateRateDiscount' THEN 0")
  163. listCostBuilder.WriteString(" ELSE ")
  164. listCostBuilder.WriteString(AthenaPricingColumn)
  165. listCostBuilder.WriteString(" END")
  166. return fmt.Sprintf("SUM(%s) as list_cost", listCostBuilder.String())
  167. }
  168. func (ai *AthenaIntegration) GetNetCostColumn(allColumns map[string]bool) string {
  169. netCostColumn := ""
  170. if allColumns[AthenaNetPricingColumn] { // if Net pricing exists
  171. netCostColumn = AthenaNetPricingCoalesce
  172. } else { // Non-net for if there's no net pricing.
  173. netCostColumn = AthenaPricingColumn
  174. }
  175. return fmt.Sprintf("SUM(%s) as net_cost", netCostColumn)
  176. }
  177. func (ai *AthenaIntegration) GetAmortizedCostColumn(allColumns map[string]bool) string {
  178. amortizedCostCase := ai.GetAmortizedCostCase(allColumns)
  179. return fmt.Sprintf("SUM(%s) as amortized_cost", amortizedCostCase)
  180. }
  181. func (ai *AthenaIntegration) GetAmortizedNetCostColumn(allColumns map[string]bool) string {
  182. amortizedNetCostCase := ""
  183. if allColumns[AthenaNetPricingColumn] { // if Net pricing exists
  184. amortizedNetCostCase = ai.GetAmortizedNetCostCase(allColumns)
  185. } else { // Non-net for if there's no net pricing.
  186. amortizedNetCostCase = ai.GetAmortizedCostCase(allColumns)
  187. }
  188. return fmt.Sprintf("SUM(%s) as amortized_net_cost", amortizedNetCostCase)
  189. }
  190. func (ai *AthenaIntegration) GetAmortizedCostCase(allColumns map[string]bool) string {
  191. // Use unblended costs if Reserved Instances/Savings Plans aren't in use
  192. if !allColumns[AthenaRIPricingColumn] && !allColumns[AthenaSPPricingColumn] {
  193. return AthenaPricingColumn
  194. }
  195. var costBuilder strings.Builder
  196. costBuilder.WriteString("CASE line_item_line_item_type")
  197. if allColumns[AthenaRIPricingColumn] {
  198. costBuilder.WriteString(" WHEN 'DiscountedUsage' THEN ")
  199. costBuilder.WriteString(AthenaRIPricingColumn)
  200. }
  201. if allColumns[AthenaSPPricingColumn] {
  202. costBuilder.WriteString(" WHEN 'SavingsPlanCoveredUsage' THEN ")
  203. costBuilder.WriteString(AthenaSPPricingColumn)
  204. }
  205. costBuilder.WriteString(" ELSE ")
  206. costBuilder.WriteString(AthenaPricingColumn)
  207. costBuilder.WriteString(" END")
  208. return costBuilder.String()
  209. }
  210. func (ai *AthenaIntegration) GetAmortizedNetCostCase(allColumns map[string]bool) string {
  211. // Use net unblended costs if Reserved Instances/Savings Plans aren't in use
  212. if !allColumns[AthenaNetRIPricingColumn] && !allColumns[AthenaNetSPPricingColumn] {
  213. return AthenaNetPricingCoalesce
  214. }
  215. var costBuilder strings.Builder
  216. costBuilder.WriteString("CASE line_item_line_item_type")
  217. if allColumns[AthenaNetRIPricingColumn] {
  218. costBuilder.WriteString(" WHEN 'DiscountedUsage' THEN ")
  219. costBuilder.WriteString(AthenaNetRIPricingCoalesce)
  220. }
  221. if allColumns[AthenaNetSPPricingColumn] {
  222. costBuilder.WriteString(" WHEN 'SavingsPlanCoveredUsage' THEN ")
  223. costBuilder.WriteString(AthenaNetSPPricingCoalesce)
  224. }
  225. costBuilder.WriteString(" ELSE ")
  226. costBuilder.WriteString(AthenaNetPricingCoalesce)
  227. costBuilder.WriteString(" END")
  228. return costBuilder.String()
  229. }
  230. func (ai *AthenaIntegration) RemoveColumnAliases(columns []string) {
  231. for i, column := range columns {
  232. if strings.Contains(column, " as ") {
  233. columnValues := strings.Split(column, " as ")
  234. columns[i] = columnValues[0]
  235. }
  236. }
  237. }
  238. func (ai *AthenaIntegration) ConvertLabelToAWSTag(label string) string {
  239. // if the label already has the column prefix assume that it is in the correct format
  240. if strings.HasPrefix(label, LabelColumnPrefix) {
  241. return label
  242. }
  243. // replace characters with underscore
  244. tag := label
  245. tag = strings.ReplaceAll(tag, ".", "_")
  246. tag = strings.ReplaceAll(tag, "/", "_")
  247. tag = strings.ReplaceAll(tag, ":", "_")
  248. tag = strings.ReplaceAll(tag, "-", "_")
  249. // add prefix and return
  250. return LabelColumnPrefix + tag
  251. }
  252. // GetIsKubernetesColumn builds a column that determines if a row represents kubernetes spend
  253. func (ai *AthenaIntegration) GetIsKubernetesColumn(allColumns map[string]bool) string {
  254. disjuncts := []string{
  255. "line_item_product_code = 'AmazonEKS'", // EKS is always kubernetes
  256. }
  257. // tagColumns is a list of columns where the presence of a value indicates that a resource is part of a kubernetes cluster
  258. tagColumns := []string{
  259. "resource_tags_aws_eks_cluster_name",
  260. "resource_tags_user_eks_cluster_name",
  261. "resource_tags_user_alpha_eksctl_io_cluster_name",
  262. "resource_tags_user_kubernetes_io_service_name",
  263. "resource_tags_user_kubernetes_io_created_for_pvc_name",
  264. "resource_tags_user_kubernetes_io_created_for_pv_name",
  265. }
  266. for _, tagColumn := range tagColumns {
  267. // if tag column is present in the CUR check for it
  268. if _, ok := allColumns[tagColumn]; ok {
  269. disjunctStr := fmt.Sprintf("%s <> ''", tagColumn)
  270. disjuncts = append(disjuncts, disjunctStr)
  271. }
  272. }
  273. return fmt.Sprintf("(%s) as is_kubernetes", strings.Join(disjuncts, " OR "))
  274. }
  275. func (ai *AthenaIntegration) GetPartitionWhere(start, end time.Time) string {
  276. month := time.Date(start.Year(), start.Month(), 1, 0, 0, 0, 0, time.UTC)
  277. endMonth := time.Date(end.Year(), end.Month(), 1, 0, 0, 0, 0, time.UTC)
  278. var disjuncts []string
  279. for !month.After(endMonth) {
  280. disjuncts = append(disjuncts, fmt.Sprintf("(year = '%d' AND month = '%d')", month.Year(), month.Month()))
  281. month = month.AddDate(0, 1, 0)
  282. }
  283. str := fmt.Sprintf("(%s)", strings.Join(disjuncts, " OR "))
  284. return str
  285. }
  286. func athenaRowToCloudCost(row types.Row, aqi AthenaQueryIndexes) (*opencost.CloudCost, error) {
  287. if len(row.Data) < len(aqi.ColumnIndexes) {
  288. return nil, fmt.Errorf("rowToCloudCost: row with fewer than %d columns (has only %d)", len(aqi.ColumnIndexes), len(row.Data))
  289. }
  290. // Iterate through the slice of tag columns, assigning
  291. // values to the column names, minus the tag prefix.
  292. labels := opencost.CloudCostLabels{}
  293. for _, tagColumnName := range aqi.TagColumns {
  294. // remove quotes
  295. labelName := strings.TrimPrefix(tagColumnName, `"`)
  296. labelName = strings.TrimSuffix(labelName, `"`)
  297. // remove prefix
  298. labelName = strings.TrimPrefix(labelName, LabelColumnPrefix)
  299. value := GetAthenaRowValue(row, aqi.ColumnIndexes, tagColumnName)
  300. if value != "" {
  301. labels[labelName] = value
  302. }
  303. }
  304. for _, awsColumnName := range aqi.AWSTagColumns {
  305. // partially remove prefix leaving "aws_"
  306. labelName := strings.TrimPrefix(awsColumnName, AthenaResourceTagPrefix)
  307. value := GetAthenaRowValue(row, aqi.ColumnIndexes, awsColumnName)
  308. if value != "" {
  309. labels[labelName] = value
  310. }
  311. }
  312. invoiceEntityID := GetAthenaRowValue(row, aqi.ColumnIndexes, "bill_payer_account_id")
  313. accountID := GetAthenaRowValue(row, aqi.ColumnIndexes, "line_item_usage_account_id")
  314. startStr := GetAthenaRowValue(row, aqi.ColumnIndexes, AthenaDateTruncColumn)
  315. providerID := GetAthenaRowValue(row, aqi.ColumnIndexes, "line_item_resource_id")
  316. productCode := GetAthenaRowValue(row, aqi.ColumnIndexes, "line_item_product_code")
  317. usageType := GetAthenaRowValue(row, aqi.ColumnIndexes, "line_item_usage_type")
  318. regionCode := GetAthenaRowValue(row, aqi.ColumnIndexes, "product_region_code")
  319. availabilityZone := GetAthenaRowValue(row, aqi.ColumnIndexes, "line_item_availability_zone")
  320. isK8s, _ := strconv.ParseBool(GetAthenaRowValue(row, aqi.ColumnIndexes, aqi.IsK8sColumn))
  321. k8sPct := 0.0
  322. if isK8s {
  323. k8sPct = 1.0
  324. }
  325. listCost, err := GetAthenaRowValueFloat(row, aqi.ColumnIndexes, aqi.ListCostColumn)
  326. if err != nil {
  327. return nil, err
  328. }
  329. netCost, err := GetAthenaRowValueFloat(row, aqi.ColumnIndexes, aqi.NetCostColumn)
  330. if err != nil {
  331. return nil, err
  332. }
  333. amortizedNetCost, err := GetAthenaRowValueFloat(row, aqi.ColumnIndexes, aqi.AmortizedNetCostColumn)
  334. if err != nil {
  335. return nil, err
  336. }
  337. amortizedCost, err := GetAthenaRowValueFloat(row, aqi.ColumnIndexes, aqi.AmortizedCostColumn)
  338. if err != nil {
  339. return nil, err
  340. }
  341. // Identify resource category in the CUR
  342. category := SelectAWSCategory(providerID, usageType, productCode)
  343. // Retrieve final stanza of product code for ProviderID
  344. if productCode == "AWSELB" || productCode == "AmazonFSx" {
  345. providerID = ParseARN(providerID)
  346. }
  347. if productCode == "AmazonEKS" && category == opencost.ComputeCategory {
  348. if strings.Contains(usageType, "CPU") {
  349. providerID = fmt.Sprintf("%s/CPU", providerID)
  350. } else if strings.Contains(usageType, "GB") {
  351. providerID = fmt.Sprintf("%s/RAM", providerID)
  352. }
  353. }
  354. properties := opencost.CloudCostProperties{
  355. ProviderID: providerID,
  356. Provider: opencost.AWSProvider,
  357. AccountID: accountID,
  358. AccountName: accountID,
  359. InvoiceEntityID: invoiceEntityID,
  360. InvoiceEntityName: invoiceEntityID,
  361. RegionID: regionCode,
  362. AvailabilityZone: availabilityZone,
  363. Service: productCode,
  364. Category: category,
  365. Labels: labels,
  366. }
  367. start, err := time.Parse(AthenaDateLayout, startStr)
  368. if err != nil {
  369. return nil, fmt.Errorf("unable to parse %s: '%s'", AthenaDateTruncColumn, err.Error())
  370. }
  371. end := start.AddDate(0, 0, 1)
  372. cc := &opencost.CloudCost{
  373. Properties: &properties,
  374. Window: opencost.NewWindow(&start, &end),
  375. ListCost: opencost.CostMetric{
  376. Cost: listCost,
  377. KubernetesPercent: k8sPct,
  378. },
  379. NetCost: opencost.CostMetric{
  380. Cost: netCost,
  381. KubernetesPercent: k8sPct,
  382. },
  383. AmortizedNetCost: opencost.CostMetric{
  384. Cost: amortizedNetCost,
  385. KubernetesPercent: k8sPct,
  386. },
  387. AmortizedCost: opencost.CostMetric{
  388. Cost: amortizedCost,
  389. KubernetesPercent: k8sPct,
  390. },
  391. InvoicedCost: opencost.CostMetric{
  392. Cost: netCost, // We are using Net Cost for Invoiced Cost for now as it is the closest approximation
  393. KubernetesPercent: k8sPct,
  394. },
  395. }
  396. return cc, nil
  397. }
  398. func (ai *AthenaIntegration) GetConnectionStatusFromResult(result cloud.EmptyChecker, currentStatus cloud.ConnectionStatus) cloud.ConnectionStatus {
  399. if result.IsEmpty() && currentStatus != cloud.SuccessfulConnection {
  400. return cloud.MissingData
  401. }
  402. return cloud.SuccessfulConnection
  403. }