athenaintegration.go 18 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514
  1. package aws
  2. import (
  3. "context"
  4. "fmt"
  5. "strconv"
  6. "strings"
  7. "time"
  8. "github.com/aws/aws-sdk-go-v2/service/athena/types"
  9. "github.com/opencost/opencost/core/pkg/log"
  10. "github.com/opencost/opencost/core/pkg/opencost"
  11. "github.com/opencost/opencost/core/pkg/util/timeutil"
  12. "github.com/opencost/opencost/pkg/cloud"
  13. )
  14. const LabelColumnPrefix = "resource_tags_user_"
  15. const AWSLabelColumnPrefix = "resource_tags_aws_"
  16. const AthenaResourceTagPrefix = "resource_tags_"
  17. // athenaDateLayout is the default AWS date format
  18. const AthenaDateLayout = "2006-01-02 15:04:05.000"
  19. // Cost Columns
  20. const AthenaPricingColumn = "line_item_unblended_cost"
  21. // Amortized Cost Columns
  22. const AthenaRIPricingColumn = "reservation_effective_cost"
  23. const AthenaSPPricingColumn = "savings_plan_savings_plan_effective_cost"
  24. // Net Cost Columns
  25. const AthenaNetPricingColumn = "line_item_net_unblended_cost"
  26. var AthenaNetPricingCoalesce = fmt.Sprintf("COALESCE(%s, %s, 0)", AthenaNetPricingColumn, AthenaPricingColumn)
  27. // Amortized Net Cost Columns
  28. const AthenaNetRIPricingColumn = "reservation_net_effective_cost"
  29. var AthenaNetRIPricingCoalesce = fmt.Sprintf("COALESCE(%s, %s, 0)", AthenaNetRIPricingColumn, AthenaRIPricingColumn)
  30. const AthenaNetSPPricingColumn = "savings_plan_net_savings_plan_effective_cost"
  31. var AthenaNetSPPricingCoalesce = fmt.Sprintf("COALESCE(%s, %s, 0)", AthenaNetSPPricingColumn, AthenaSPPricingColumn)
  32. // athenaDateTruncColumn Aggregates line items from the hourly level to daily. "line_item_usage_start_date" is used because at
  33. // all time values 00:00-23:00 it will truncate to the correct date.
  34. const AthenaDateColumn = "line_item_usage_start_date"
  35. const AthenaDateTruncColumn = "DATE_TRUNC('day'," + AthenaDateColumn + ") as usage_date"
  36. const AthenaWhereDateFmt = `line_item_usage_start_date >= date '%s' AND line_item_usage_start_date < date '%s'`
  37. const AthenaWhereUsage = "(line_item_line_item_type = 'Usage' OR line_item_line_item_type = 'DiscountedUsage' OR line_item_line_item_type = 'SavingsPlanCoveredUsage' OR line_item_line_item_type = 'EdpDiscount' OR line_item_line_item_type = 'PrivateRateDiscount')"
  38. // AthenaQueryIndexes is a struct for holding the context of a query
  39. type AthenaQueryIndexes struct {
  40. Query string
  41. ColumnIndexes map[string]int
  42. TagColumns []string
  43. AWSTagColumns []string
  44. ListCostColumn string
  45. NetCostColumn string
  46. AmortizedNetCostColumn string
  47. AmortizedCostColumn string
  48. IsK8sColumn string
  49. }
  50. type AthenaIntegration struct {
  51. AthenaQuerier
  52. }
  53. // Query Athena for CUR data and build a new CloudCostSetRange containing the info
  54. func (ai *AthenaIntegration) GetCloudCost(start, end time.Time) (*opencost.CloudCostSetRange, error) {
  55. return ai.getCloudCost(start, end, 0)
  56. }
  57. func (ai *AthenaIntegration) RefreshStatus() cloud.ConnectionStatus {
  58. end := time.Now().UTC().Truncate(timeutil.Day)
  59. start := end.Add(-3 * timeutil.Day) // lookback 72 hours
  60. // getCloudCost already sets ConnectionStatus in the event there is no error, so we don't need to handle the positive
  61. // case here
  62. _, err := ai.getCloudCost(start, end, 1)
  63. if err != nil {
  64. log.Errorf("AthenaIntegration: RefreshStatus: error while refreshing status: %s", err.Error())
  65. ai.ConnectionStatus = cloud.FailedConnection
  66. }
  67. return ai.ConnectionStatus
  68. }
  69. func (ai *AthenaIntegration) getCloudCost(start, end time.Time, limit int) (*opencost.CloudCostSetRange, error) {
  70. log.Infof("AthenaIntegration[%s]: GetCloudCost: %s", ai.Key(), opencost.NewWindow(&start, &end).String())
  71. // Query for all column names
  72. allColumns, err := ai.GetColumns()
  73. if err != nil {
  74. return nil, fmt.Errorf("GetCloudCost: error getting Athena columns: %w", err)
  75. }
  76. // List known, hard-coded columns to query
  77. groupByColumns := []string{
  78. AthenaDateTruncColumn,
  79. "line_item_resource_id",
  80. "bill_payer_account_id",
  81. "line_item_usage_account_id",
  82. "line_item_product_code",
  83. "line_item_usage_type",
  84. "product_region_code",
  85. "line_item_availability_zone",
  86. }
  87. // Create query indices
  88. aqi := AthenaQueryIndexes{}
  89. // Add is k8s column
  90. isK8sColumn := ai.GetIsKubernetesColumn(allColumns)
  91. groupByColumns = append(groupByColumns, isK8sColumn)
  92. aqi.IsK8sColumn = isK8sColumn
  93. // Determine which columns are user-defined tags and add those to the list
  94. // of columns to query.
  95. for column := range allColumns {
  96. if strings.HasPrefix(column, LabelColumnPrefix) {
  97. quotedTag := fmt.Sprintf(`"%s"`, column)
  98. groupByColumns = append(groupByColumns, quotedTag)
  99. aqi.TagColumns = append(aqi.TagColumns, quotedTag)
  100. }
  101. if strings.HasPrefix(column, AWSLabelColumnPrefix) {
  102. groupByColumns = append(groupByColumns, column)
  103. aqi.AWSTagColumns = append(aqi.AWSTagColumns, column)
  104. }
  105. }
  106. var selectColumns []string
  107. // Duplicate GroupBy Columns into select columns
  108. selectColumns = append(selectColumns, groupByColumns...)
  109. // Clean Up group by columns
  110. ai.RemoveColumnAliases(groupByColumns)
  111. // Build list cost column and add it to the select columns
  112. listCostColumn := ai.GetListCostColumn()
  113. selectColumns = append(selectColumns, listCostColumn)
  114. aqi.ListCostColumn = listCostColumn
  115. // Build net cost column and add it to select columns
  116. netCostColumn := ai.GetNetCostColumn(allColumns)
  117. selectColumns = append(selectColumns, netCostColumn)
  118. aqi.NetCostColumn = netCostColumn
  119. // Build amortized net cost column and add it to select columns
  120. amortizedNetCostColumn := ai.GetAmortizedNetCostColumn(allColumns)
  121. selectColumns = append(selectColumns, amortizedNetCostColumn)
  122. aqi.AmortizedNetCostColumn = amortizedNetCostColumn
  123. // Build Amortized cost column and add it to select columns
  124. amortizedCostColumn := ai.GetAmortizedCostColumn(allColumns)
  125. selectColumns = append(selectColumns, amortizedCostColumn)
  126. aqi.AmortizedCostColumn = amortizedCostColumn
  127. // Build map of query columns to use for parsing query
  128. aqi.ColumnIndexes = map[string]int{}
  129. for i, column := range selectColumns {
  130. aqi.ColumnIndexes[column] = i
  131. }
  132. whereDate := fmt.Sprintf(AthenaWhereDateFmt, start.Format("2006-01-02"), end.Format("2006-01-02"))
  133. wherePartitions := ai.GetPartitionWhere(start, end)
  134. // Query for all line items with a resource_id or from AWS Marketplace, which did not end before
  135. // the range or start after it. This captures all costs with any amount of
  136. // overlap with the range, for which we will only extract the relevant costs
  137. whereConjuncts := []string{
  138. wherePartitions,
  139. whereDate,
  140. AthenaWhereUsage,
  141. }
  142. columnStr := strings.Join(selectColumns, ", ")
  143. whereClause := strings.Join(whereConjuncts, " AND ")
  144. groupByStr := strings.Join(groupByColumns, ", ")
  145. queryStr := `
  146. SELECT %s
  147. FROM "%s"
  148. WHERE %s
  149. GROUP BY %s
  150. `
  151. if limit > 0 {
  152. queryStr = fmt.Sprintf("%s LIMIT %d", queryStr, limit)
  153. }
  154. aqi.Query = fmt.Sprintf(queryStr, columnStr, ai.Table, whereClause, groupByStr)
  155. ccsr, err := opencost.NewCloudCostSetRange(start, end, opencost.AccumulateOptionDay, ai.Key())
  156. if err != nil {
  157. return nil, err
  158. }
  159. // Generate row handling function.
  160. rowHandler := func(row types.Row) {
  161. cc, err2 := athenaRowToCloudCost(row, aqi)
  162. if err2 != nil {
  163. log.Errorf("AthenaIntegration: GetCloudCost: error while parsing row: %s", err2.Error())
  164. return
  165. }
  166. ccsr.LoadCloudCost(cc)
  167. }
  168. log.Debugf("AthenaIntegration[%s]: GetCloudCost: querying: %s", ai.Key(), aqi.Query)
  169. // Query CUR data and fill out CCSR
  170. err = ai.Query(context.TODO(), aqi.Query, GetAthenaQueryFunc(rowHandler))
  171. if err != nil {
  172. return nil, err
  173. }
  174. ai.ConnectionStatus = ai.GetConnectionStatusFromResult(ccsr, ai.ConnectionStatus)
  175. return ccsr, nil
  176. }
  177. func (ai *AthenaIntegration) GetListCostColumn() string {
  178. var listCostBuilder strings.Builder
  179. listCostBuilder.WriteString("CASE line_item_line_item_type")
  180. listCostBuilder.WriteString(" WHEN 'EdpDiscount' THEN 0")
  181. listCostBuilder.WriteString(" WHEN 'PrivateRateDiscount' THEN 0")
  182. listCostBuilder.WriteString(" ELSE ")
  183. listCostBuilder.WriteString(AthenaPricingColumn)
  184. listCostBuilder.WriteString(" END")
  185. return fmt.Sprintf("SUM(%s) as list_cost", listCostBuilder.String())
  186. }
  187. func (ai *AthenaIntegration) GetNetCostColumn(allColumns map[string]bool) string {
  188. netCostColumn := ""
  189. if allColumns[AthenaNetPricingColumn] { // if Net pricing exists
  190. netCostColumn = AthenaNetPricingCoalesce
  191. } else { // Non-net for if there's no net pricing.
  192. netCostColumn = AthenaPricingColumn
  193. }
  194. return fmt.Sprintf("SUM(%s) as net_cost", netCostColumn)
  195. }
  196. func (ai *AthenaIntegration) GetAmortizedCostColumn(allColumns map[string]bool) string {
  197. amortizedCostCase := ai.GetAmortizedCostCase(allColumns)
  198. return fmt.Sprintf("SUM(%s) as amortized_cost", amortizedCostCase)
  199. }
  200. func (ai *AthenaIntegration) GetAmortizedNetCostColumn(allColumns map[string]bool) string {
  201. amortizedNetCostCase := ""
  202. if allColumns[AthenaNetPricingColumn] { // if Net pricing exists
  203. amortizedNetCostCase = ai.GetAmortizedNetCostCase(allColumns)
  204. } else { // Non-net for if there's no net pricing.
  205. amortizedNetCostCase = ai.GetAmortizedCostCase(allColumns)
  206. }
  207. return fmt.Sprintf("SUM(%s) as amortized_net_cost", amortizedNetCostCase)
  208. }
  209. func (ai *AthenaIntegration) GetAmortizedCostCase(allColumns map[string]bool) string {
  210. // Use unblended costs if Reserved Instances/Savings Plans aren't in use
  211. if !allColumns[AthenaRIPricingColumn] && !allColumns[AthenaSPPricingColumn] {
  212. return AthenaPricingColumn
  213. }
  214. var costBuilder strings.Builder
  215. costBuilder.WriteString("CASE line_item_line_item_type")
  216. if allColumns[AthenaRIPricingColumn] {
  217. costBuilder.WriteString(" WHEN 'DiscountedUsage' THEN ")
  218. costBuilder.WriteString(AthenaRIPricingColumn)
  219. }
  220. if allColumns[AthenaSPPricingColumn] {
  221. costBuilder.WriteString(" WHEN 'SavingsPlanCoveredUsage' THEN ")
  222. costBuilder.WriteString(AthenaSPPricingColumn)
  223. }
  224. costBuilder.WriteString(" ELSE ")
  225. costBuilder.WriteString(AthenaPricingColumn)
  226. costBuilder.WriteString(" END")
  227. return costBuilder.String()
  228. }
  229. func (ai *AthenaIntegration) GetAmortizedNetCostCase(allColumns map[string]bool) string {
  230. // Use net unblended costs if Reserved Instances/Savings Plans aren't in use
  231. if !allColumns[AthenaNetRIPricingColumn] && !allColumns[AthenaNetSPPricingColumn] {
  232. return AthenaNetPricingCoalesce
  233. }
  234. var costBuilder strings.Builder
  235. costBuilder.WriteString("CASE line_item_line_item_type")
  236. if allColumns[AthenaNetRIPricingColumn] {
  237. costBuilder.WriteString(" WHEN 'DiscountedUsage' THEN ")
  238. costBuilder.WriteString(AthenaNetRIPricingCoalesce)
  239. }
  240. if allColumns[AthenaNetSPPricingColumn] {
  241. costBuilder.WriteString(" WHEN 'SavingsPlanCoveredUsage' THEN ")
  242. costBuilder.WriteString(AthenaNetSPPricingCoalesce)
  243. }
  244. costBuilder.WriteString(" ELSE ")
  245. costBuilder.WriteString(AthenaNetPricingCoalesce)
  246. costBuilder.WriteString(" END")
  247. return costBuilder.String()
  248. }
  249. func (ai *AthenaIntegration) RemoveColumnAliases(columns []string) {
  250. for i, column := range columns {
  251. if strings.Contains(column, " as ") {
  252. columnValues := strings.Split(column, " as ")
  253. columns[i] = columnValues[0]
  254. }
  255. }
  256. }
  257. func (ai *AthenaIntegration) ConvertLabelToAWSTag(label string) string {
  258. // if the label already has the column prefix assume that it is in the correct format
  259. if strings.HasPrefix(label, LabelColumnPrefix) {
  260. return label
  261. }
  262. // replace characters with underscore
  263. tag := label
  264. tag = strings.ReplaceAll(tag, ".", "_")
  265. tag = strings.ReplaceAll(tag, "/", "_")
  266. tag = strings.ReplaceAll(tag, ":", "_")
  267. tag = strings.ReplaceAll(tag, "-", "_")
  268. // add prefix and return
  269. return LabelColumnPrefix + tag
  270. }
  271. // GetIsKubernetesColumn builds a column that determines if a row represents kubernetes spend
  272. func (ai *AthenaIntegration) GetIsKubernetesColumn(allColumns map[string]bool) string {
  273. disjuncts := []string{
  274. "line_item_product_code = 'AmazonEKS'", // EKS is always kubernetes
  275. }
  276. // tagColumns is a list of columns where the presence of a value indicates that a resource is part of a kubernetes cluster
  277. tagColumns := []string{
  278. "resource_tags_aws_eks_cluster_name",
  279. "resource_tags_user_eks_cluster_name",
  280. "resource_tags_user_alpha_eksctl_io_cluster_name",
  281. "resource_tags_user_kubernetes_io_service_name",
  282. "resource_tags_user_kubernetes_io_created_for_pvc_name",
  283. "resource_tags_user_kubernetes_io_created_for_pv_name",
  284. }
  285. for _, tagColumn := range tagColumns {
  286. // if tag column is present in the CUR check for it
  287. if _, ok := allColumns[tagColumn]; ok {
  288. disjunctStr := fmt.Sprintf("%s <> ''", tagColumn)
  289. disjuncts = append(disjuncts, disjunctStr)
  290. }
  291. }
  292. return fmt.Sprintf("(%s) as is_kubernetes", strings.Join(disjuncts, " OR "))
  293. }
  294. func (ai *AthenaIntegration) GetPartitionWhere(start, end time.Time) string {
  295. month := time.Date(start.Year(), start.Month(), 1, 0, 0, 0, 0, time.UTC)
  296. endMonth := time.Date(end.Year(), end.Month(), 1, 0, 0, 0, 0, time.UTC)
  297. var disjuncts []string
  298. // For CUR 2.0, check if billing_period partitions actually exist
  299. useBillingPeriodPartitions := false
  300. if ai.CURVersion != "1.0" {
  301. // Check if billing_period partitions exist in the table
  302. if hasBillingPeriod, err := ai.HasBillingPeriodPartitions(); err == nil && hasBillingPeriod {
  303. useBillingPeriodPartitions = true
  304. }
  305. }
  306. for !month.After(endMonth) {
  307. if ai.CURVersion == "1.0" {
  308. // CUR 1.0 uses year and month columns for partitioning
  309. disjuncts = append(disjuncts, fmt.Sprintf("(year = '%d' AND month = '%d')", month.Year(), month.Month()))
  310. } else if useBillingPeriodPartitions {
  311. // CUR 2.0 with billing_period partitions
  312. disjuncts = append(disjuncts, fmt.Sprintf("(billing_period = '%d-%02d')", month.Year(), month.Month()))
  313. } else {
  314. // CUR 2.0 fallback - use date_format functions (less efficient but works without partitions)
  315. disjuncts = append(disjuncts, fmt.Sprintf("(date_format(line_item_usage_start_date, '%%Y') = '%d' AND date_format(line_item_usage_start_date, '%%m') = '%02d')",
  316. month.Year(), month.Month()))
  317. }
  318. month = month.AddDate(0, 1, 0)
  319. }
  320. str := fmt.Sprintf("(%s)", strings.Join(disjuncts, " OR "))
  321. return str
  322. }
  323. func athenaRowToCloudCost(row types.Row, aqi AthenaQueryIndexes) (*opencost.CloudCost, error) {
  324. if len(row.Data) < len(aqi.ColumnIndexes) {
  325. return nil, fmt.Errorf("rowToCloudCost: row with fewer than %d columns (has only %d)", len(aqi.ColumnIndexes), len(row.Data))
  326. }
  327. // Iterate through the slice of tag columns, assigning
  328. // values to the column names, minus the tag prefix.
  329. labels := opencost.CloudCostLabels{}
  330. for _, tagColumnName := range aqi.TagColumns {
  331. // remove quotes
  332. labelName := strings.TrimPrefix(tagColumnName, `"`)
  333. labelName = strings.TrimSuffix(labelName, `"`)
  334. // remove prefix
  335. labelName = strings.TrimPrefix(labelName, LabelColumnPrefix)
  336. value := GetAthenaRowValue(row, aqi.ColumnIndexes, tagColumnName)
  337. if value != "" {
  338. labels[labelName] = value
  339. }
  340. }
  341. for _, awsColumnName := range aqi.AWSTagColumns {
  342. // partially remove prefix leaving "aws_"
  343. labelName := strings.TrimPrefix(awsColumnName, AthenaResourceTagPrefix)
  344. value := GetAthenaRowValue(row, aqi.ColumnIndexes, awsColumnName)
  345. if value != "" {
  346. labels[labelName] = value
  347. }
  348. }
  349. invoiceEntityID := GetAthenaRowValue(row, aqi.ColumnIndexes, "bill_payer_account_id")
  350. accountID := GetAthenaRowValue(row, aqi.ColumnIndexes, "line_item_usage_account_id")
  351. startStr := GetAthenaRowValue(row, aqi.ColumnIndexes, AthenaDateTruncColumn)
  352. providerID := GetAthenaRowValue(row, aqi.ColumnIndexes, "line_item_resource_id")
  353. productCode := GetAthenaRowValue(row, aqi.ColumnIndexes, "line_item_product_code")
  354. usageType := GetAthenaRowValue(row, aqi.ColumnIndexes, "line_item_usage_type")
  355. regionCode := GetAthenaRowValue(row, aqi.ColumnIndexes, "product_region_code")
  356. availabilityZone := GetAthenaRowValue(row, aqi.ColumnIndexes, "line_item_availability_zone")
  357. isK8s, _ := strconv.ParseBool(GetAthenaRowValue(row, aqi.ColumnIndexes, aqi.IsK8sColumn))
  358. k8sPct := 0.0
  359. if isK8s {
  360. k8sPct = 1.0
  361. }
  362. listCost, err := GetAthenaRowValueFloat(row, aqi.ColumnIndexes, aqi.ListCostColumn)
  363. if err != nil {
  364. return nil, err
  365. }
  366. netCost, err := GetAthenaRowValueFloat(row, aqi.ColumnIndexes, aqi.NetCostColumn)
  367. if err != nil {
  368. return nil, err
  369. }
  370. amortizedNetCost, err := GetAthenaRowValueFloat(row, aqi.ColumnIndexes, aqi.AmortizedNetCostColumn)
  371. if err != nil {
  372. return nil, err
  373. }
  374. amortizedCost, err := GetAthenaRowValueFloat(row, aqi.ColumnIndexes, aqi.AmortizedCostColumn)
  375. if err != nil {
  376. return nil, err
  377. }
  378. // Identify resource category in the CUR
  379. category := SelectAWSCategory(providerID, usageType, productCode)
  380. // Retrieve final stanza of product code for ProviderID
  381. if productCode == "AWSELB" || productCode == "AmazonFSx" {
  382. providerID = ParseARN(providerID)
  383. }
  384. if productCode == "AmazonEKS" && category == opencost.ComputeCategory {
  385. if strings.Contains(usageType, "CPU") {
  386. providerID = fmt.Sprintf("%s/CPU", providerID)
  387. } else if strings.Contains(usageType, "GB") {
  388. providerID = fmt.Sprintf("%s/RAM", providerID)
  389. }
  390. }
  391. properties := opencost.CloudCostProperties{
  392. ProviderID: providerID,
  393. Provider: opencost.AWSProvider,
  394. AccountID: accountID,
  395. AccountName: accountID,
  396. InvoiceEntityID: invoiceEntityID,
  397. InvoiceEntityName: invoiceEntityID,
  398. RegionID: regionCode,
  399. AvailabilityZone: availabilityZone,
  400. Service: productCode,
  401. Category: category,
  402. Labels: labels,
  403. }
  404. start, err := time.Parse(AthenaDateLayout, startStr)
  405. if err != nil {
  406. return nil, fmt.Errorf("unable to parse %s: '%s'", AthenaDateTruncColumn, err.Error())
  407. }
  408. end := start.AddDate(0, 0, 1)
  409. cc := &opencost.CloudCost{
  410. Properties: &properties,
  411. Window: opencost.NewWindow(&start, &end),
  412. ListCost: opencost.CostMetric{
  413. Cost: listCost,
  414. KubernetesPercent: k8sPct,
  415. },
  416. NetCost: opencost.CostMetric{
  417. Cost: netCost,
  418. KubernetesPercent: k8sPct,
  419. },
  420. AmortizedNetCost: opencost.CostMetric{
  421. Cost: amortizedNetCost,
  422. KubernetesPercent: k8sPct,
  423. },
  424. AmortizedCost: opencost.CostMetric{
  425. Cost: amortizedCost,
  426. KubernetesPercent: k8sPct,
  427. },
  428. InvoicedCost: opencost.CostMetric{
  429. Cost: netCost, // We are using Net Cost for Invoiced Cost for now as it is the closest approximation
  430. KubernetesPercent: k8sPct,
  431. },
  432. }
  433. return cc, nil
  434. }
  435. func (ai *AthenaIntegration) GetConnectionStatusFromResult(result cloud.EmptyChecker, currentStatus cloud.ConnectionStatus) cloud.ConnectionStatus {
  436. if result.IsEmpty() && currentStatus != cloud.SuccessfulConnection {
  437. return cloud.MissingData
  438. }
  439. return cloud.SuccessfulConnection
  440. }