athenaquerier.go 6.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208
  1. package aws
  2. import (
  3. "context"
  4. "fmt"
  5. "regexp"
  6. "strconv"
  7. "strings"
  8. "time"
  9. cloudconfig "github.com/opencost/opencost/pkg/cloud/config"
  10. "github.com/aws/aws-sdk-go-v2/aws"
  11. "github.com/aws/aws-sdk-go-v2/service/athena"
  12. "github.com/aws/aws-sdk-go-v2/service/athena/types"
  13. "github.com/opencost/opencost/pkg/kubecost"
  14. "github.com/opencost/opencost/pkg/log"
  15. "github.com/opencost/opencost/pkg/util/stringutil"
  16. )
  17. type AthenaQuerier struct {
  18. AthenaConfiguration
  19. }
  20. func (aq *AthenaQuerier) Equals(config cloudconfig.Config) bool {
  21. thatConfig, ok := config.(*AthenaQuerier)
  22. if !ok {
  23. return false
  24. }
  25. return aq.AthenaConfiguration.Equals(&thatConfig.AthenaConfiguration)
  26. }
  27. // QueryAthenaPaginated executes athena query and processes results. An error from this method indicates a
  28. // FAILED_CONNECTION CloudConnectionStatus and should immediately stop the caller to maintain the correct CloudConnectionStatus
  29. func (aq *AthenaQuerier) QueryAthenaPaginated(ctx context.Context, query string, fn func(*athena.GetQueryResultsOutput) bool) error {
  30. queryExecutionCtx := &types.QueryExecutionContext{
  31. Database: aws.String(aq.Database),
  32. }
  33. resultConfiguration := &types.ResultConfiguration{
  34. OutputLocation: aws.String(aq.Bucket),
  35. }
  36. startQueryExecutionInput := &athena.StartQueryExecutionInput{
  37. QueryString: aws.String(query),
  38. QueryExecutionContext: queryExecutionCtx,
  39. ResultConfiguration: resultConfiguration,
  40. }
  41. // Only set if there is a value, the default input is nil
  42. if aq.Workgroup != "" {
  43. startQueryExecutionInput.WorkGroup = aws.String(aq.Workgroup)
  44. }
  45. // Create Athena Client
  46. cli, err := aq.AthenaConfiguration.GetAthenaClient()
  47. // Query Athena
  48. startQueryExecutionOutput, err := cli.StartQueryExecution(ctx, startQueryExecutionInput)
  49. if err != nil {
  50. return fmt.Errorf("QueryAthenaPaginated: start query error: %s", err.Error())
  51. }
  52. err = waitForQueryToComplete(ctx, cli, startQueryExecutionOutput.QueryExecutionId)
  53. if err != nil {
  54. return fmt.Errorf("QueryAthenaPaginated: query execution error: %s", err.Error())
  55. }
  56. queryResultsInput := &athena.GetQueryResultsInput{
  57. QueryExecutionId: startQueryExecutionOutput.QueryExecutionId,
  58. }
  59. getQueryResultsPaginator := athena.NewGetQueryResultsPaginator(cli, queryResultsInput)
  60. for getQueryResultsPaginator.HasMorePages() {
  61. pg, err := getQueryResultsPaginator.NextPage(ctx)
  62. if err != nil {
  63. log.Errorf("queryAthenaPaginated: NextPage error: %s", err.Error())
  64. continue
  65. }
  66. fn(pg)
  67. }
  68. return nil
  69. }
  70. func waitForQueryToComplete(ctx context.Context, client *athena.Client, queryExecutionID *string) error {
  71. inp := &athena.GetQueryExecutionInput{
  72. QueryExecutionId: queryExecutionID,
  73. }
  74. isQueryStillRunning := true
  75. for isQueryStillRunning {
  76. qe, err := client.GetQueryExecution(ctx, inp)
  77. if err != nil {
  78. return err
  79. }
  80. if qe.QueryExecution.Status.State == "SUCCEEDED" {
  81. isQueryStillRunning = false
  82. continue
  83. }
  84. if qe.QueryExecution.Status.State != "RUNNING" && qe.QueryExecution.Status.State != "QUEUED" {
  85. return fmt.Errorf("no query results available for query %s", *queryExecutionID)
  86. }
  87. time.Sleep(2 * time.Second)
  88. }
  89. return nil
  90. }
  91. // GetAthenaRowValue retrieve value from athena row based on column names and used stringutil.Bank() to prevent duplicate
  92. // allocation of strings
  93. func GetAthenaRowValue(row types.Row, queryColumnIndexes map[string]int, columnName string) string {
  94. columnIndex, ok := queryColumnIndexes[columnName]
  95. if !ok {
  96. return ""
  97. }
  98. valuePointer := row.Data[columnIndex].VarCharValue
  99. if valuePointer == nil {
  100. return ""
  101. }
  102. return stringutil.Bank(*valuePointer)
  103. }
  104. // getAthenaRowValueFloat retrieve value from athena row based on column names and convert to float if possible
  105. func GetAthenaRowValueFloat(row types.Row, queryColumnIndexes map[string]int, columnName string) (float64, error) {
  106. columnIndex, ok := queryColumnIndexes[columnName]
  107. if !ok {
  108. return 0.0, fmt.Errorf("getAthenaRowValueFloat: missing column index: %s", columnName)
  109. }
  110. valuePointer := row.Data[columnIndex].VarCharValue
  111. if valuePointer == nil {
  112. return 0.0, fmt.Errorf("getAthenaRowValueFloat: nil field")
  113. }
  114. cost, err := strconv.ParseFloat(*valuePointer, 64)
  115. if err != nil {
  116. return cost, fmt.Errorf("getAthenaRowValueFloat: failed to parse %s: '%s': %s", columnName, *valuePointer, err.Error())
  117. }
  118. return cost, nil
  119. }
  120. func SelectAWSCategory(isNode, isVol, isNetwork bool, providerID, service string) string {
  121. // Network has the highest priority and is based on the usage type ending in "Bytes"
  122. if isNetwork {
  123. return kubecost.NetworkCategory
  124. }
  125. // The node and volume conditions are mutually exclusive.
  126. // Provider ID has prefix "i-"
  127. if isNode {
  128. return kubecost.ComputeCategory
  129. }
  130. // Provider ID has prefix "vol-"
  131. if isVol {
  132. return kubecost.StorageCategory
  133. }
  134. // Default categories based on service
  135. switch strings.ToUpper(service) {
  136. case "AWSELB", "AWSGLUE", "AMAZONROUTE53":
  137. return kubecost.NetworkCategory
  138. case "AMAZONEC2", "AWSLAMBDA", "AMAZONELASTICACHE":
  139. return kubecost.ComputeCategory
  140. case "AMAZONEKS":
  141. // Check if line item is a fargate pod
  142. if strings.Contains(providerID, ":pod/") {
  143. return kubecost.ComputeCategory
  144. }
  145. return kubecost.ManagementCategory
  146. case "AMAZONS3", "AMAZONATHENA", "AMAZONRDS", "AMAZONDYNAMODB", "AWSSECRETSMANAGER", "AMAZONFSX":
  147. return kubecost.StorageCategory
  148. default:
  149. return kubecost.OtherCategory
  150. }
  151. }
  152. var parseARNRx = regexp.MustCompile("^.+\\/(.+)?") // Capture "a406f7761142e4ef58a8f2ba478d2db2" from "arn:aws:elasticloadbalancing:us-east-1:297945954695:loadbalancer/a406f7761142e4ef58a8f2ba478d2db2"
  153. func ParseARN(id string) string {
  154. match := parseARNRx.FindStringSubmatch(id)
  155. if len(match) == 0 {
  156. if id != "" {
  157. log.DedupedInfof(10, "aws.parseARN: failed to parse %s", id)
  158. }
  159. return id
  160. }
  161. return match[len(match)-1]
  162. }
  163. func GetAthenaQueryFunc(fn func(types.Row)) func(*athena.GetQueryResultsOutput) bool {
  164. pageNum := 0
  165. processItemQueryResults := func(page *athena.GetQueryResultsOutput) bool {
  166. if page == nil {
  167. log.Errorf("AthenaQuerier: Athena page is nil")
  168. return false
  169. } else if page.ResultSet == nil {
  170. log.Errorf("AthenaQuerier: Athena page.ResultSet is nil")
  171. return false
  172. }
  173. rows := page.ResultSet.Rows
  174. if pageNum == 0 {
  175. rows = page.ResultSet.Rows[1:len(page.ResultSet.Rows)]
  176. }
  177. for _, row := range rows {
  178. fn(row)
  179. }
  180. pageNum++
  181. return true
  182. }
  183. return processItemQueryResults
  184. }