2
0

s3selectintegration.go 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393
  1. package aws
  2. import (
  3. "encoding/csv"
  4. "fmt"
  5. "io"
  6. "strings"
  7. "time"
  8. "github.com/opencost/opencost/core/pkg/log"
  9. "github.com/opencost/opencost/core/pkg/opencost"
  10. "github.com/opencost/opencost/pkg/cloud"
  11. )
  12. const S3SelectDateLayout = "2006-01-02T15:04:05Z"
  13. // S3Object is aliased as "s" in queries
  14. const S3SelectBillPayerAccountID = `s."bill/PayerAccountId"`
  15. const S3SelectAccountID = `s."lineItem/UsageAccountId"`
  16. const S3SelectItemType = `s."lineItem/LineItemType"`
  17. const S3SelectStartDate = `s."lineItem/UsageStartDate"`
  18. const S3SelectProductCode = `s."lineItem/ProductCode"`
  19. const S3SelectResourceID = `s."lineItem/ResourceId"`
  20. const S3SelectUsageType = `s."lineItem/UsageType"`
  21. const S3SelectRegionCode = `s."product/regionCode"`
  22. const S3SelectAvailabilityZone = `s."lineItem/AvailabilityZone"`
  23. const S3SelectListCost = `s."lineItem/UnblendedCost"`
  24. const S3SelectNetCost = `s."lineItem/NetUnblendedCost"`
  25. // These two may be used for Amortized<Net>Cost
  26. const S3SelectRICost = `s."reservation/EffectiveCost"`
  27. const S3SelectSPCost = `s."savingsPlan/SavingsPlanEffectiveCost"`
  28. const S3SelectNetRICost = `s."reservation/NetEffectiveCost"`
  29. const S3SelectNetSPCost = `s."savingsPlan/NetSavingsPlanEffectiveCost"`
  30. const S3SelectUserLabelPrefix = "resourceTags/user:"
  31. const S3SelectAWSLabelPrefix = "resourceTags/aws:"
  32. const S3SelectResourceTagsPrefix = "resourceTags/"
  33. const (
  34. TypeSavingsPlanCoveredUsage = "SavingsPlanCoveredUsage"
  35. TypeDiscountedUsage = "DiscountedUsage"
  36. TypeEDPDiscount = "EdpDiscount"
  37. TypePrivateRateDiscount = "PrivateRateDiscount"
  38. )
  39. type S3SelectIntegration struct {
  40. S3SelectQuerier
  41. }
  42. func (s3si *S3SelectIntegration) GetCloudCost(
  43. start,
  44. end time.Time,
  45. ) (*opencost.CloudCostSetRange, error) {
  46. log.Infof(
  47. "S3SelectIntegration[%s]: GetCloudCost: %s",
  48. s3si.Key(),
  49. opencost.NewWindow(&start, &end).String(),
  50. )
  51. // ccsr to populate with cloudcosts.
  52. ccsr, err := opencost.NewCloudCostSetRange(
  53. start,
  54. end,
  55. opencost.AccumulateOptionDay,
  56. s3si.Key(),
  57. )
  58. if err != nil {
  59. return nil, err
  60. }
  61. // acquire S3 client
  62. client, err := s3si.GetS3Client()
  63. if err != nil {
  64. return nil, err
  65. }
  66. // Acquire query keys
  67. queryKeys, err := s3si.GetQueryKeys(start, end, client)
  68. if err != nil {
  69. return nil, err
  70. }
  71. // Acquire headers
  72. headers, err := s3si.GetHeaders(queryKeys[0], client)
  73. if err != nil {
  74. return nil, err
  75. }
  76. allColumns := map[string]struct{}{}
  77. for _, header := range headers {
  78. allColumns[header] = struct{}{}
  79. }
  80. formattedStart := start.Format("2006-01-02")
  81. formattedEnd := end.Format("2006-01-02")
  82. selectColumns := []string{
  83. S3SelectStartDate,
  84. S3SelectBillPayerAccountID,
  85. S3SelectAccountID,
  86. S3SelectResourceID,
  87. S3SelectItemType,
  88. S3SelectProductCode,
  89. S3SelectUsageType,
  90. S3SelectRegionCode,
  91. S3SelectAvailabilityZone,
  92. S3SelectListCost,
  93. }
  94. if _, ok := allColumns[S3SelectNetCost]; ok {
  95. selectColumns = append(selectColumns, S3SelectNetCost)
  96. }
  97. // Check for Reservation columns in CUR and query if available
  98. if _, ok := allColumns[S3SelectRICost]; ok {
  99. selectColumns = append(selectColumns, S3SelectRICost)
  100. }
  101. if _, ok := allColumns[S3SelectNetRICost]; ok {
  102. selectColumns = append(selectColumns, S3SelectNetRICost)
  103. }
  104. // Check for Savings Plan Columns in CUR and query if available
  105. if _, ok := allColumns[S3SelectSPCost]; ok {
  106. selectColumns = append(selectColumns, S3SelectSPCost)
  107. }
  108. if _, ok := allColumns[S3SelectNetSPCost]; ok {
  109. selectColumns = append(selectColumns, S3SelectNetSPCost)
  110. }
  111. // Determine which columns are user-defined tags and add those to the list
  112. // of columns to query.
  113. userLabelColumns := []string{}
  114. awsLabelColumns := []string{}
  115. for column := range allColumns {
  116. if strings.HasPrefix(column, S3SelectUserLabelPrefix) {
  117. quotedTag := fmt.Sprintf(`s."%s"`, column)
  118. selectColumns = append(selectColumns, quotedTag)
  119. userLabelColumns = append(userLabelColumns, quotedTag)
  120. }
  121. if strings.HasPrefix(column, S3SelectAWSLabelPrefix) {
  122. quotedTag := fmt.Sprintf(`s."%s"`, column)
  123. selectColumns = append(selectColumns, quotedTag)
  124. awsLabelColumns = append(awsLabelColumns, quotedTag)
  125. }
  126. }
  127. // Build map of query columns to use for parsing query
  128. columnIndexes := map[string]int{}
  129. for i, column := range selectColumns {
  130. columnIndexes[column] = i
  131. }
  132. // Build query
  133. selectStr := strings.Join(selectColumns, ", ")
  134. queryStr := `SELECT %s FROM s3object s
  135. WHERE (CAST(s."lineItem/UsageStartDate" AS TIMESTAMP) BETWEEN CAST('%s' AS TIMESTAMP) AND CAST('%s' AS TIMESTAMP))
  136. AND (s."lineItem/LineItemType" = 'Usage' OR s."lineItem/LineItemType" = 'DiscountedUsage' OR s."lineItem/LineItemType" = 'SavingsPlanCoveredUsage' OR s."lineItem/LineItemType" = 'EdpDiscount' OR s."lineItem/LineItemType" = 'PrivateRateDiscount')
  137. `
  138. query := fmt.Sprintf(queryStr, selectStr, formattedStart, formattedEnd)
  139. processResults := func(reader *csv.Reader) error {
  140. _, err2 := reader.Read()
  141. if err2 == io.EOF {
  142. return nil
  143. }
  144. for {
  145. row, err3 := reader.Read()
  146. if err3 == io.EOF {
  147. return nil
  148. }
  149. cc, err3 := s3RowToCloudCost(row, columnIndexes, userLabelColumns, awsLabelColumns)
  150. if err3 != nil {
  151. log.Errorf("error creating cloud cost from row: %s", err3.Error())
  152. continue
  153. }
  154. ccsr.LoadCloudCost(cc)
  155. }
  156. }
  157. err = s3si.Query(query, queryKeys, client, processResults)
  158. if err != nil {
  159. return nil, err
  160. }
  161. return ccsr, nil
  162. }
  163. func s3RowToCloudCost(row []string, columnIndexes map[string]int, userLabelColumns, awsLabelColumns []string) (*opencost.CloudCost, error) {
  164. startStr := GetCSVRowValue(row, columnIndexes, S3SelectStartDate)
  165. billPayerAccountID := GetCSVRowValue(row, columnIndexes, S3SelectBillPayerAccountID)
  166. itemAccountID := GetCSVRowValue(row, columnIndexes, S3SelectAccountID)
  167. itemProviderID := GetCSVRowValue(row, columnIndexes, S3SelectResourceID)
  168. lineItemType := GetCSVRowValue(row, columnIndexes, S3SelectItemType)
  169. itemProductCode := GetCSVRowValue(row, columnIndexes, S3SelectProductCode)
  170. usageType := GetCSVRowValue(row, columnIndexes, S3SelectUsageType)
  171. regionCode := GetCSVRowValue(row, columnIndexes, S3SelectRegionCode)
  172. availabilityZone := GetCSVRowValue(row, columnIndexes, S3SelectAvailabilityZone)
  173. // Iterate through the slice of tag columns, assigning
  174. // values to the column names, minus the tag prefix.
  175. labels := opencost.CloudCostLabels{}
  176. for _, labelColumnName := range userLabelColumns {
  177. // remove quotes
  178. labelName := strings.TrimPrefix(labelColumnName, `s."`)
  179. labelName = strings.TrimSuffix(labelName, `"`)
  180. // remove prefix
  181. labelName = strings.TrimPrefix(labelName, S3SelectUserLabelPrefix)
  182. value := GetCSVRowValue(row, columnIndexes, labelColumnName)
  183. if value != "" {
  184. labels[labelName] = value
  185. }
  186. }
  187. for _, awsLabelColumnName := range awsLabelColumns {
  188. // remove quotes
  189. labelName := strings.TrimPrefix(awsLabelColumnName, `s."`)
  190. labelName = strings.TrimSuffix(labelName, `"`)
  191. // partially remove prefix leaving "aws:"
  192. labelName = strings.TrimPrefix(labelName, S3SelectResourceTagsPrefix)
  193. value := GetCSVRowValue(row, columnIndexes, awsLabelColumnName)
  194. if value != "" {
  195. labels[labelName] = value
  196. }
  197. }
  198. isKubernetes := 0.0
  199. if itemProductCode == "AmazonEKS" || hasK8sLabel(labels) {
  200. isKubernetes = 1.0
  201. }
  202. var (
  203. amortizedCost float64
  204. amortizedNetCost float64
  205. listCost float64
  206. netCost float64
  207. err error
  208. )
  209. // Get list and net costs
  210. if lineItemType != TypeEDPDiscount && lineItemType != TypePrivateRateDiscount {
  211. listCost, err = GetCSVRowValueFloat(row, columnIndexes, S3SelectListCost)
  212. if err != nil {
  213. return nil, err
  214. }
  215. }
  216. // Get net cost if available
  217. netCost = listCost
  218. if _, ok := columnIndexes[S3SelectNetCost]; ok {
  219. netCost, err = GetCSVRowValueFloat(row, columnIndexes, S3SelectNetCost)
  220. if err != nil {
  221. return nil, err
  222. }
  223. }
  224. // If there is a reservation_reservation_a_r_n on the line item use the awsRIPricingSUMColumn as cost
  225. amortizedCost = listCost
  226. amortizedNetCost = listCost
  227. if lineItemType == TypeDiscountedUsage {
  228. if _, ok := columnIndexes[S3SelectRICost]; ok {
  229. amortizedCost, err = GetCSVRowValueFloat(row, columnIndexes, S3SelectRICost)
  230. if err != nil {
  231. return nil, err
  232. }
  233. amortizedNetCost = amortizedCost
  234. }
  235. if _, ok := columnIndexes[S3SelectNetRICost]; ok {
  236. amortizedNetCost, err = GetCSVRowValueFloat(row, columnIndexes, S3SelectNetRICost)
  237. if err != nil {
  238. return nil, err
  239. }
  240. }
  241. // If there is a lineItemType of SavingsPlanCoveredUsage use the awsSPPricingSUMColumn
  242. } else if lineItemType == TypeSavingsPlanCoveredUsage {
  243. if _, ok := columnIndexes[S3SelectSPCost]; ok {
  244. amortizedCost, err = GetCSVRowValueFloat(row, columnIndexes, S3SelectSPCost)
  245. if err != nil {
  246. return nil, err
  247. }
  248. amortizedNetCost = amortizedCost
  249. }
  250. if _, ok := columnIndexes[S3SelectNetSPCost]; ok {
  251. amortizedNetCost, err = GetCSVRowValueFloat(row, columnIndexes, S3SelectNetSPCost)
  252. if err != nil {
  253. return nil, err
  254. }
  255. }
  256. }
  257. category := SelectAWSCategory(itemProviderID, usageType, itemProductCode)
  258. // Retrieve final stanza of product code for ProviderID
  259. if itemProductCode == "AWSELB" || itemProductCode == "AmazonFSx" {
  260. itemProviderID = ParseARN(itemProviderID)
  261. }
  262. properties := opencost.CloudCostProperties{}
  263. properties.Provider = opencost.AWSProvider
  264. properties.InvoiceEntityID = billPayerAccountID
  265. properties.InvoiceEntityName = billPayerAccountID
  266. properties.AccountID = itemAccountID
  267. properties.AccountName = itemAccountID
  268. properties.Category = category
  269. properties.Service = itemProductCode
  270. properties.ProviderID = itemProviderID
  271. properties.RegionID = regionCode
  272. properties.AvailabilityZone = availabilityZone
  273. properties.Labels = labels
  274. itemStart, err := time.Parse(S3SelectDateLayout, startStr)
  275. if err != nil {
  276. return nil, fmt.Errorf(
  277. "Unable to parse '%s': '%s'",
  278. S3SelectStartDate,
  279. err.Error(),
  280. )
  281. }
  282. itemStart = itemStart.Truncate(time.Hour * 24)
  283. itemEnd := itemStart.AddDate(0, 0, 1)
  284. return &opencost.CloudCost{
  285. Properties: &properties,
  286. Window: opencost.NewWindow(&itemStart, &itemEnd),
  287. ListCost: opencost.CostMetric{
  288. Cost: listCost,
  289. KubernetesPercent: isKubernetes,
  290. },
  291. NetCost: opencost.CostMetric{
  292. Cost: netCost,
  293. KubernetesPercent: isKubernetes,
  294. },
  295. AmortizedNetCost: opencost.CostMetric{
  296. Cost: amortizedNetCost,
  297. KubernetesPercent: isKubernetes,
  298. },
  299. AmortizedCost: opencost.CostMetric{
  300. Cost: amortizedCost,
  301. KubernetesPercent: isKubernetes,
  302. },
  303. InvoicedCost: opencost.CostMetric{
  304. Cost: netCost,
  305. KubernetesPercent: isKubernetes,
  306. },
  307. }, nil
  308. }
  309. const (
  310. TagAWSEKSClusterName = "aws:eks:cluster-name"
  311. TagEKSClusterName = "eks:cluster-name"
  312. TagEKSCtlClusterName = "alpha.eksctl.io/cluster-name"
  313. TagKubernetesServiceName = "kubernetes.io/service-name"
  314. TagKubernetesPVCName = "kubernetes.io/created-for/pvc/name"
  315. TagKubernetesPVName = "kubernetes.io/created-for/pv/name"
  316. )
  317. // hsK8sLabel checks if the labels contain a k8s label
  318. func hasK8sLabel(labels opencost.CloudCostLabels) bool {
  319. if _, ok := labels[TagAWSEKSClusterName]; ok {
  320. return true
  321. }
  322. if _, ok := labels[TagEKSClusterName]; ok {
  323. return true
  324. }
  325. if _, ok := labels[TagEKSCtlClusterName]; ok {
  326. return true
  327. }
  328. if _, ok := labels[TagKubernetesServiceName]; ok {
  329. return true
  330. }
  331. if _, ok := labels[TagKubernetesPVCName]; ok {
  332. return true
  333. }
  334. if _, ok := labels[TagKubernetesPVName]; ok {
  335. return true
  336. }
  337. return false
  338. }
  339. func (s3si *S3SelectIntegration) RefreshStatus() cloud.ConnectionStatus {
  340. client, err := s3si.GetS3Client()
  341. if err != nil {
  342. s3si.ConnectionStatus = cloud.FailedConnection
  343. }
  344. objs, err := s3si.ListObjects(client)
  345. if err != nil {
  346. s3si.ConnectionStatus = cloud.FailedConnection
  347. } else if len(objs.Contents) == 0 {
  348. s3si.ConnectionStatus = cloud.MissingData
  349. } else {
  350. s3si.ConnectionStatus = cloud.SuccessfulConnection
  351. }
  352. return s3si.ConnectionStatus
  353. }