2
0

athenaintegration.go 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453
  1. package aws
import (
	"context"
	"fmt"
	"sort"
	"strconv"
	"strings"
	"time"

	"github.com/aws/aws-sdk-go-v2/service/athena/types"
	"github.com/opencost/opencost/pkg/cloud"
	"github.com/opencost/opencost/pkg/kubecost"
	"github.com/opencost/opencost/pkg/log"
	"github.com/opencost/opencost/pkg/util/timeutil"
)
  14. const LabelColumnPrefix = "resource_tags_user_"
  15. // athenaDateLayout is the default AWS date format
  16. const AthenaDateLayout = "2006-01-02 15:04:05.000"
  17. // Cost Columns
  18. const AthenaPricingColumn = "line_item_unblended_cost"
  19. // Amortized Cost Columns
  20. const AthenaRIPricingColumn = "reservation_effective_cost"
  21. const AthenaSPPricingColumn = "savings_plan_savings_plan_effective_cost"
  22. // Net Cost Columns
  23. const AthenaNetPricingColumn = "line_item_net_unblended_cost"
  24. var AthenaNetPricingCoalesce = fmt.Sprintf("COALESCE(%s, %s, 0)", AthenaNetPricingColumn, AthenaPricingColumn)
  25. // Amortized Net Cost Columns
  26. const AthenaNetRIPricingColumn = "reservation_net_effective_cost"
  27. var AthenaNetRIPricingCoalesce = fmt.Sprintf("COALESCE(%s, %s, 0)", AthenaNetRIPricingColumn, AthenaRIPricingColumn)
  28. const AthenaNetSPPricingColumn = "savings_plan_net_savings_plan_effective_cost"
  29. var AthenaNetSPPricingCoalesce = fmt.Sprintf("COALESCE(%s, %s, 0)", AthenaNetSPPricingColumn, AthenaSPPricingColumn)
  30. // athenaDateTruncColumn Aggregates line items from the hourly level to daily. "line_item_usage_start_date" is used because at
  31. // all time values 00:00-23:00 it will truncate to the correct date.
  32. const AthenaDateColumn = "line_item_usage_start_date"
  33. const AthenaDateTruncColumn = "DATE_TRUNC('day'," + AthenaDateColumn + ") as usage_date"
  34. const AthenaWhereDateFmt = `line_item_usage_start_date >= date '%s' AND line_item_usage_start_date < date '%s'`
  35. const AthenaWhereUsage = "(line_item_line_item_type = 'Usage' OR line_item_line_item_type = 'DiscountedUsage' OR line_item_line_item_type = 'SavingsPlanCoveredUsage' OR line_item_line_item_type = 'EdpDiscount' OR line_item_line_item_type = 'PrivateRateDiscount')"
  36. // AthenaQueryIndexes is a struct for holding the context of a query
  37. type AthenaQueryIndexes struct {
  38. Query string
  39. ColumnIndexes map[string]int
  40. TagColumns []string
  41. ListCostColumn string
  42. NetCostColumn string
  43. AmortizedNetCostColumn string
  44. AmortizedCostColumn string
  45. IsK8sColumn string
  46. }
  47. type AthenaIntegration struct {
  48. AthenaQuerier
  49. }
  50. // Query Athena for CUR data and build a new CloudCostSetRange containing the info
  51. func (ai *AthenaIntegration) GetCloudCost(start, end time.Time) (*kubecost.CloudCostSetRange, error) {
  52. log.Infof("AthenaIntegration[%s]: GetCloudCost: %s", ai.Key(), kubecost.NewWindow(&start, &end).String())
  53. // Query for all column names
  54. allColumns, err := ai.GetColumns()
  55. if err != nil {
  56. return nil, fmt.Errorf("GetCloudCost: error getting Athena columns: %w", err)
  57. }
  58. // List known, hard-coded columns to query
  59. groupByColumns := []string{
  60. AthenaDateTruncColumn,
  61. "line_item_resource_id",
  62. "bill_payer_account_id",
  63. "line_item_usage_account_id",
  64. "line_item_product_code",
  65. "line_item_usage_type",
  66. }
  67. // Create query indices
  68. aqi := AthenaQueryIndexes{}
  69. // Add is k8s column
  70. isK8sColumn := ai.GetIsKubernetesColumn(allColumns)
  71. groupByColumns = append(groupByColumns, isK8sColumn)
  72. aqi.IsK8sColumn = isK8sColumn
  73. // Determine which columns are user-defined tags and add those to the list
  74. // of columns to query.
  75. for column := range allColumns {
  76. if strings.HasPrefix(column, LabelColumnPrefix) {
  77. groupByColumns = append(groupByColumns, column)
  78. aqi.TagColumns = append(aqi.TagColumns, column)
  79. }
  80. }
  81. var selectColumns []string
  82. // Duplicate GroupBy Columns into select columns
  83. selectColumns = append(selectColumns, groupByColumns...)
  84. // Clean Up group by columns
  85. ai.RemoveColumnAliases(groupByColumns)
  86. // Build list cost column and add it to the select columns
  87. listCostColumn := ai.GetListCostColumn()
  88. selectColumns = append(selectColumns, listCostColumn)
  89. aqi.ListCostColumn = listCostColumn
  90. // Build net cost column and add it to select columns
  91. netCostColumn := ai.GetNetCostColumn(allColumns)
  92. selectColumns = append(selectColumns, netCostColumn)
  93. aqi.NetCostColumn = netCostColumn
  94. // Build amortized net cost column and add it to select columns
  95. amortizedNetCostColumn := ai.GetAmortizedNetCostColumn(allColumns)
  96. selectColumns = append(selectColumns, amortizedNetCostColumn)
  97. aqi.AmortizedNetCostColumn = amortizedNetCostColumn
  98. // Build Amortized cost column and add it to select columns
  99. amortizedCostColumn := ai.GetAmortizedCostColumn(allColumns)
  100. selectColumns = append(selectColumns, amortizedCostColumn)
  101. aqi.AmortizedCostColumn = amortizedCostColumn
  102. // Build map of query columns to use for parsing query
  103. aqi.ColumnIndexes = map[string]int{}
  104. for i, column := range selectColumns {
  105. aqi.ColumnIndexes[column] = i
  106. }
  107. whereDate := fmt.Sprintf(AthenaWhereDateFmt, start.Format("2006-01-02"), end.Format("2006-01-02"))
  108. wherePartitions := ai.GetPartitionWhere(start, end)
  109. // Query for all line items with a resource_id or from AWS Marketplace, which did not end before
  110. // the range or start after it. This captures all costs with any amount of
  111. // overlap with the range, for which we will only extract the relevant costs
  112. whereConjuncts := []string{
  113. wherePartitions,
  114. whereDate,
  115. AthenaWhereUsage,
  116. }
  117. columnStr := strings.Join(selectColumns, ", ")
  118. whereClause := strings.Join(whereConjuncts, " AND ")
  119. groupByStr := strings.Join(groupByColumns, ", ")
  120. queryStr := `
  121. SELECT %s
  122. FROM %s
  123. WHERE %s
  124. GROUP BY %s
  125. `
  126. aqi.Query = fmt.Sprintf(queryStr, columnStr, ai.Table, whereClause, groupByStr)
  127. ccsr, err := kubecost.NewCloudCostSetRange(start, end, timeutil.Day, ai.Key())
  128. if err != nil {
  129. return nil, err
  130. }
  131. // Generate row handling function.
  132. rowHandler := func(row types.Row) {
  133. err2 := ai.RowToCloudCost(row, aqi, ccsr)
  134. if err2 != nil {
  135. log.Errorf("AthenaIntegration: GetCloudCost: error while parsing row: %s", err2.Error())
  136. }
  137. }
  138. log.Debugf("AthenaIntegration[%s]: GetCloudCost: querying: %s", ai.Key(), aqi.Query)
  139. // Query CUR data and fill out CCSR
  140. err = ai.Query(context.TODO(), aqi.Query, GetAthenaQueryFunc(rowHandler))
  141. if err != nil {
  142. return nil, err
  143. }
  144. ai.ConnectionStatus = ai.GetConnectionStatusFromResult(ccsr, ai.ConnectionStatus)
  145. return ccsr, nil
  146. }
  147. func (ai *AthenaIntegration) GetListCostColumn() string {
  148. var listCostBuilder strings.Builder
  149. listCostBuilder.WriteString("CASE line_item_line_item_type")
  150. listCostBuilder.WriteString(" WHEN 'EdpDiscount' THEN 0")
  151. listCostBuilder.WriteString(" WHEN 'PrivateRateDiscount' THEN 0")
  152. listCostBuilder.WriteString(" ELSE ")
  153. listCostBuilder.WriteString(AthenaPricingColumn)
  154. listCostBuilder.WriteString(" END")
  155. return fmt.Sprintf("SUM(%s) as list_cost", listCostBuilder.String())
  156. }
  157. func (ai *AthenaIntegration) GetNetCostColumn(allColumns map[string]bool) string {
  158. netCostColumn := ""
  159. if allColumns[AthenaNetPricingColumn] { // if Net pricing exists
  160. netCostColumn = AthenaNetPricingCoalesce
  161. } else { // Non-net for if there's no net pricing.
  162. netCostColumn = AthenaPricingColumn
  163. }
  164. return fmt.Sprintf("SUM(%s) as net_cost", netCostColumn)
  165. }
  166. func (ai *AthenaIntegration) GetAmortizedCostColumn(allColumns map[string]bool) string {
  167. amortizedCostCase := ai.GetAmortizedCostCase(allColumns)
  168. return fmt.Sprintf("SUM(%s) as amortized_cost", amortizedCostCase)
  169. }
  170. func (ai *AthenaIntegration) GetAmortizedNetCostColumn(allColumns map[string]bool) string {
  171. amortizedNetCostCase := ""
  172. if allColumns[AthenaNetPricingColumn] { // if Net pricing exists
  173. amortizedNetCostCase = ai.GetAmortizedNetCostCase(allColumns)
  174. } else { // Non-net for if there's no net pricing.
  175. amortizedNetCostCase = ai.GetAmortizedCostCase(allColumns)
  176. }
  177. return fmt.Sprintf("SUM(%s) as amortized_net_cost", amortizedNetCostCase)
  178. }
  179. func (ai *AthenaIntegration) GetAmortizedCostCase(allColumns map[string]bool) string {
  180. // Use unblended costs if Reserved Instances/Savings Plans aren't in use
  181. if !allColumns[AthenaRIPricingColumn] && !allColumns[AthenaSPPricingColumn] {
  182. return AthenaPricingColumn
  183. }
  184. var costBuilder strings.Builder
  185. costBuilder.WriteString("CASE line_item_line_item_type")
  186. if allColumns[AthenaRIPricingColumn] {
  187. costBuilder.WriteString(" WHEN 'DiscountedUsage' THEN ")
  188. costBuilder.WriteString(AthenaRIPricingColumn)
  189. }
  190. if allColumns[AthenaSPPricingColumn] {
  191. costBuilder.WriteString(" WHEN 'SavingsPlanCoveredUsage' THEN ")
  192. costBuilder.WriteString(AthenaSPPricingColumn)
  193. }
  194. costBuilder.WriteString(" ELSE ")
  195. costBuilder.WriteString(AthenaPricingColumn)
  196. costBuilder.WriteString(" END")
  197. return costBuilder.String()
  198. }
  199. func (ai *AthenaIntegration) GetAmortizedNetCostCase(allColumns map[string]bool) string {
  200. // Use net unblended costs if Reserved Instances/Savings Plans aren't in use
  201. if !allColumns[AthenaNetRIPricingColumn] && !allColumns[AthenaNetSPPricingColumn] {
  202. return AthenaNetPricingCoalesce
  203. }
  204. var costBuilder strings.Builder
  205. costBuilder.WriteString("CASE line_item_line_item_type")
  206. if allColumns[AthenaNetRIPricingColumn] {
  207. costBuilder.WriteString(" WHEN 'DiscountedUsage' THEN ")
  208. costBuilder.WriteString(AthenaNetRIPricingCoalesce)
  209. }
  210. if allColumns[AthenaNetSPPricingColumn] {
  211. costBuilder.WriteString(" WHEN 'SavingsPlanCoveredUsage' THEN ")
  212. costBuilder.WriteString(AthenaNetSPPricingCoalesce)
  213. }
  214. costBuilder.WriteString(" ELSE ")
  215. costBuilder.WriteString(AthenaNetPricingCoalesce)
  216. costBuilder.WriteString(" END")
  217. return costBuilder.String()
  218. }
  219. func (ai *AthenaIntegration) RemoveColumnAliases(columns []string) {
  220. for i, column := range columns {
  221. if strings.Contains(column, " as ") {
  222. columnValues := strings.Split(column, " as ")
  223. columns[i] = columnValues[0]
  224. }
  225. }
  226. }
  227. func (ai *AthenaIntegration) ConvertLabelToAWSTag(label string) string {
  228. // if the label already has the column prefix assume that it is in the correct format
  229. if strings.HasPrefix(label, LabelColumnPrefix) {
  230. return label
  231. }
  232. // replace characters with underscore
  233. tag := label
  234. tag = strings.ReplaceAll(tag, ".", "_")
  235. tag = strings.ReplaceAll(tag, "/", "_")
  236. tag = strings.ReplaceAll(tag, ":", "_")
  237. tag = strings.ReplaceAll(tag, "-", "_")
  238. // add prefix and return
  239. return LabelColumnPrefix + tag
  240. }
  241. // GetIsKubernetesColumn builds a column that determines if a row represents kubernetes spend
  242. func (ai *AthenaIntegration) GetIsKubernetesColumn(allColumns map[string]bool) string {
  243. disjuncts := []string{
  244. "line_item_product_code = 'AmazonEKS'", // EKS is always kubernetes
  245. }
  246. // tagColumns is a list of columns where the presence of a value indicates that a resource is part of a kubernetes cluster
  247. tagColumns := []string{
  248. "resource_tags_aws_eks_cluster_name",
  249. "resource_tags_user_eks_cluster_name",
  250. "resource_tags_user_alpha_eksctl_io_cluster_name",
  251. "resource_tags_user_kubernetes_io_service_name",
  252. "resource_tags_user_kubernetes_io_created_for_pvc_name",
  253. "resource_tags_user_kubernetes_io_created_for_pv_name",
  254. }
  255. for _, tagColumn := range tagColumns {
  256. // if tag column is present in the CUR check for it
  257. if _, ok := allColumns[tagColumn]; ok {
  258. disjunctStr := fmt.Sprintf("%s <> ''", tagColumn)
  259. disjuncts = append(disjuncts, disjunctStr)
  260. }
  261. }
  262. return fmt.Sprintf("(%s) as is_kubernetes", strings.Join(disjuncts, " OR "))
  263. }
  264. func (ai *AthenaIntegration) GetPartitionWhere(start, end time.Time) string {
  265. month := time.Date(start.Year(), start.Month(), 1, 0, 0, 0, 0, time.UTC)
  266. endMonth := time.Date(end.Year(), end.Month(), 1, 0, 0, 0, 0, time.UTC)
  267. var disjuncts []string
  268. for !month.After(endMonth) {
  269. disjuncts = append(disjuncts, fmt.Sprintf("(year = '%d' AND month = '%d')", month.Year(), month.Month()))
  270. month = month.AddDate(0, 1, 0)
  271. }
  272. str := fmt.Sprintf("(%s)", strings.Join(disjuncts, " OR "))
  273. return str
  274. }
  275. func (ai *AthenaIntegration) RowToCloudCost(row types.Row, aqi AthenaQueryIndexes, ccsr *kubecost.CloudCostSetRange) error {
  276. if len(row.Data) < len(aqi.ColumnIndexes) {
  277. return fmt.Errorf("rowToCloudCost: row with fewer than %d columns (has only %d)", len(aqi.ColumnIndexes), len(row.Data))
  278. }
  279. // Iterate through the slice of tag columns, assigning
  280. // values to the column names, minus the tag prefix.
  281. labels := kubecost.CloudCostLabels{}
  282. labelValues := []string{}
  283. for _, tagColumnName := range aqi.TagColumns {
  284. labelName := strings.TrimPrefix(tagColumnName, LabelColumnPrefix)
  285. value := GetAthenaRowValue(row, aqi.ColumnIndexes, tagColumnName)
  286. if value != "" {
  287. labels[labelName] = value
  288. labelValues = append(labelValues, value)
  289. }
  290. }
  291. invoiceEntityID := GetAthenaRowValue(row, aqi.ColumnIndexes, "bill_payer_account_id")
  292. accountID := GetAthenaRowValue(row, aqi.ColumnIndexes, "line_item_usage_account_id")
  293. startStr := GetAthenaRowValue(row, aqi.ColumnIndexes, AthenaDateTruncColumn)
  294. providerID := GetAthenaRowValue(row, aqi.ColumnIndexes, "line_item_resource_id")
  295. productCode := GetAthenaRowValue(row, aqi.ColumnIndexes, "line_item_product_code")
  296. usageType := GetAthenaRowValue(row, aqi.ColumnIndexes, "line_item_usage_type")
  297. isK8s, _ := strconv.ParseBool(GetAthenaRowValue(row, aqi.ColumnIndexes, aqi.IsK8sColumn))
  298. k8sPct := 0.0
  299. if isK8s {
  300. k8sPct = 1.0
  301. }
  302. listCost, err := GetAthenaRowValueFloat(row, aqi.ColumnIndexes, aqi.ListCostColumn)
  303. if err != nil {
  304. return err
  305. }
  306. netCost, err := GetAthenaRowValueFloat(row, aqi.ColumnIndexes, aqi.NetCostColumn)
  307. if err != nil {
  308. return err
  309. }
  310. amortizedNetCost, err := GetAthenaRowValueFloat(row, aqi.ColumnIndexes, aqi.AmortizedNetCostColumn)
  311. if err != nil {
  312. return err
  313. }
  314. amortizedCost, err := GetAthenaRowValueFloat(row, aqi.ColumnIndexes, aqi.AmortizedCostColumn)
  315. if err != nil {
  316. return err
  317. }
  318. // Identify resource category in the CUR
  319. category := SelectAWSCategory(providerID, usageType, productCode)
  320. // Retrieve final stanza of product code for ProviderID
  321. if productCode == "AWSELB" || productCode == "AmazonFSx" {
  322. providerID = ParseARN(providerID)
  323. }
  324. if productCode == "AmazonEKS" && category == kubecost.ComputeCategory {
  325. if strings.Contains(usageType, "CPU") {
  326. providerID = fmt.Sprintf("%s/CPU", providerID)
  327. } else if strings.Contains(usageType, "GB") {
  328. providerID = fmt.Sprintf("%s/RAM", providerID)
  329. }
  330. }
  331. properties := kubecost.CloudCostProperties{
  332. ProviderID: providerID,
  333. Provider: kubecost.AWSProvider,
  334. AccountID: accountID,
  335. InvoiceEntityID: invoiceEntityID,
  336. Service: productCode,
  337. Category: category,
  338. Labels: labels,
  339. }
  340. start, err := time.Parse(AthenaDateLayout, startStr)
  341. if err != nil {
  342. return fmt.Errorf("unable to parse %s: '%s'", AthenaDateTruncColumn, err.Error())
  343. }
  344. end := start.AddDate(0, 0, 1)
  345. cc := &kubecost.CloudCost{
  346. Properties: &properties,
  347. Window: kubecost.NewWindow(&start, &end),
  348. ListCost: kubecost.CostMetric{
  349. Cost: listCost,
  350. KubernetesPercent: k8sPct,
  351. },
  352. NetCost: kubecost.CostMetric{
  353. Cost: netCost,
  354. KubernetesPercent: k8sPct,
  355. },
  356. AmortizedNetCost: kubecost.CostMetric{
  357. Cost: amortizedNetCost,
  358. KubernetesPercent: k8sPct,
  359. },
  360. AmortizedCost: kubecost.CostMetric{
  361. Cost: amortizedCost,
  362. KubernetesPercent: k8sPct,
  363. },
  364. InvoicedCost: kubecost.CostMetric{
  365. Cost: netCost, // We are using Net Cost for Invoiced Cost for now as it is the closest approximation
  366. KubernetesPercent: k8sPct,
  367. },
  368. }
  369. ccsr.LoadCloudCost(cc)
  370. return nil
  371. }
  372. func (ai *AthenaIntegration) GetConnectionStatusFromResult(result cloud.EmptyChecker, currentStatus cloud.ConnectionStatus) cloud.ConnectionStatus {
  373. if result.IsEmpty() && currentStatus != cloud.SuccessfulConnection {
  374. return cloud.MissingData
  375. }
  376. return cloud.SuccessfulConnection
  377. }
  378. func (ai *AthenaIntegration) GetConnectionStatus() string {
  379. // initialize status if it has not done so; this can happen if the integration is inactive
  380. if ai.ConnectionStatus.String() == "" {
  381. ai.ConnectionStatus = cloud.InitialStatus
  382. }
  383. return ai.ConnectionStatus.String()
  384. }