s3selectintegration.go 7.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262
  1. package aws
  2. import (
  3. "encoding/csv"
  4. "fmt"
  5. "io"
  6. "strings"
  7. "time"
  8. "github.com/aws/aws-sdk-go-v2/service/s3"
  9. "github.com/opencost/opencost/pkg/kubecost"
  10. "github.com/opencost/opencost/pkg/log"
  11. "github.com/opencost/opencost/pkg/util/timeutil"
  12. )
  13. const S3SelectDateLayout = "2006-01-02T15:04:05Z"
  14. // S3Object is aliased as "s" in queries
  15. const S3SelectAccountID = `s."bill/PayerAccountId"`
  16. const S3SelectItemType = `s."lineItem/LineItemType"`
  17. const S3SelectStartDate = `s."lineItem/UsageStartDate"`
  18. const S3SelectProductCode = `s."lineItem/ProductCode"`
  19. const S3SelectResourceID = `s."lineItem/ResourceId"`
  20. const S3SelectUsageType = `s."lineItem/UsageType"`
  21. const S3SelectListCost = `s."lineItem/UnblendedCost"`
  22. const S3SelectNetCost = `s."lineItem/NetUnblendedCost"`
  23. // These two may be used for Amortized<Net>Cost
  24. const S3SelectRICost = `s."reservation/EffectiveCost"`
  25. const S3SelectSPCost = `s."savingsPlan/SavingsPlanEffectiveCost"`
  26. type S3SelectIntegration struct {
  27. S3SelectQuerier
  28. }
  29. func (s3si *S3SelectIntegration) GetCloudCost(
  30. start,
  31. end time.Time,
  32. ) (*kubecost.CloudCostSetRange, error) {
  33. log.Infof(
  34. "S3SelectIntegration[%s]: GetCloudCost: %s",
  35. s3si.Key(),
  36. kubecost.NewWindow(&start, &end).String(),
  37. )
  38. // Set midnight yesterday as last point in time reconciliation data
  39. // can be pulled from to ensure complete days of data
  40. midnightYesterday := time.Now().In(
  41. time.UTC,
  42. ).Truncate(time.Hour*24).AddDate(0, 0, -1)
  43. if end.After(midnightYesterday) {
  44. end = midnightYesterday
  45. }
  46. // ccsr to populate with cloudcosts.
  47. ccsr, err := kubecost.NewCloudCostSetRange(
  48. start,
  49. end,
  50. timeutil.Day,
  51. s3si.Key(),
  52. )
  53. if err != nil {
  54. return nil, err
  55. }
  56. // acquire S3 client
  57. client, err := s3si.GetS3Client()
  58. if err != nil {
  59. return nil, err
  60. }
  61. // Acquire query keys
  62. queryKeys, err := s3si.GetQueryKeys(start, end, client)
  63. if err != nil {
  64. return nil, err
  65. }
  66. // Acquire headers
  67. headers, err := s3si.GetHeaders(queryKeys, client)
  68. if err != nil {
  69. return nil, err
  70. }
  71. // Exactly what it says on the tin. Though is there a set equivalent
  72. // in Go? This seems like a good use case for that.
  73. allColumns := map[string]bool{}
  74. for _, header := range headers {
  75. allColumns[header] = true
  76. }
  77. formattedStart := start.Format("2006-01-02")
  78. formattedEnd := end.Format("2006-01-02")
  79. selectColumns := []string{
  80. S3SelectStartDate,
  81. S3SelectAccountID,
  82. S3SelectResourceID,
  83. S3SelectItemType,
  84. S3SelectProductCode,
  85. S3SelectUsageType,
  86. S3SelectListCost,
  87. }
  88. // OC equivalent to KCM env flags relevant at all?
  89. // Check for Reservation columns in CUR and query if available
  90. checkReservations := allColumns[S3SelectRICost]
  91. if checkReservations {
  92. selectColumns = append(selectColumns, S3SelectRICost)
  93. }
  94. // Check for Savings Plan Columns in CUR and query if available
  95. checkSavingsPlan := allColumns[S3SelectSPCost]
  96. if checkSavingsPlan {
  97. selectColumns = append(selectColumns, S3SelectSPCost)
  98. }
  99. // Build map of query columns to use for parsing query
  100. columnIndexes := map[string]int{}
  101. for i, column := range selectColumns {
  102. columnIndexes[column] = i
  103. }
  104. // Build query
  105. selectStr := strings.Join(selectColumns, ", ")
  106. queryStr := `SELECT %s FROM s3object s
  107. WHERE (CAST(s."lineItem/UsageStartDate" AS TIMESTAMP) BETWEEN CAST('%s' AS TIMESTAMP) AND CAST('%s' AS TIMESTAMP))
  108. AND s."lineItem/ResourceId" <> ''
  109. AND (
  110. (
  111. s."lineItem/ProductCode" = 'AmazonEC2' AND (
  112. SUBSTRING(s."lineItem/ResourceId",1,2) = 'i-'
  113. OR SUBSTRING(s."lineItem/ResourceId",1,4) = 'vol-'
  114. )
  115. )
  116. OR s."lineItem/ProductCode" = 'AWSELB'
  117. OR s."lineItem/ProductCode" = 'AmazonFSx'
  118. )`
  119. query := fmt.Sprintf(queryStr, selectStr, formattedStart, formattedEnd)
  120. processResults := func(reader *csv.Reader) error {
  121. _, err2 := reader.Read()
  122. if err2 == io.EOF {
  123. return nil
  124. }
  125. for {
  126. row, err3 := reader.Read()
  127. if err3 == io.EOF {
  128. return nil
  129. }
  130. startStr := GetCSVRowValue(row, columnIndexes, S3SelectStartDate)
  131. itemAccountID := GetCSVRowValue(row, columnIndexes, S3SelectAccountID)
  132. itemProviderID := GetCSVRowValue(row, columnIndexes, S3SelectResourceID)
  133. lineItemType := GetCSVRowValue(row, columnIndexes, S3SelectItemType)
  134. itemProductCode := GetCSVRowValue(row, columnIndexes, S3SelectProductCode)
  135. usageType := GetCSVRowValue(row, columnIndexes, S3SelectUsageType)
  136. var (
  137. amortizedCost float64
  138. listCost float64
  139. netCost float64
  140. )
  141. // Get list and net costs
  142. listCost, err = GetCSVRowValueFloat(row, columnIndexes, S3SelectListCost)
  143. if err != nil {
  144. return err
  145. }
  146. netCost, err = GetCSVRowValueFloat(row, columnIndexes, S3SelectNetCost)
  147. if err != nil {
  148. return err
  149. }
  150. // If there is a reservation_reservation_a_r_n on the line item use the awsRIPricingSUMColumn as cost
  151. if checkReservations && lineItemType == "DiscountedUsage" {
  152. amortizedCost, err = GetCSVRowValueFloat(row, columnIndexes, S3SelectRICost)
  153. if err != nil {
  154. log.Errorf(err.Error())
  155. continue
  156. }
  157. // If there is a lineItemType of SavingsPlanCoveredUsage use the awsSPPricingSUMColumn
  158. } else if checkSavingsPlan && lineItemType == "SavingsPlanCoveredUsage" {
  159. amortizedCost, err = GetCSVRowValueFloat(row, columnIndexes, S3SelectSPCost)
  160. if err != nil {
  161. log.Errorf(err.Error())
  162. continue
  163. }
  164. } else {
  165. // Default to listCost
  166. amortizedCost = listCost
  167. }
  168. category := SelectAWSCategory(itemProviderID, usageType, itemProductCode)
  169. // Retrieve final stanza of product code for ProviderID
  170. if itemProductCode == "AWSELB" || itemProductCode == "AmazonFSx" {
  171. itemProviderID = ParseARN(itemProviderID)
  172. }
  173. properties := kubecost.CloudCostProperties{}
  174. properties.Provider = kubecost.AWSProvider
  175. properties.AccountID = itemAccountID
  176. properties.Category = category
  177. properties.Service = itemProductCode
  178. properties.ProviderID = itemProviderID
  179. itemStart, err := time.Parse(S3SelectDateLayout, startStr)
  180. if err != nil {
  181. log.Infof(
  182. "Unable to parse '%s': '%s'",
  183. S3SelectStartDate,
  184. err.Error(),
  185. )
  186. itemStart = time.Now()
  187. }
  188. itemStart = itemStart.Truncate(time.Hour * 24)
  189. itemEnd := itemStart.AddDate(0, 0, 1)
  190. cc := &kubecost.CloudCost{
  191. Properties: &properties,
  192. Window: kubecost.NewWindow(&itemStart, &itemEnd),
  193. ListCost: kubecost.CostMetric{
  194. Cost: listCost,
  195. },
  196. NetCost: kubecost.CostMetric{
  197. Cost: netCost,
  198. },
  199. AmortizedNetCost: kubecost.CostMetric{
  200. Cost: amortizedCost,
  201. },
  202. AmortizedCost: kubecost.CostMetric{
  203. Cost: amortizedCost,
  204. },
  205. InvoicedCost: kubecost.CostMetric{
  206. Cost: netCost,
  207. },
  208. }
  209. ccsr.LoadCloudCost(cc)
  210. }
  211. }
  212. err = s3si.Query(query, queryKeys, client, processResults)
  213. if err != nil {
  214. return nil, err
  215. }
  216. return ccsr, nil
  217. }
  218. func (s3si *S3SelectIntegration) GetHeaders(queryKeys []string, client *s3.Client) ([]string, error) {
  219. // Query to grab only header line from file
  220. query := "SELECT * FROM S3OBJECT LIMIT 1"
  221. var record []string
  222. proccessheaders := func(reader *csv.Reader) error {
  223. var err error
  224. record, err = reader.Read()
  225. if err != nil {
  226. return err
  227. }
  228. return nil
  229. }
  230. // Use only the first query key with assumption that files share schema
  231. err := s3si.Query(query, []string{queryKeys[0]}, client, proccessheaders)
  232. if err != nil {
  233. return nil, err
  234. }
  235. return record, nil
  236. }