athenaintegration.go 20 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558
  1. package aws
  2. import (
  3. "context"
  4. "fmt"
  5. "strconv"
  6. "strings"
  7. "time"
  8. "github.com/aws/aws-sdk-go-v2/service/athena/types"
  9. "github.com/opencost/opencost/core/pkg/log"
  10. "github.com/opencost/opencost/core/pkg/opencost"
  11. "github.com/opencost/opencost/core/pkg/util/json"
  12. "github.com/opencost/opencost/core/pkg/util/timeutil"
  13. "github.com/opencost/opencost/pkg/cloud"
  14. )
  15. const LabelColumnPrefix = "resource_tags_user_"
  16. const AWSLabelColumnPrefix = "resource_tags_aws_"
  17. const AthenaResourceTagPrefix = "resource_tags_"
  18. const AthenaResourceTagsColumn = "resource_tags"
  19. const AthenaResourceTagsCastToJsonColumn = "CAST(resource_tags AS JSON) as resource_tags"
  20. const AthenaInvoiceEntityNameColumn = "bill_payer_account_name"
  21. const AthenaAccountNameColumn = "line_item_usage_account_name"
  22. // athenaDateLayout is the default AWS date format
  23. const AthenaDateLayout = "2006-01-02 15:04:05.000"
  24. // Cost Columns
  25. const AthenaPricingColumn = "line_item_unblended_cost"
  26. // Amortized Cost Columns
  27. const AthenaRIPricingColumn = "reservation_effective_cost"
  28. const AthenaSPPricingColumn = "savings_plan_savings_plan_effective_cost"
  29. // Net Cost Columns
  30. const AthenaNetPricingColumn = "line_item_net_unblended_cost"
  31. var AthenaNetPricingCoalesce = fmt.Sprintf("COALESCE(%s, %s, 0)", AthenaNetPricingColumn, AthenaPricingColumn)
  32. // Amortized Net Cost Columns
  33. const AthenaNetRIPricingColumn = "reservation_net_effective_cost"
  34. var AthenaNetRIPricingCoalesce = fmt.Sprintf("COALESCE(%s, %s, 0)", AthenaNetRIPricingColumn, AthenaRIPricingColumn)
  35. const AthenaNetSPPricingColumn = "savings_plan_net_savings_plan_effective_cost"
  36. var AthenaNetSPPricingCoalesce = fmt.Sprintf("COALESCE(%s, %s, 0)", AthenaNetSPPricingColumn, AthenaSPPricingColumn)
  37. // athenaDateTruncColumn Aggregates line items from the hourly level to daily. "line_item_usage_start_date" is used because at
  38. // all time values 00:00-23:00 it will truncate to the correct date.
  39. const AthenaDateColumn = "line_item_usage_start_date"
  40. const AthenaDateTruncColumn = "DATE_TRUNC('day'," + AthenaDateColumn + ") as usage_date"
  41. const AthenaWhereDateFmt = `line_item_usage_start_date >= date '%s' AND line_item_usage_start_date < date '%s'`
  42. const AthenaWhereUsage = "(line_item_line_item_type = 'Usage' OR line_item_line_item_type = 'DiscountedUsage' OR line_item_line_item_type = 'SavingsPlanCoveredUsage' OR line_item_line_item_type = 'EdpDiscount' OR line_item_line_item_type = 'PrivateRateDiscount')"
  43. // AthenaQueryIndexes is a struct for holding the context of a query
  44. type AthenaQueryIndexes struct {
  45. Query string
  46. ColumnIndexes map[string]int
  47. TagColumns []string
  48. AWSTagColumns []string
  49. ListCostColumn string
  50. NetCostColumn string
  51. AmortizedNetCostColumn string
  52. AmortizedCostColumn string
  53. IsK8sColumn string
  54. }
  55. type AthenaIntegration struct {
  56. AthenaQuerier
  57. }
  58. // Query Athena for CUR data and build a new CloudCostSetRange containing the info
  59. func (ai *AthenaIntegration) GetCloudCost(start, end time.Time) (*opencost.CloudCostSetRange, error) {
  60. return ai.getCloudCost(start, end, 0)
  61. }
  62. func (ai *AthenaIntegration) RefreshStatus() cloud.ConnectionStatus {
  63. end := time.Now().UTC().Truncate(timeutil.Day)
  64. start := end.Add(-3 * timeutil.Day) // lookback 72 hours
  65. // getCloudCost already sets ConnectionStatus in the event there is no error, so we don't need to handle the positive
  66. // case here
  67. _, err := ai.getCloudCost(start, end, 1)
  68. if err != nil {
  69. log.Errorf("AthenaIntegration: RefreshStatus: error while refreshing status: %s", err.Error())
  70. ai.ConnectionStatus = cloud.FailedConnection
  71. }
  72. return ai.ConnectionStatus
  73. }
  74. func (ai *AthenaIntegration) getCloudCost(start, end time.Time, limit int) (*opencost.CloudCostSetRange, error) {
  75. log.Infof("AthenaIntegration[%s]: GetCloudCost: %s", ai.Key(), opencost.NewWindow(&start, &end).String())
  76. // Query for all column names
  77. allColumns, err := ai.GetColumns()
  78. if err != nil {
  79. return nil, fmt.Errorf("GetCloudCost: error getting Athena columns: %w", err)
  80. }
  81. // List known, hard-coded columns to query
  82. groupByColumns := []string{
  83. AthenaDateTruncColumn,
  84. "line_item_resource_id",
  85. "bill_payer_account_id",
  86. "line_item_usage_account_id",
  87. "line_item_product_code",
  88. "line_item_usage_type",
  89. "product_region_code",
  90. "line_item_availability_zone",
  91. }
  92. // Create query indices
  93. aqi := AthenaQueryIndexes{}
  94. // Add is k8s column
  95. isK8sColumn := ai.GetIsKubernetesColumn(allColumns)
  96. groupByColumns = append(groupByColumns, isK8sColumn)
  97. aqi.IsK8sColumn = isK8sColumn
  98. // Determine which columns are user-defined tags and add those to the list
  99. // of columns to query.
  100. for column := range allColumns {
  101. if strings.HasPrefix(column, LabelColumnPrefix) {
  102. quotedTag := fmt.Sprintf(`"%s"`, column)
  103. groupByColumns = append(groupByColumns, quotedTag)
  104. aqi.TagColumns = append(aqi.TagColumns, quotedTag)
  105. }
  106. if strings.HasPrefix(column, AWSLabelColumnPrefix) {
  107. groupByColumns = append(groupByColumns, column)
  108. aqi.AWSTagColumns = append(aqi.AWSTagColumns, column)
  109. }
  110. }
  111. // CUR 2.0 specific columns, CUR 2.0 has ability to disable any column, so we check for any of these columns before querying
  112. if allColumns[AthenaResourceTagsColumn] {
  113. groupByColumns = append(groupByColumns, AthenaResourceTagsCastToJsonColumn)
  114. }
  115. if allColumns[AthenaAccountNameColumn] {
  116. groupByColumns = append(groupByColumns, AthenaAccountNameColumn)
  117. }
  118. if allColumns[AthenaInvoiceEntityNameColumn] {
  119. groupByColumns = append(groupByColumns, AthenaInvoiceEntityNameColumn)
  120. }
  121. var selectColumns []string
  122. // Duplicate GroupBy Columns into select columns
  123. selectColumns = append(selectColumns, groupByColumns...)
  124. // Clean Up group by columns
  125. ai.RemoveColumnAliases(groupByColumns)
  126. // Build list cost column and add it to the select columns
  127. listCostColumn := ai.GetListCostColumn()
  128. selectColumns = append(selectColumns, listCostColumn)
  129. aqi.ListCostColumn = listCostColumn
  130. // Build net cost column and add it to select columns
  131. netCostColumn := ai.GetNetCostColumn(allColumns)
  132. selectColumns = append(selectColumns, netCostColumn)
  133. aqi.NetCostColumn = netCostColumn
  134. // Build amortized net cost column and add it to select columns
  135. amortizedNetCostColumn := ai.GetAmortizedNetCostColumn(allColumns)
  136. selectColumns = append(selectColumns, amortizedNetCostColumn)
  137. aqi.AmortizedNetCostColumn = amortizedNetCostColumn
  138. // Build Amortized cost column and add it to select columns
  139. amortizedCostColumn := ai.GetAmortizedCostColumn(allColumns)
  140. selectColumns = append(selectColumns, amortizedCostColumn)
  141. aqi.AmortizedCostColumn = amortizedCostColumn
  142. // Build map of query columns to use for parsing query
  143. aqi.ColumnIndexes = map[string]int{}
  144. for i, column := range selectColumns {
  145. aqi.ColumnIndexes[column] = i
  146. }
  147. whereDate := fmt.Sprintf(AthenaWhereDateFmt, start.Format("2006-01-02"), end.Format("2006-01-02"))
  148. wherePartitions := ai.GetPartitionWhere(start, end, isCUR20(allColumns))
  149. // Query for all line items with a resource_id or from AWS Marketplace, which did not end before
  150. // the range or start after it. This captures all costs with any amount of
  151. // overlap with the range, for which we will only extract the relevant costs
  152. whereConjuncts := []string{
  153. wherePartitions,
  154. whereDate,
  155. AthenaWhereUsage,
  156. }
  157. columnStr := strings.Join(selectColumns, ", ")
  158. whereClause := strings.Join(whereConjuncts, " AND ")
  159. groupByStr := strings.Join(groupByColumns, ", ")
  160. queryStr := `
  161. SELECT %s
  162. FROM "%s"
  163. WHERE %s
  164. GROUP BY %s
  165. `
  166. if limit > 0 {
  167. queryStr = fmt.Sprintf("%s LIMIT %d", queryStr, limit)
  168. }
  169. aqi.Query = fmt.Sprintf(queryStr, columnStr, ai.Table, whereClause, groupByStr)
  170. ccsr, err := opencost.NewCloudCostSetRange(start, end, opencost.AccumulateOptionDay, ai.Key())
  171. if err != nil {
  172. return nil, err
  173. }
  174. // Generate row handling function.
  175. rowHandler := func(row types.Row) {
  176. cc, err2 := athenaRowToCloudCost(row, aqi)
  177. if err2 != nil {
  178. log.Errorf("AthenaIntegration: GetCloudCost: error while parsing row: %s", err2.Error())
  179. return
  180. }
  181. ccsr.LoadCloudCost(cc)
  182. }
  183. log.Debugf("AthenaIntegration[%s]: GetCloudCost: querying: %s", ai.Key(), aqi.Query)
  184. // Query CUR data and fill out CCSR
  185. err = ai.Query(context.TODO(), aqi.Query, GetAthenaQueryFunc(rowHandler))
  186. if err != nil {
  187. return nil, err
  188. }
  189. ai.ConnectionStatus = ai.GetConnectionStatusFromResult(ccsr, ai.ConnectionStatus)
  190. return ccsr, nil
  191. }
  192. func (ai *AthenaIntegration) GetListCostColumn() string {
  193. var listCostBuilder strings.Builder
  194. listCostBuilder.WriteString("CASE line_item_line_item_type")
  195. listCostBuilder.WriteString(" WHEN 'EdpDiscount' THEN 0")
  196. listCostBuilder.WriteString(" WHEN 'PrivateRateDiscount' THEN 0")
  197. listCostBuilder.WriteString(" ELSE ")
  198. listCostBuilder.WriteString(AthenaPricingColumn)
  199. listCostBuilder.WriteString(" END")
  200. return fmt.Sprintf("SUM(%s) as list_cost", listCostBuilder.String())
  201. }
  202. func (ai *AthenaIntegration) GetNetCostColumn(allColumns map[string]bool) string {
  203. netCostColumn := ""
  204. if allColumns[AthenaNetPricingColumn] { // if Net pricing exists
  205. netCostColumn = AthenaNetPricingCoalesce
  206. } else { // Non-net for if there's no net pricing.
  207. netCostColumn = AthenaPricingColumn
  208. }
  209. return fmt.Sprintf("SUM(%s) as net_cost", netCostColumn)
  210. }
  211. func (ai *AthenaIntegration) GetAmortizedCostColumn(allColumns map[string]bool) string {
  212. amortizedCostCase := ai.GetAmortizedCostCase(allColumns)
  213. return fmt.Sprintf("SUM(%s) as amortized_cost", amortizedCostCase)
  214. }
  215. func (ai *AthenaIntegration) GetAmortizedNetCostColumn(allColumns map[string]bool) string {
  216. amortizedNetCostCase := ""
  217. if allColumns[AthenaNetPricingColumn] { // if Net pricing exists
  218. amortizedNetCostCase = ai.GetAmortizedNetCostCase(allColumns)
  219. } else { // Non-net for if there's no net pricing.
  220. amortizedNetCostCase = ai.GetAmortizedCostCase(allColumns)
  221. }
  222. return fmt.Sprintf("SUM(%s) as amortized_net_cost", amortizedNetCostCase)
  223. }
  224. func (ai *AthenaIntegration) GetAmortizedCostCase(allColumns map[string]bool) string {
  225. // Use unblended costs if Reserved Instances/Savings Plans aren't in use
  226. if !allColumns[AthenaRIPricingColumn] && !allColumns[AthenaSPPricingColumn] {
  227. return AthenaPricingColumn
  228. }
  229. var costBuilder strings.Builder
  230. costBuilder.WriteString("CASE line_item_line_item_type")
  231. if allColumns[AthenaRIPricingColumn] {
  232. costBuilder.WriteString(" WHEN 'DiscountedUsage' THEN ")
  233. costBuilder.WriteString(AthenaRIPricingColumn)
  234. }
  235. if allColumns[AthenaSPPricingColumn] {
  236. costBuilder.WriteString(" WHEN 'SavingsPlanCoveredUsage' THEN ")
  237. costBuilder.WriteString(AthenaSPPricingColumn)
  238. }
  239. costBuilder.WriteString(" ELSE ")
  240. costBuilder.WriteString(AthenaPricingColumn)
  241. costBuilder.WriteString(" END")
  242. return costBuilder.String()
  243. }
  244. func (ai *AthenaIntegration) GetAmortizedNetCostCase(allColumns map[string]bool) string {
  245. // Use net unblended costs if Reserved Instances/Savings Plans aren't in use
  246. if !allColumns[AthenaNetRIPricingColumn] && !allColumns[AthenaNetSPPricingColumn] {
  247. return AthenaNetPricingCoalesce
  248. }
  249. var costBuilder strings.Builder
  250. costBuilder.WriteString("CASE line_item_line_item_type")
  251. if allColumns[AthenaNetRIPricingColumn] {
  252. costBuilder.WriteString(" WHEN 'DiscountedUsage' THEN ")
  253. costBuilder.WriteString(AthenaNetRIPricingCoalesce)
  254. }
  255. if allColumns[AthenaNetSPPricingColumn] {
  256. costBuilder.WriteString(" WHEN 'SavingsPlanCoveredUsage' THEN ")
  257. costBuilder.WriteString(AthenaNetSPPricingCoalesce)
  258. }
  259. costBuilder.WriteString(" ELSE ")
  260. costBuilder.WriteString(AthenaNetPricingCoalesce)
  261. costBuilder.WriteString(" END")
  262. return costBuilder.String()
  263. }
  264. func (ai *AthenaIntegration) RemoveColumnAliases(columns []string) {
  265. for i, column := range columns {
  266. if strings.Contains(column, " as ") {
  267. columnValues := strings.Split(column, " as ")
  268. columns[i] = columnValues[0]
  269. }
  270. }
  271. }
  272. func (ai *AthenaIntegration) ConvertLabelToAWSTag(label string) string {
  273. // if the label already has the column prefix assume that it is in the correct format
  274. if strings.HasPrefix(label, LabelColumnPrefix) {
  275. return label
  276. }
  277. // replace characters with underscore
  278. tag := label
  279. tag = strings.ReplaceAll(tag, ".", "_")
  280. tag = strings.ReplaceAll(tag, "/", "_")
  281. tag = strings.ReplaceAll(tag, ":", "_")
  282. tag = strings.ReplaceAll(tag, "-", "_")
  283. // add prefix and return
  284. return LabelColumnPrefix + tag
  285. }
  286. // GetIsKubernetesColumn builds a column that determines if a row represents kubernetes spend
  287. func (ai *AthenaIntegration) GetIsKubernetesColumn(allColumns map[string]bool) string {
  288. // tagColumns is a list of columns where the presence of a value indicates that a resource is part of a kubernetes cluster
  289. // Known columns hardcoded for CUR 1.0 and CUR 2.0
  290. tagColumnsIsK8sCUR10 := []string{
  291. "resource_tags_aws_eks_cluster_name",
  292. "resource_tags_user_eks_cluster_name",
  293. "resource_tags_user_alpha_eksctl_io_cluster_name",
  294. "resource_tags_user_kubernetes_io_service_name",
  295. "resource_tags_user_kubernetes_io_created_for_pvc_name",
  296. "resource_tags_user_kubernetes_io_created_for_pv_name",
  297. }
  298. tagColumnsIsK8sCUR20 := []string{
  299. "resource_tags['aws_eks_cluster_name']",
  300. "resource_tags['user_eks_cluster_name']",
  301. "resource_tags['user_alpha_eksctl_io_cluster_name']",
  302. "resource_tags['user_kubernetes_io_service_name']",
  303. "resource_tags['user_kubernetes_io_created_for_pvc_name']",
  304. "resource_tags['user_kubernetes_io_created_for_pv_name']",
  305. }
  306. disjuncts := []string{
  307. "line_item_product_code = 'AmazonEKS'", // EKS is always kubernetes
  308. }
  309. if allColumns[AthenaResourceTagsColumn] {
  310. // if resource tags column is present in the CUR check for IsKubernetes keys in the resource tags map
  311. for _, tagColumn := range tagColumnsIsK8sCUR20 {
  312. disjunctStr := fmt.Sprintf("COALESCE(%s, '') <> ''", tagColumn)
  313. disjuncts = append(disjuncts, disjunctStr)
  314. }
  315. } else {
  316. for _, tagColumn := range tagColumnsIsK8sCUR10 {
  317. // if tag column is present in the CUR check for it
  318. if _, ok := allColumns[tagColumn]; ok {
  319. disjunctStr := fmt.Sprintf("%s <> ''", tagColumn)
  320. disjuncts = append(disjuncts, disjunctStr)
  321. }
  322. }
  323. }
  324. return fmt.Sprintf("(%s) as is_kubernetes", strings.Join(disjuncts, " OR "))
  325. }
  326. func (ai *AthenaIntegration) GetPartitionWhere(start, end time.Time, isCUR20 bool) string {
  327. month := time.Date(start.Year(), start.Month(), 1, 0, 0, 0, 0, time.UTC)
  328. endMonth := time.Date(end.Year(), end.Month(), 1, 0, 0, 0, 0, time.UTC)
  329. var disjuncts []string
  330. for !month.After(endMonth) {
  331. if isCUR20 {
  332. // CUR 2.0 with billing_period partitions
  333. disjuncts = append(disjuncts, fmt.Sprintf("(billing_period = '%d-%02d')", month.Year(), month.Month()))
  334. } else {
  335. // CUR 1.0 uses year and month columns for partitioning
  336. disjuncts = append(disjuncts, fmt.Sprintf("(year = '%d' AND month = '%d')", month.Year(), month.Month()))
  337. }
  338. month = month.AddDate(0, 1, 0)
  339. }
  340. str := fmt.Sprintf("(%s)", strings.Join(disjuncts, " OR "))
  341. return str
  342. }
  343. func athenaRowToCloudCost(row types.Row, aqi AthenaQueryIndexes) (*opencost.CloudCost, error) {
  344. if len(row.Data) < len(aqi.ColumnIndexes) {
  345. return nil, fmt.Errorf("rowToCloudCost: row with fewer than %d columns (has only %d)", len(aqi.ColumnIndexes), len(row.Data))
  346. }
  347. // Iterate through the slice of tag columns, assigning
  348. // values to the column names, minus the tag prefix.
  349. labels := opencost.CloudCostLabels{}
  350. for _, tagColumnName := range aqi.TagColumns {
  351. // remove quotes
  352. labelName := strings.TrimPrefix(tagColumnName, `"`)
  353. labelName = strings.TrimSuffix(labelName, `"`)
  354. // remove prefix
  355. labelName = strings.TrimPrefix(labelName, LabelColumnPrefix)
  356. value := GetAthenaRowValue(row, aqi.ColumnIndexes, tagColumnName)
  357. if value != "" {
  358. labels[labelName] = value
  359. }
  360. }
  361. for _, awsColumnName := range aqi.AWSTagColumns {
  362. // partially remove prefix leaving "aws_"
  363. labelName := strings.TrimPrefix(awsColumnName, AthenaResourceTagPrefix)
  364. value := GetAthenaRowValue(row, aqi.ColumnIndexes, awsColumnName)
  365. if value != "" {
  366. labels[labelName] = value
  367. }
  368. }
  369. if _, ok := aqi.ColumnIndexes[AthenaResourceTagsCastToJsonColumn]; ok {
  370. resourceTags := GetAthenaRowValue(row, aqi.ColumnIndexes, AthenaResourceTagsCastToJsonColumn)
  371. err := json.Unmarshal([]byte(resourceTags), &labels)
  372. if err != nil {
  373. log.Errorf("athenaRowToCloudCost: error unmarshalling resource tags: %s", err.Error())
  374. }
  375. }
  376. invoiceEntityID := GetAthenaRowValue(row, aqi.ColumnIndexes, "bill_payer_account_id")
  377. accountID := GetAthenaRowValue(row, aqi.ColumnIndexes, "line_item_usage_account_id")
  378. invoiceEntityName := invoiceEntityID
  379. accountName := accountID
  380. if _, ok := aqi.ColumnIndexes[AthenaInvoiceEntityNameColumn]; ok {
  381. invoiceEntityName = GetAthenaRowValue(row, aqi.ColumnIndexes, AthenaInvoiceEntityNameColumn)
  382. }
  383. if _, ok := aqi.ColumnIndexes[AthenaAccountNameColumn]; ok {
  384. accountName = GetAthenaRowValue(row, aqi.ColumnIndexes, AthenaAccountNameColumn)
  385. }
  386. startStr := GetAthenaRowValue(row, aqi.ColumnIndexes, AthenaDateTruncColumn)
  387. providerID := GetAthenaRowValue(row, aqi.ColumnIndexes, "line_item_resource_id")
  388. productCode := GetAthenaRowValue(row, aqi.ColumnIndexes, "line_item_product_code")
  389. usageType := GetAthenaRowValue(row, aqi.ColumnIndexes, "line_item_usage_type")
  390. regionCode := GetAthenaRowValue(row, aqi.ColumnIndexes, "product_region_code")
  391. availabilityZone := GetAthenaRowValue(row, aqi.ColumnIndexes, "line_item_availability_zone")
  392. isK8s, _ := strconv.ParseBool(GetAthenaRowValue(row, aqi.ColumnIndexes, aqi.IsK8sColumn))
  393. k8sPct := 0.0
  394. if isK8s {
  395. k8sPct = 1.0
  396. }
  397. listCost, err := GetAthenaRowValueFloat(row, aqi.ColumnIndexes, aqi.ListCostColumn)
  398. if err != nil {
  399. return nil, err
  400. }
  401. netCost, err := GetAthenaRowValueFloat(row, aqi.ColumnIndexes, aqi.NetCostColumn)
  402. if err != nil {
  403. return nil, err
  404. }
  405. amortizedNetCost, err := GetAthenaRowValueFloat(row, aqi.ColumnIndexes, aqi.AmortizedNetCostColumn)
  406. if err != nil {
  407. return nil, err
  408. }
  409. amortizedCost, err := GetAthenaRowValueFloat(row, aqi.ColumnIndexes, aqi.AmortizedCostColumn)
  410. if err != nil {
  411. return nil, err
  412. }
  413. // Identify resource category in the CUR
  414. category := SelectAWSCategory(providerID, usageType, productCode)
  415. // Retrieve final stanza of product code for ProviderID
  416. if productCode == "AWSELB" || productCode == "AmazonFSx" {
  417. providerID = ParseARN(providerID)
  418. }
  419. if productCode == "AmazonEKS" && category == opencost.ComputeCategory {
  420. if strings.Contains(usageType, "CPU") {
  421. providerID = fmt.Sprintf("%s/CPU", providerID)
  422. } else if strings.Contains(usageType, "GB") {
  423. providerID = fmt.Sprintf("%s/RAM", providerID)
  424. }
  425. }
  426. properties := opencost.CloudCostProperties{
  427. ProviderID: providerID,
  428. Provider: opencost.AWSProvider,
  429. AccountID: accountID,
  430. AccountName: accountName,
  431. InvoiceEntityID: invoiceEntityID,
  432. InvoiceEntityName: invoiceEntityName,
  433. RegionID: regionCode,
  434. AvailabilityZone: availabilityZone,
  435. Service: productCode,
  436. Category: category,
  437. Labels: labels,
  438. }
  439. start, err := time.Parse(AthenaDateLayout, startStr)
  440. if err != nil {
  441. return nil, fmt.Errorf("unable to parse %s: '%s'", AthenaDateTruncColumn, err.Error())
  442. }
  443. end := start.AddDate(0, 0, 1)
  444. cc := &opencost.CloudCost{
  445. Properties: &properties,
  446. Window: opencost.NewWindow(&start, &end),
  447. ListCost: opencost.CostMetric{
  448. Cost: listCost,
  449. KubernetesPercent: k8sPct,
  450. },
  451. NetCost: opencost.CostMetric{
  452. Cost: netCost,
  453. KubernetesPercent: k8sPct,
  454. },
  455. AmortizedNetCost: opencost.CostMetric{
  456. Cost: amortizedNetCost,
  457. KubernetesPercent: k8sPct,
  458. },
  459. AmortizedCost: opencost.CostMetric{
  460. Cost: amortizedCost,
  461. KubernetesPercent: k8sPct,
  462. },
  463. InvoicedCost: opencost.CostMetric{
  464. Cost: netCost, // We are using Net Cost for Invoiced Cost for now as it is the closest approximation
  465. KubernetesPercent: k8sPct,
  466. },
  467. }
  468. return cc, nil
  469. }
  470. func (ai *AthenaIntegration) GetConnectionStatusFromResult(result cloud.EmptyChecker, currentStatus cloud.ConnectionStatus) cloud.ConnectionStatus {
  471. if result.IsEmpty() && currentStatus != cloud.SuccessfulConnection {
  472. return cloud.MissingData
  473. }
  474. return cloud.SuccessfulConnection
  475. }
  476. // presence of any of resource_tags, line_item_usage_account_name, or bill_payer_account_name columns confirms CUR 2.0
  477. func isCUR20(allColumns map[string]bool) bool {
  478. return allColumns[AthenaResourceTagsColumn] || allColumns[AthenaAccountNameColumn] || allColumns[AthenaInvoiceEntityNameColumn]
  479. }