2
0

s3selectintegration.go 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375
  1. package aws
  import (
  	"encoding/csv"
  	"errors"
  	"fmt"
  	"io"
  	"strings"
  	"time"

  	"github.com/opencost/opencost/core/pkg/log"
  	"github.com/opencost/opencost/core/pkg/opencost"
  )
  // S3SelectDateLayout is the time layout used to parse the value of the
  // lineItem/UsageStartDate column.
  const S3SelectDateLayout = "2006-01-02T15:04:05Z"

  // Columns referenced in S3 Select queries. The S3 object is aliased as "s"
  // in queries, so every column is written as s."<header>".
  const (
  	S3SelectBillPayerAccountID = `s."bill/PayerAccountId"`
  	S3SelectAccountID          = `s."lineItem/UsageAccountId"`
  	S3SelectItemType           = `s."lineItem/LineItemType"`
  	S3SelectStartDate          = `s."lineItem/UsageStartDate"`
  	S3SelectProductCode        = `s."lineItem/ProductCode"`
  	S3SelectResourceID         = `s."lineItem/ResourceId"`
  	S3SelectUsageType          = `s."lineItem/UsageType"`
  	S3SelectRegionCode         = `s."product/regionCode"`
  	S3SelectAvailabilityZone   = `s."lineItem/AvailabilityZone"`
  	S3SelectListCost           = `s."lineItem/UnblendedCost"`
  	S3SelectNetCost            = `s."lineItem/NetUnblendedCost"`
  )

  // Reservation and Savings Plan effective-cost columns. These may be used for
  // Amortized<Net>Cost.
  const (
  	S3SelectRICost    = `s."reservation/EffectiveCost"`
  	S3SelectSPCost    = `s."savingsPlan/SavingsPlanEffectiveCost"`
  	S3SelectNetRICost = `s."reservation/NetEffectiveCost"`
  	S3SelectNetSPCost = `s."savingsPlan/NetSavingsPlanEffectiveCost"`
  )

  // Header prefixes (unquoted, without the "s." alias) that identify resource
  // tag columns.
  const (
  	S3SelectUserLabelPrefix    = "resourceTags/user:"
  	S3SelectAWSLabelPrefix     = "resourceTags/aws:"
  	S3SelectResourceTagsPrefix = "resourceTags/"
  )

  // Values of the lineItem/LineItemType column that receive special handling.
  const (
  	TypeSavingsPlanCoveredUsage = "SavingsPlanCoveredUsage"
  	TypeDiscountedUsage         = "DiscountedUsage"
  	TypeEDPDiscount             = "EdpDiscount"
  	TypePrivateRateDiscount     = "PrivateRateDiscount"
  )
  38. type S3SelectIntegration struct {
  39. S3SelectQuerier
  40. }
  41. func (s3si *S3SelectIntegration) GetCloudCost(
  42. start,
  43. end time.Time,
  44. ) (*opencost.CloudCostSetRange, error) {
  45. log.Infof(
  46. "S3SelectIntegration[%s]: GetCloudCost: %s",
  47. s3si.Key(),
  48. opencost.NewWindow(&start, &end).String(),
  49. )
  50. // ccsr to populate with cloudcosts.
  51. ccsr, err := opencost.NewCloudCostSetRange(
  52. start,
  53. end,
  54. opencost.AccumulateOptionDay,
  55. s3si.Key(),
  56. )
  57. if err != nil {
  58. return nil, err
  59. }
  60. // acquire S3 client
  61. client, err := s3si.GetS3Client()
  62. if err != nil {
  63. return nil, err
  64. }
  65. // Acquire query keys
  66. queryKeys, err := s3si.GetQueryKeys(start, end, client)
  67. if err != nil {
  68. return nil, err
  69. }
  70. // Acquire headers
  71. headers, err := s3si.GetHeaders(queryKeys[0], client)
  72. if err != nil {
  73. return nil, err
  74. }
  75. allColumns := map[string]struct{}{}
  76. for _, header := range headers {
  77. allColumns[header] = struct{}{}
  78. }
  79. formattedStart := start.Format("2006-01-02")
  80. formattedEnd := end.Format("2006-01-02")
  81. selectColumns := []string{
  82. S3SelectStartDate,
  83. S3SelectBillPayerAccountID,
  84. S3SelectAccountID,
  85. S3SelectResourceID,
  86. S3SelectItemType,
  87. S3SelectProductCode,
  88. S3SelectUsageType,
  89. S3SelectRegionCode,
  90. S3SelectAvailabilityZone,
  91. S3SelectListCost,
  92. }
  93. if _, ok := allColumns[S3SelectNetCost]; ok {
  94. selectColumns = append(selectColumns, S3SelectNetCost)
  95. }
  96. // Check for Reservation columns in CUR and query if available
  97. if _, ok := allColumns[S3SelectRICost]; ok {
  98. selectColumns = append(selectColumns, S3SelectRICost)
  99. }
  100. if _, ok := allColumns[S3SelectNetRICost]; ok {
  101. selectColumns = append(selectColumns, S3SelectNetRICost)
  102. }
  103. // Check for Savings Plan Columns in CUR and query if available
  104. if _, ok := allColumns[S3SelectSPCost]; ok {
  105. selectColumns = append(selectColumns, S3SelectSPCost)
  106. }
  107. if _, ok := allColumns[S3SelectNetSPCost]; ok {
  108. selectColumns = append(selectColumns, S3SelectNetSPCost)
  109. }
  110. // Determine which columns are user-defined tags and add those to the list
  111. // of columns to query.
  112. userLabelColumns := []string{}
  113. awsLabelColumns := []string{}
  114. for column := range allColumns {
  115. if strings.HasPrefix(column, S3SelectUserLabelPrefix) {
  116. quotedTag := fmt.Sprintf(`s."%s"`, column)
  117. selectColumns = append(selectColumns, quotedTag)
  118. userLabelColumns = append(userLabelColumns, quotedTag)
  119. }
  120. if strings.HasPrefix(column, S3SelectAWSLabelPrefix) {
  121. quotedTag := fmt.Sprintf(`s."%s"`, column)
  122. selectColumns = append(selectColumns, quotedTag)
  123. awsLabelColumns = append(awsLabelColumns, quotedTag)
  124. }
  125. }
  126. // Build map of query columns to use for parsing query
  127. columnIndexes := map[string]int{}
  128. for i, column := range selectColumns {
  129. columnIndexes[column] = i
  130. }
  131. // Build query
  132. selectStr := strings.Join(selectColumns, ", ")
  133. queryStr := `SELECT %s FROM s3object s
  134. WHERE (CAST(s."lineItem/UsageStartDate" AS TIMESTAMP) BETWEEN CAST('%s' AS TIMESTAMP) AND CAST('%s' AS TIMESTAMP))
  135. AND (s."lineItem/LineItemType" = 'Usage' OR s."lineItem/LineItemType" = 'DiscountedUsage' OR s."lineItem/LineItemType" = 'SavingsPlanCoveredUsage' OR s."lineItem/LineItemType" = 'EdpDiscount' OR s."lineItem/LineItemType" = 'PrivateRateDiscount')
  136. `
  137. query := fmt.Sprintf(queryStr, selectStr, formattedStart, formattedEnd)
  138. processResults := func(reader *csv.Reader) error {
  139. _, err2 := reader.Read()
  140. if err2 == io.EOF {
  141. return nil
  142. }
  143. for {
  144. row, err3 := reader.Read()
  145. if err3 == io.EOF {
  146. return nil
  147. }
  148. cc, err3 := s3RowToCloudCost(row, columnIndexes, userLabelColumns, awsLabelColumns)
  149. if err3 != nil {
  150. log.Errorf("error creating cloud cost from row: %s", err3.Error())
  151. continue
  152. }
  153. ccsr.LoadCloudCost(cc)
  154. }
  155. }
  156. err = s3si.Query(query, queryKeys, client, processResults)
  157. if err != nil {
  158. return nil, err
  159. }
  160. return ccsr, nil
  161. }
  162. func s3RowToCloudCost(row []string, columnIndexes map[string]int, userLabelColumns, awsLabelColumns []string) (*opencost.CloudCost, error) {
  163. startStr := GetCSVRowValue(row, columnIndexes, S3SelectStartDate)
  164. billPayerAccountID := GetCSVRowValue(row, columnIndexes, S3SelectBillPayerAccountID)
  165. itemAccountID := GetCSVRowValue(row, columnIndexes, S3SelectAccountID)
  166. itemProviderID := GetCSVRowValue(row, columnIndexes, S3SelectResourceID)
  167. lineItemType := GetCSVRowValue(row, columnIndexes, S3SelectItemType)
  168. itemProductCode := GetCSVRowValue(row, columnIndexes, S3SelectProductCode)
  169. usageType := GetCSVRowValue(row, columnIndexes, S3SelectUsageType)
  170. regionCode := GetCSVRowValue(row, columnIndexes, S3SelectRegionCode)
  171. availabilityZone := GetCSVRowValue(row, columnIndexes, S3SelectAvailabilityZone)
  172. // Iterate through the slice of tag columns, assigning
  173. // values to the column names, minus the tag prefix.
  174. labels := opencost.CloudCostLabels{}
  175. for _, labelColumnName := range userLabelColumns {
  176. // remove quotes
  177. labelName := strings.TrimPrefix(labelColumnName, `s."`)
  178. labelName = strings.TrimSuffix(labelName, `"`)
  179. // remove prefix
  180. labelName = strings.TrimPrefix(labelName, S3SelectUserLabelPrefix)
  181. value := GetCSVRowValue(row, columnIndexes, labelColumnName)
  182. if value != "" {
  183. labels[labelName] = value
  184. }
  185. }
  186. for _, awsLabelColumnName := range awsLabelColumns {
  187. // remove quotes
  188. labelName := strings.TrimPrefix(awsLabelColumnName, `s."`)
  189. labelName = strings.TrimSuffix(labelName, `"`)
  190. // partially remove prefix leaving "aws:"
  191. labelName = strings.TrimPrefix(labelName, S3SelectResourceTagsPrefix)
  192. value := GetCSVRowValue(row, columnIndexes, awsLabelColumnName)
  193. if value != "" {
  194. labels[labelName] = value
  195. }
  196. }
  197. isKubernetes := 0.0
  198. if itemProductCode == "AmazonEKS" || hasK8sLabel(labels) {
  199. isKubernetes = 1.0
  200. }
  201. var (
  202. amortizedCost float64
  203. amortizedNetCost float64
  204. listCost float64
  205. netCost float64
  206. err error
  207. )
  208. // Get list and net costs
  209. if lineItemType != TypeEDPDiscount && lineItemType != TypePrivateRateDiscount {
  210. listCost, err = GetCSVRowValueFloat(row, columnIndexes, S3SelectListCost)
  211. if err != nil {
  212. return nil, err
  213. }
  214. }
  215. // Get net cost if available
  216. netCost = listCost
  217. if _, ok := columnIndexes[S3SelectNetCost]; ok {
  218. netCost, err = GetCSVRowValueFloat(row, columnIndexes, S3SelectNetCost)
  219. if err != nil {
  220. return nil, err
  221. }
  222. }
  223. // If there is a reservation_reservation_a_r_n on the line item use the awsRIPricingSUMColumn as cost
  224. amortizedCost = listCost
  225. amortizedNetCost = listCost
  226. if lineItemType == TypeDiscountedUsage {
  227. if _, ok := columnIndexes[S3SelectRICost]; ok {
  228. amortizedCost, err = GetCSVRowValueFloat(row, columnIndexes, S3SelectRICost)
  229. if err != nil {
  230. return nil, err
  231. }
  232. amortizedNetCost = amortizedCost
  233. }
  234. if _, ok := columnIndexes[S3SelectNetRICost]; ok {
  235. amortizedNetCost, err = GetCSVRowValueFloat(row, columnIndexes, S3SelectNetRICost)
  236. if err != nil {
  237. return nil, err
  238. }
  239. }
  240. // If there is a lineItemType of SavingsPlanCoveredUsage use the awsSPPricingSUMColumn
  241. } else if lineItemType == TypeSavingsPlanCoveredUsage {
  242. if _, ok := columnIndexes[S3SelectSPCost]; ok {
  243. amortizedCost, err = GetCSVRowValueFloat(row, columnIndexes, S3SelectSPCost)
  244. if err != nil {
  245. return nil, err
  246. }
  247. amortizedNetCost = amortizedCost
  248. }
  249. if _, ok := columnIndexes[S3SelectNetSPCost]; ok {
  250. amortizedNetCost, err = GetCSVRowValueFloat(row, columnIndexes, S3SelectNetSPCost)
  251. if err != nil {
  252. return nil, err
  253. }
  254. }
  255. }
  256. category := SelectAWSCategory(itemProviderID, usageType, itemProductCode)
  257. // Retrieve final stanza of product code for ProviderID
  258. if itemProductCode == "AWSELB" || itemProductCode == "AmazonFSx" {
  259. itemProviderID = ParseARN(itemProviderID)
  260. }
  261. properties := opencost.CloudCostProperties{}
  262. properties.Provider = opencost.AWSProvider
  263. properties.InvoiceEntityID = billPayerAccountID
  264. properties.InvoiceEntityName = billPayerAccountID
  265. properties.AccountID = itemAccountID
  266. properties.AccountName = itemAccountID
  267. properties.Category = category
  268. properties.Service = itemProductCode
  269. properties.ProviderID = itemProviderID
  270. properties.RegionID = regionCode
  271. properties.AvailabilityZone = availabilityZone
  272. properties.Labels = labels
  273. itemStart, err := time.Parse(S3SelectDateLayout, startStr)
  274. if err != nil {
  275. return nil, fmt.Errorf(
  276. "Unable to parse '%s': '%s'",
  277. S3SelectStartDate,
  278. err.Error(),
  279. )
  280. }
  281. itemStart = itemStart.Truncate(time.Hour * 24)
  282. itemEnd := itemStart.AddDate(0, 0, 1)
  283. return &opencost.CloudCost{
  284. Properties: &properties,
  285. Window: opencost.NewWindow(&itemStart, &itemEnd),
  286. ListCost: opencost.CostMetric{
  287. Cost: listCost,
  288. KubernetesPercent: isKubernetes,
  289. },
  290. NetCost: opencost.CostMetric{
  291. Cost: netCost,
  292. KubernetesPercent: isKubernetes,
  293. },
  294. AmortizedNetCost: opencost.CostMetric{
  295. Cost: amortizedNetCost,
  296. KubernetesPercent: isKubernetes,
  297. },
  298. AmortizedCost: opencost.CostMetric{
  299. Cost: amortizedCost,
  300. KubernetesPercent: isKubernetes,
  301. },
  302. InvoicedCost: opencost.CostMetric{
  303. Cost: netCost,
  304. KubernetesPercent: isKubernetes,
  305. },
  306. }, nil
  307. }
  308. const (
  309. TagAWSEKSClusterName = "aws:eks:cluster-name"
  310. TagEKSClusterName = "eks:cluster-name"
  311. TagEKSCtlClusterName = "alpha.eksctl.io/cluster-name"
  312. TagKubernetesServiceName = "kubernetes.io/service-name"
  313. TagKubernetesPVCName = "kubernetes.io/created-for/pvc/name"
  314. TagKubernetesPVName = "kubernetes.io/created-for/pv/name"
  315. )
  316. // hsK8sLabel checks if the labels contain a k8s label
  317. func hasK8sLabel(labels opencost.CloudCostLabels) bool {
  318. if _, ok := labels[TagAWSEKSClusterName]; ok {
  319. return true
  320. }
  321. if _, ok := labels[TagEKSClusterName]; ok {
  322. return true
  323. }
  324. if _, ok := labels[TagEKSCtlClusterName]; ok {
  325. return true
  326. }
  327. if _, ok := labels[TagKubernetesServiceName]; ok {
  328. return true
  329. }
  330. if _, ok := labels[TagKubernetesPVCName]; ok {
  331. return true
  332. }
  333. if _, ok := labels[TagKubernetesPVName]; ok {
  334. return true
  335. }
  336. return false
  337. }