s3selectintegration.go 9.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332
  1. package aws
  2. import (
  3. "encoding/csv"
  4. "fmt"
  5. "io"
  6. "strings"
  7. "time"
  8. "github.com/opencost/opencost/core/pkg/log"
  9. "github.com/opencost/opencost/core/pkg/opencost"
  10. )
  // S3SelectDateLayout is the time layout used to parse the usage start date
  // returned in query results.
  const S3SelectDateLayout = "2006-01-02T15:04:05Z"

  // Column expressions used when building S3 Select queries. The S3 object is
  // aliased as "s" in queries, so every column is qualified with `s.`.
  const (
  	S3SelectBillPayerAccountID = `s."bill/PayerAccountId"`
  	S3SelectAccountID          = `s."lineItem/UsageAccountId"`
  	S3SelectItemType           = `s."lineItem/LineItemType"`
  	S3SelectStartDate          = `s."lineItem/UsageStartDate"`
  	S3SelectProductCode        = `s."lineItem/ProductCode"`
  	S3SelectResourceID         = `s."lineItem/ResourceId"`
  	S3SelectUsageType          = `s."lineItem/UsageType"`
  	S3SelectRegionCode         = `s."product/regionCode"`
  	S3SelectAvailabilityZone   = `s."lineItem/AvailabilityZone"`
  	S3SelectListCost           = `s."lineItem/UnblendedCost"`
  	S3SelectNetCost            = `s."lineItem/NetUnblendedCost"`
  )

  // Effective-cost columns for reservations (RI) and savings plans (SP), in
  // plain and net variants. These may be used for Amortized<Net>Cost.
  const (
  	S3SelectRICost    = `s."reservation/EffectiveCost"`
  	S3SelectSPCost    = `s."savingsPlan/SavingsPlanEffectiveCost"`
  	S3SelectNetRICost = `s."reservation/NetEffectiveCost"`
  	S3SelectNetSPCost = `s."savingsPlan/NetSavingsPlanEffectiveCost"`
  )

  // S3SelectUserLabelPrefix is the header prefix that identifies user-defined
  // tag columns.
  const S3SelectUserLabelPrefix = "resourceTags/user:"
  30. type S3SelectIntegration struct {
  31. S3SelectQuerier
  32. }
  33. func (s3si *S3SelectIntegration) GetCloudCost(
  34. start,
  35. end time.Time,
  36. ) (*opencost.CloudCostSetRange, error) {
  37. log.Infof(
  38. "S3SelectIntegration[%s]: GetCloudCost: %s",
  39. s3si.Key(),
  40. opencost.NewWindow(&start, &end).String(),
  41. )
  42. // ccsr to populate with cloudcosts.
  43. ccsr, err := opencost.NewCloudCostSetRange(
  44. start,
  45. end,
  46. opencost.AccumulateOptionDay,
  47. s3si.Key(),
  48. )
  49. if err != nil {
  50. return nil, err
  51. }
  52. // acquire S3 client
  53. client, err := s3si.GetS3Client()
  54. if err != nil {
  55. return nil, err
  56. }
  57. // Acquire query keys
  58. queryKeys, err := s3si.GetQueryKeys(start, end, client)
  59. if err != nil {
  60. return nil, err
  61. }
  62. // Acquire headers
  63. headers, err := s3si.GetHeaders(queryKeys[0], client)
  64. if err != nil {
  65. return nil, err
  66. }
  67. allColumns := map[string]struct{}{}
  68. for _, header := range headers {
  69. allColumns[header] = struct{}{}
  70. }
  71. formattedStart := start.Format("2006-01-02")
  72. formattedEnd := end.Format("2006-01-02")
  73. selectColumns := []string{
  74. S3SelectStartDate,
  75. S3SelectBillPayerAccountID,
  76. S3SelectAccountID,
  77. S3SelectResourceID,
  78. S3SelectItemType,
  79. S3SelectProductCode,
  80. S3SelectUsageType,
  81. S3SelectRegionCode,
  82. S3SelectAvailabilityZone,
  83. S3SelectListCost,
  84. }
  85. _, checkNet := allColumns[S3SelectNetCost]
  86. if checkNet {
  87. selectColumns = append(selectColumns, S3SelectNetCost)
  88. }
  89. // Check for Reservation columns in CUR and query if available
  90. _, checkReservations := allColumns[S3SelectRICost]
  91. if checkReservations {
  92. selectColumns = append(selectColumns, S3SelectRICost)
  93. }
  94. _, checkNetReservations := allColumns[S3SelectNetRICost]
  95. if checkNetReservations {
  96. selectColumns = append(selectColumns, S3SelectNetRICost)
  97. }
  98. // Check for Savings Plan Columns in CUR and query if available
  99. _, checkSavingsPlan := allColumns[S3SelectSPCost]
  100. if checkSavingsPlan {
  101. selectColumns = append(selectColumns, S3SelectSPCost)
  102. }
  103. _, checkNetSavingsPlan := allColumns[S3SelectNetSPCost]
  104. if checkNetSavingsPlan {
  105. selectColumns = append(selectColumns, S3SelectNetSPCost)
  106. }
  107. // Determine which columns are user-defined tags and add those to the list
  108. // of columns to query.
  109. labelColumns := []string{}
  110. for column := range allColumns {
  111. if strings.HasPrefix(column, S3SelectUserLabelPrefix) {
  112. quotedTag := fmt.Sprintf(`s."%s"`, column)
  113. selectColumns = append(selectColumns, quotedTag)
  114. labelColumns = append(labelColumns, quotedTag)
  115. }
  116. }
  117. // Build map of query columns to use for parsing query
  118. columnIndexes := map[string]int{}
  119. for i, column := range selectColumns {
  120. columnIndexes[column] = i
  121. }
  122. // Build query
  123. selectStr := strings.Join(selectColumns, ", ")
  124. queryStr := `SELECT %s FROM s3object s
  125. WHERE (CAST(s."lineItem/UsageStartDate" AS TIMESTAMP) BETWEEN CAST('%s' AS TIMESTAMP) AND CAST('%s' AS TIMESTAMP))
  126. AND (s."lineItem/LineItemType" = 'Usage' OR s."lineItem/LineItemType" = 'DiscountedUsage' OR s."lineItem/LineItemType" = 'SavingsPlanCoveredUsage' OR s."lineItem/LineItemType" = 'EdpDiscount' OR s."lineItem/LineItemType" = 'PrivateRateDiscount')
  127. `
  128. query := fmt.Sprintf(queryStr, selectStr, formattedStart, formattedEnd)
  129. processResults := func(reader *csv.Reader) error {
  130. _, err2 := reader.Read()
  131. if err2 == io.EOF {
  132. return nil
  133. }
  134. for {
  135. row, err3 := reader.Read()
  136. if err3 == io.EOF {
  137. return nil
  138. }
  139. startStr := GetCSVRowValue(row, columnIndexes, S3SelectStartDate)
  140. billPayerAccountID := GetCSVRowValue(row, columnIndexes, S3SelectBillPayerAccountID)
  141. itemAccountID := GetCSVRowValue(row, columnIndexes, S3SelectAccountID)
  142. itemProviderID := GetCSVRowValue(row, columnIndexes, S3SelectResourceID)
  143. lineItemType := GetCSVRowValue(row, columnIndexes, S3SelectItemType)
  144. itemProductCode := GetCSVRowValue(row, columnIndexes, S3SelectProductCode)
  145. usageType := GetCSVRowValue(row, columnIndexes, S3SelectUsageType)
  146. regionCode := GetCSVRowValue(row, columnIndexes, S3SelectRegionCode)
  147. availabilityZone := GetCSVRowValue(row, columnIndexes, S3SelectAvailabilityZone)
  148. // Iterate through the slice of tag columns, assigning
  149. // values to the column names, minus the tag prefix.
  150. labels := opencost.CloudCostLabels{}
  151. for _, labelColumnName := range labelColumns {
  152. // remove quotes
  153. labelName := strings.TrimPrefix(labelColumnName, `s."`)
  154. labelName = strings.TrimSuffix(labelName, `"`)
  155. // remove prefix
  156. labelName = strings.TrimPrefix(labelName, S3SelectUserLabelPrefix)
  157. value := GetCSVRowValue(row, columnIndexes, labelColumnName)
  158. if value != "" {
  159. labels[labelName] = value
  160. }
  161. }
  162. isKubernetes := 0.0
  163. if itemProductCode == "AmazonEKS" || hasK8sLabel(labels) {
  164. isKubernetes = 1.0
  165. }
  166. var (
  167. amortizedCost float64
  168. amortizedNetCost float64
  169. listCost float64
  170. netCost float64
  171. )
  172. // Get list and net costs
  173. if lineItemType != "EdpDiscount" && lineItemType != "PrivateRateDiscount" {
  174. listCost, err = GetCSVRowValueFloat(row, columnIndexes, S3SelectListCost)
  175. if err != nil {
  176. return err
  177. }
  178. }
  179. // Get net cost if available
  180. netCost = listCost
  181. if checkNet {
  182. netCost, err = GetCSVRowValueFloat(row, columnIndexes, S3SelectNetCost)
  183. if err != nil {
  184. return err
  185. }
  186. }
  187. // If there is a reservation_reservation_a_r_n on the line item use the awsRIPricingSUMColumn as cost
  188. amortizedCost = listCost
  189. amortizedNetCost = listCost
  190. if lineItemType == "DiscountedUsage" {
  191. if checkReservations {
  192. amortizedCost, err = GetCSVRowValueFloat(row, columnIndexes, S3SelectRICost)
  193. if err != nil {
  194. log.Errorf(err.Error())
  195. continue
  196. }
  197. amortizedNetCost = amortizedCost
  198. }
  199. if checkNetReservations {
  200. amortizedNetCost, err = GetCSVRowValueFloat(row, columnIndexes, S3SelectNetRICost)
  201. if err != nil {
  202. log.Errorf(err.Error())
  203. continue
  204. }
  205. }
  206. // If there is a lineItemType of SavingsPlanCoveredUsage use the awsSPPricingSUMColumn
  207. } else if lineItemType == "SavingsPlanCoveredUsage" {
  208. if checkSavingsPlan {
  209. amortizedCost, err = GetCSVRowValueFloat(row, columnIndexes, S3SelectSPCost)
  210. if err != nil {
  211. log.Errorf(err.Error())
  212. continue
  213. }
  214. amortizedNetCost = amortizedCost
  215. }
  216. if checkNetSavingsPlan {
  217. amortizedNetCost, err = GetCSVRowValueFloat(row, columnIndexes, S3SelectNetSPCost)
  218. if err != nil {
  219. log.Errorf(err.Error())
  220. continue
  221. }
  222. }
  223. }
  224. category := SelectAWSCategory(itemProviderID, usageType, itemProductCode)
  225. // Retrieve final stanza of product code for ProviderID
  226. if itemProductCode == "AWSELB" || itemProductCode == "AmazonFSx" {
  227. itemProviderID = ParseARN(itemProviderID)
  228. }
  229. properties := opencost.CloudCostProperties{}
  230. properties.Provider = opencost.AWSProvider
  231. properties.InvoiceEntityID = billPayerAccountID
  232. properties.InvoiceEntityName = billPayerAccountID
  233. properties.AccountID = itemAccountID
  234. properties.AccountName = itemAccountID
  235. properties.Category = category
  236. properties.Service = itemProductCode
  237. properties.ProviderID = itemProviderID
  238. properties.RegionID = regionCode
  239. properties.AvailabilityZone = availabilityZone
  240. properties.Labels = labels
  241. itemStart, err := time.Parse(S3SelectDateLayout, startStr)
  242. if err != nil {
  243. log.Infof(
  244. "Unable to parse '%s': '%s'",
  245. S3SelectStartDate,
  246. err.Error(),
  247. )
  248. itemStart = time.Now()
  249. }
  250. itemStart = itemStart.Truncate(time.Hour * 24)
  251. itemEnd := itemStart.AddDate(0, 0, 1)
  252. cc := &opencost.CloudCost{
  253. Properties: &properties,
  254. Window: opencost.NewWindow(&itemStart, &itemEnd),
  255. ListCost: opencost.CostMetric{
  256. Cost: listCost,
  257. KubernetesPercent: isKubernetes,
  258. },
  259. NetCost: opencost.CostMetric{
  260. Cost: netCost,
  261. KubernetesPercent: isKubernetes,
  262. },
  263. AmortizedNetCost: opencost.CostMetric{
  264. Cost: amortizedCost,
  265. KubernetesPercent: isKubernetes,
  266. },
  267. AmortizedCost: opencost.CostMetric{
  268. Cost: amortizedNetCost,
  269. KubernetesPercent: isKubernetes,
  270. },
  271. InvoicedCost: opencost.CostMetric{
  272. Cost: netCost,
  273. KubernetesPercent: isKubernetes,
  274. },
  275. }
  276. ccsr.LoadCloudCost(cc)
  277. }
  278. }
  279. err = s3si.Query(query, queryKeys, client, processResults)
  280. if err != nil {
  281. return nil, err
  282. }
  283. return ccsr, nil
  284. }
  285. // hsK8sLabel checks if the labels contain a k8s label
  286. func hasK8sLabel(labels opencost.CloudCostLabels) bool {
  287. if _, ok := labels["eks:cluster-name"]; ok {
  288. return true
  289. }
  290. if _, ok := labels["alpha.eksctl.io/cluster-name"]; ok {
  291. return true
  292. }
  293. if _, ok := labels["kubernetes.io/service-name"]; ok {
  294. return true
  295. }
  296. if _, ok := labels["kubernetes.io/created-for/pvc/name"]; ok {
  297. return true
  298. }
  299. if _, ok := labels["kubernetes.io/created-for/pv/name"]; ok {
  300. return true
  301. }
  302. return false
  303. }