s3selectintegration.go 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363
  1. package aws
  2. import (
  3. "encoding/csv"
  4. "fmt"
  5. "io"
  6. "strings"
  7. "time"
  8. "github.com/opencost/opencost/core/pkg/log"
  9. "github.com/opencost/opencost/core/pkg/opencost"
  10. )
  11. const S3SelectDateLayout = "2006-01-02T15:04:05Z"
  12. // S3Object is aliased as "s" in queries
  13. const S3SelectBillPayerAccountID = `s."bill/PayerAccountId"`
  14. const S3SelectAccountID = `s."lineItem/UsageAccountId"`
  15. const S3SelectItemType = `s."lineItem/LineItemType"`
  16. const S3SelectStartDate = `s."lineItem/UsageStartDate"`
  17. const S3SelectProductCode = `s."lineItem/ProductCode"`
  18. const S3SelectResourceID = `s."lineItem/ResourceId"`
  19. const S3SelectUsageType = `s."lineItem/UsageType"`
  20. const S3SelectRegionCode = `s."product/regionCode"`
  21. const S3SelectAvailabilityZone = `s."lineItem/AvailabilityZone"`
  22. const S3SelectListCost = `s."lineItem/UnblendedCost"`
  23. const S3SelectNetCost = `s."lineItem/NetUnblendedCost"`
  24. // These two may be used for Amortized<Net>Cost
  25. const S3SelectRICost = `s."reservation/EffectiveCost"`
  26. const S3SelectSPCost = `s."savingsPlan/SavingsPlanEffectiveCost"`
  27. const S3SelectNetRICost = `s."reservation/NetEffectiveCost"`
  28. const S3SelectNetSPCost = `s."savingsPlan/NetSavingsPlanEffectiveCost"`
  29. const S3SelectUserLabelPrefix = "resourceTags/user:"
  30. const S3SelectAWSLabelPrefix = "resourceTags/aws:"
  31. const S3SelectResourceTagsPrefix = "resourceTags/"
  32. type S3SelectIntegration struct {
  33. S3SelectQuerier
  34. }
  35. func (s3si *S3SelectIntegration) GetCloudCost(
  36. start,
  37. end time.Time,
  38. ) (*opencost.CloudCostSetRange, error) {
  39. log.Infof(
  40. "S3SelectIntegration[%s]: GetCloudCost: %s",
  41. s3si.Key(),
  42. opencost.NewWindow(&start, &end).String(),
  43. )
  44. // ccsr to populate with cloudcosts.
  45. ccsr, err := opencost.NewCloudCostSetRange(
  46. start,
  47. end,
  48. opencost.AccumulateOptionDay,
  49. s3si.Key(),
  50. )
  51. if err != nil {
  52. return nil, err
  53. }
  54. // acquire S3 client
  55. client, err := s3si.GetS3Client()
  56. if err != nil {
  57. return nil, err
  58. }
  59. // Acquire query keys
  60. queryKeys, err := s3si.GetQueryKeys(start, end, client)
  61. if err != nil {
  62. return nil, err
  63. }
  64. // Acquire headers
  65. headers, err := s3si.GetHeaders(queryKeys[0], client)
  66. if err != nil {
  67. return nil, err
  68. }
  69. allColumns := map[string]struct{}{}
  70. for _, header := range headers {
  71. allColumns[header] = struct{}{}
  72. }
  73. formattedStart := start.Format("2006-01-02")
  74. formattedEnd := end.Format("2006-01-02")
  75. selectColumns := []string{
  76. S3SelectStartDate,
  77. S3SelectBillPayerAccountID,
  78. S3SelectAccountID,
  79. S3SelectResourceID,
  80. S3SelectItemType,
  81. S3SelectProductCode,
  82. S3SelectUsageType,
  83. S3SelectRegionCode,
  84. S3SelectAvailabilityZone,
  85. S3SelectListCost,
  86. }
  87. _, checkNet := allColumns[S3SelectNetCost]
  88. if checkNet {
  89. selectColumns = append(selectColumns, S3SelectNetCost)
  90. }
  91. // Check for Reservation columns in CUR and query if available
  92. _, checkReservations := allColumns[S3SelectRICost]
  93. if checkReservations {
  94. selectColumns = append(selectColumns, S3SelectRICost)
  95. }
  96. _, checkNetReservations := allColumns[S3SelectNetRICost]
  97. if checkNetReservations {
  98. selectColumns = append(selectColumns, S3SelectNetRICost)
  99. }
  100. // Check for Savings Plan Columns in CUR and query if available
  101. _, checkSavingsPlan := allColumns[S3SelectSPCost]
  102. if checkSavingsPlan {
  103. selectColumns = append(selectColumns, S3SelectSPCost)
  104. }
  105. _, checkNetSavingsPlan := allColumns[S3SelectNetSPCost]
  106. if checkNetSavingsPlan {
  107. selectColumns = append(selectColumns, S3SelectNetSPCost)
  108. }
  109. // Determine which columns are user-defined tags and add those to the list
  110. // of columns to query.
  111. labelColumns := []string{}
  112. awsLabelColumns := []string{}
  113. for column := range allColumns {
  114. if strings.HasPrefix(column, S3SelectUserLabelPrefix) {
  115. quotedTag := fmt.Sprintf(`s."%s"`, column)
  116. selectColumns = append(selectColumns, quotedTag)
  117. labelColumns = append(labelColumns, quotedTag)
  118. }
  119. if strings.HasPrefix(column, S3SelectAWSLabelPrefix) {
  120. quotedTag := fmt.Sprintf(`s."%s"`, column)
  121. selectColumns = append(selectColumns, quotedTag)
  122. awsLabelColumns = append(awsLabelColumns, quotedTag)
  123. }
  124. }
  125. // Build map of query columns to use for parsing query
  126. columnIndexes := map[string]int{}
  127. for i, column := range selectColumns {
  128. columnIndexes[column] = i
  129. }
  130. // Build query
  131. selectStr := strings.Join(selectColumns, ", ")
  132. queryStr := `SELECT %s FROM s3object s
  133. WHERE (CAST(s."lineItem/UsageStartDate" AS TIMESTAMP) BETWEEN CAST('%s' AS TIMESTAMP) AND CAST('%s' AS TIMESTAMP))
  134. AND (s."lineItem/LineItemType" = 'Usage' OR s."lineItem/LineItemType" = 'DiscountedUsage' OR s."lineItem/LineItemType" = 'SavingsPlanCoveredUsage' OR s."lineItem/LineItemType" = 'EdpDiscount' OR s."lineItem/LineItemType" = 'PrivateRateDiscount')
  135. `
  136. query := fmt.Sprintf(queryStr, selectStr, formattedStart, formattedEnd)
  137. processResults := func(reader *csv.Reader) error {
  138. _, err2 := reader.Read()
  139. if err2 == io.EOF {
  140. return nil
  141. }
  142. for {
  143. row, err3 := reader.Read()
  144. if err3 == io.EOF {
  145. return nil
  146. }
  147. startStr := GetCSVRowValue(row, columnIndexes, S3SelectStartDate)
  148. billPayerAccountID := GetCSVRowValue(row, columnIndexes, S3SelectBillPayerAccountID)
  149. itemAccountID := GetCSVRowValue(row, columnIndexes, S3SelectAccountID)
  150. itemProviderID := GetCSVRowValue(row, columnIndexes, S3SelectResourceID)
  151. lineItemType := GetCSVRowValue(row, columnIndexes, S3SelectItemType)
  152. itemProductCode := GetCSVRowValue(row, columnIndexes, S3SelectProductCode)
  153. usageType := GetCSVRowValue(row, columnIndexes, S3SelectUsageType)
  154. regionCode := GetCSVRowValue(row, columnIndexes, S3SelectRegionCode)
  155. availabilityZone := GetCSVRowValue(row, columnIndexes, S3SelectAvailabilityZone)
  156. // Iterate through the slice of tag columns, assigning
  157. // values to the column names, minus the tag prefix.
  158. labels := opencost.CloudCostLabels{}
  159. for _, labelColumnName := range labelColumns {
  160. // remove quotes
  161. labelName := strings.TrimPrefix(labelColumnName, `s."`)
  162. labelName = strings.TrimSuffix(labelName, `"`)
  163. // remove prefix
  164. labelName = strings.TrimPrefix(labelName, S3SelectUserLabelPrefix)
  165. value := GetCSVRowValue(row, columnIndexes, labelColumnName)
  166. if value != "" {
  167. labels[labelName] = value
  168. }
  169. }
  170. for _, awsLabelColumnName := range awsLabelColumns {
  171. // remove quotes
  172. labelName := strings.TrimPrefix(awsLabelColumnName, `s."`)
  173. labelName = strings.TrimSuffix(labelName, `"`)
  174. // partially remove prefix leaving "aws:"
  175. labelName = strings.TrimPrefix(labelName, S3SelectResourceTagsPrefix)
  176. value := GetCSVRowValue(row, columnIndexes, awsLabelColumnName)
  177. if value != "" {
  178. labels[labelName] = value
  179. }
  180. }
  181. isKubernetes := 0.0
  182. if itemProductCode == "AmazonEKS" || hasK8sLabel(labels) {
  183. isKubernetes = 1.0
  184. }
  185. var (
  186. amortizedCost float64
  187. amortizedNetCost float64
  188. listCost float64
  189. netCost float64
  190. )
  191. // Get list and net costs
  192. if lineItemType != "EdpDiscount" && lineItemType != "PrivateRateDiscount" {
  193. listCost, err = GetCSVRowValueFloat(row, columnIndexes, S3SelectListCost)
  194. if err != nil {
  195. return err
  196. }
  197. }
  198. // Get net cost if available
  199. netCost = listCost
  200. if checkNet {
  201. netCost, err = GetCSVRowValueFloat(row, columnIndexes, S3SelectNetCost)
  202. if err != nil {
  203. return err
  204. }
  205. }
  206. // If there is a reservation_reservation_a_r_n on the line item use the awsRIPricingSUMColumn as cost
  207. amortizedCost = listCost
  208. amortizedNetCost = listCost
  209. if lineItemType == "DiscountedUsage" {
  210. if checkReservations {
  211. amortizedCost, err = GetCSVRowValueFloat(row, columnIndexes, S3SelectRICost)
  212. if err != nil {
  213. log.Errorf(err.Error())
  214. continue
  215. }
  216. amortizedNetCost = amortizedCost
  217. }
  218. if checkNetReservations {
  219. amortizedNetCost, err = GetCSVRowValueFloat(row, columnIndexes, S3SelectNetRICost)
  220. if err != nil {
  221. log.Errorf(err.Error())
  222. continue
  223. }
  224. }
  225. // If there is a lineItemType of SavingsPlanCoveredUsage use the awsSPPricingSUMColumn
  226. } else if lineItemType == "SavingsPlanCoveredUsage" {
  227. if checkSavingsPlan {
  228. amortizedCost, err = GetCSVRowValueFloat(row, columnIndexes, S3SelectSPCost)
  229. if err != nil {
  230. log.Errorf(err.Error())
  231. continue
  232. }
  233. amortizedNetCost = amortizedCost
  234. }
  235. if checkNetSavingsPlan {
  236. amortizedNetCost, err = GetCSVRowValueFloat(row, columnIndexes, S3SelectNetSPCost)
  237. if err != nil {
  238. log.Errorf(err.Error())
  239. continue
  240. }
  241. }
  242. }
  243. category := SelectAWSCategory(itemProviderID, usageType, itemProductCode)
  244. // Retrieve final stanza of product code for ProviderID
  245. if itemProductCode == "AWSELB" || itemProductCode == "AmazonFSx" {
  246. itemProviderID = ParseARN(itemProviderID)
  247. }
  248. properties := opencost.CloudCostProperties{}
  249. properties.Provider = opencost.AWSProvider
  250. properties.InvoiceEntityID = billPayerAccountID
  251. properties.InvoiceEntityName = billPayerAccountID
  252. properties.AccountID = itemAccountID
  253. properties.AccountName = itemAccountID
  254. properties.Category = category
  255. properties.Service = itemProductCode
  256. properties.ProviderID = itemProviderID
  257. properties.RegionID = regionCode
  258. properties.AvailabilityZone = availabilityZone
  259. properties.Labels = labels
  260. itemStart, err := time.Parse(S3SelectDateLayout, startStr)
  261. if err != nil {
  262. log.Infof(
  263. "Unable to parse '%s': '%s'",
  264. S3SelectStartDate,
  265. err.Error(),
  266. )
  267. itemStart = time.Now()
  268. }
  269. itemStart = itemStart.Truncate(time.Hour * 24)
  270. itemEnd := itemStart.AddDate(0, 0, 1)
  271. cc := &opencost.CloudCost{
  272. Properties: &properties,
  273. Window: opencost.NewWindow(&itemStart, &itemEnd),
  274. ListCost: opencost.CostMetric{
  275. Cost: listCost,
  276. KubernetesPercent: isKubernetes,
  277. },
  278. NetCost: opencost.CostMetric{
  279. Cost: netCost,
  280. KubernetesPercent: isKubernetes,
  281. },
  282. AmortizedNetCost: opencost.CostMetric{
  283. Cost: amortizedCost,
  284. KubernetesPercent: isKubernetes,
  285. },
  286. AmortizedCost: opencost.CostMetric{
  287. Cost: amortizedNetCost,
  288. KubernetesPercent: isKubernetes,
  289. },
  290. InvoicedCost: opencost.CostMetric{
  291. Cost: netCost,
  292. KubernetesPercent: isKubernetes,
  293. },
  294. }
  295. ccsr.LoadCloudCost(cc)
  296. }
  297. }
  298. err = s3si.Query(query, queryKeys, client, processResults)
  299. if err != nil {
  300. return nil, err
  301. }
  302. return ccsr, nil
  303. }
  304. const (
  305. TagAWSEKSClusterName = "aws:eks:cluster-name"
  306. TagEKSClusterName = "eks:cluster-name"
  307. TagEKSCtlClusterName = "alpha.eksctl.io/cluster-name"
  308. TagKubernetesServiceName = "kubernetes.io/service-name"
  309. TagKubernetesPVCName = "kubernetes.io/created-for/pvc/name"
  310. TagKubernetesPVName = "kubernetes.io/created-for/pv/name"
  311. )
  312. // hsK8sLabel checks if the labels contain a k8s label
  313. func hasK8sLabel(labels opencost.CloudCostLabels) bool {
  314. if _, ok := labels[TagAWSEKSClusterName]; ok {
  315. return true
  316. }
  317. if _, ok := labels[TagEKSClusterName]; ok {
  318. return true
  319. }
  320. if _, ok := labels[TagEKSCtlClusterName]; ok {
  321. return true
  322. }
  323. if _, ok := labels[TagKubernetesServiceName]; ok {
  324. return true
  325. }
  326. if _, ok := labels[TagKubernetesPVCName]; ok {
  327. return true
  328. }
  329. if _, ok := labels[TagKubernetesPVName]; ok {
  330. return true
  331. }
  332. return false
  333. }