s3selectintegration.go 7.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261
  1. package aws
  2. import (
  3. "encoding/csv"
  4. "fmt"
  5. "io"
  6. "strings"
  7. "time"
  8. "github.com/aws/aws-sdk-go-v2/service/s3"
  9. "github.com/opencost/opencost/core/pkg/log"
  10. "github.com/opencost/opencost/core/pkg/opencost"
  11. )
  12. const S3SelectDateLayout = "2006-01-02T15:04:05Z"
  13. // S3Object is aliased as "s" in queries
  14. const S3SelectAccountID = `s."bill/PayerAccountId"`
  15. const S3SelectItemType = `s."lineItem/LineItemType"`
  16. const S3SelectStartDate = `s."lineItem/UsageStartDate"`
  17. const S3SelectProductCode = `s."lineItem/ProductCode"`
  18. const S3SelectResourceID = `s."lineItem/ResourceId"`
  19. const S3SelectUsageType = `s."lineItem/UsageType"`
  20. const S3SelectListCost = `s."lineItem/UnblendedCost"`
  21. const S3SelectNetCost = `s."lineItem/NetUnblendedCost"`
  22. // These two may be used for Amortized<Net>Cost
  23. const S3SelectRICost = `s."reservation/EffectiveCost"`
  24. const S3SelectSPCost = `s."savingsPlan/SavingsPlanEffectiveCost"`
  25. type S3SelectIntegration struct {
  26. S3SelectQuerier
  27. }
  28. func (s3si *S3SelectIntegration) GetCloudCost(
  29. start,
  30. end time.Time,
  31. ) (*opencost.CloudCostSetRange, error) {
  32. log.Infof(
  33. "S3SelectIntegration[%s]: GetCloudCost: %s",
  34. s3si.Key(),
  35. opencost.NewWindow(&start, &end).String(),
  36. )
  37. // Set midnight yesterday as last point in time reconciliation data
  38. // can be pulled from to ensure complete days of data
  39. midnightYesterday := time.Now().In(
  40. time.UTC,
  41. ).Truncate(time.Hour*24).AddDate(0, 0, -1)
  42. if end.After(midnightYesterday) {
  43. end = midnightYesterday
  44. }
  45. // ccsr to populate with cloudcosts.
  46. ccsr, err := opencost.NewCloudCostSetRange(
  47. start,
  48. end,
  49. opencost.AccumulateOptionDay,
  50. s3si.Key(),
  51. )
  52. if err != nil {
  53. return nil, err
  54. }
  55. // acquire S3 client
  56. client, err := s3si.GetS3Client()
  57. if err != nil {
  58. return nil, err
  59. }
  60. // Acquire query keys
  61. queryKeys, err := s3si.GetQueryKeys(start, end, client)
  62. if err != nil {
  63. return nil, err
  64. }
  65. // Acquire headers
  66. headers, err := s3si.GetHeaders(queryKeys, client)
  67. if err != nil {
  68. return nil, err
  69. }
  70. // Exactly what it says on the tin. Though is there a set equivalent
  71. // in Go? This seems like a good use case for that.
  72. allColumns := map[string]bool{}
  73. for _, header := range headers {
  74. allColumns[header] = true
  75. }
  76. formattedStart := start.Format("2006-01-02")
  77. formattedEnd := end.Format("2006-01-02")
  78. selectColumns := []string{
  79. S3SelectStartDate,
  80. S3SelectAccountID,
  81. S3SelectResourceID,
  82. S3SelectItemType,
  83. S3SelectProductCode,
  84. S3SelectUsageType,
  85. S3SelectListCost,
  86. }
  87. // OC equivalent to KCM env flags relevant at all?
  88. // Check for Reservation columns in CUR and query if available
  89. checkReservations := allColumns[S3SelectRICost]
  90. if checkReservations {
  91. selectColumns = append(selectColumns, S3SelectRICost)
  92. }
  93. // Check for Savings Plan Columns in CUR and query if available
  94. checkSavingsPlan := allColumns[S3SelectSPCost]
  95. if checkSavingsPlan {
  96. selectColumns = append(selectColumns, S3SelectSPCost)
  97. }
  98. // Build map of query columns to use for parsing query
  99. columnIndexes := map[string]int{}
  100. for i, column := range selectColumns {
  101. columnIndexes[column] = i
  102. }
  103. // Build query
  104. selectStr := strings.Join(selectColumns, ", ")
  105. queryStr := `SELECT %s FROM s3object s
  106. WHERE (CAST(s."lineItem/UsageStartDate" AS TIMESTAMP) BETWEEN CAST('%s' AS TIMESTAMP) AND CAST('%s' AS TIMESTAMP))
  107. AND s."lineItem/ResourceId" <> ''
  108. AND (
  109. (
  110. s."lineItem/ProductCode" = 'AmazonEC2' AND (
  111. SUBSTRING(s."lineItem/ResourceId",1,2) = 'i-'
  112. OR SUBSTRING(s."lineItem/ResourceId",1,4) = 'vol-'
  113. )
  114. )
  115. OR s."lineItem/ProductCode" = 'AWSELB'
  116. OR s."lineItem/ProductCode" = 'AmazonFSx'
  117. )`
  118. query := fmt.Sprintf(queryStr, selectStr, formattedStart, formattedEnd)
  119. processResults := func(reader *csv.Reader) error {
  120. _, err2 := reader.Read()
  121. if err2 == io.EOF {
  122. return nil
  123. }
  124. for {
  125. row, err3 := reader.Read()
  126. if err3 == io.EOF {
  127. return nil
  128. }
  129. startStr := GetCSVRowValue(row, columnIndexes, S3SelectStartDate)
  130. itemAccountID := GetCSVRowValue(row, columnIndexes, S3SelectAccountID)
  131. itemProviderID := GetCSVRowValue(row, columnIndexes, S3SelectResourceID)
  132. lineItemType := GetCSVRowValue(row, columnIndexes, S3SelectItemType)
  133. itemProductCode := GetCSVRowValue(row, columnIndexes, S3SelectProductCode)
  134. usageType := GetCSVRowValue(row, columnIndexes, S3SelectUsageType)
  135. var (
  136. amortizedCost float64
  137. listCost float64
  138. netCost float64
  139. )
  140. // Get list and net costs
  141. listCost, err = GetCSVRowValueFloat(row, columnIndexes, S3SelectListCost)
  142. if err != nil {
  143. return err
  144. }
  145. netCost, err = GetCSVRowValueFloat(row, columnIndexes, S3SelectNetCost)
  146. if err != nil {
  147. return err
  148. }
  149. // If there is a reservation_reservation_a_r_n on the line item use the awsRIPricingSUMColumn as cost
  150. if checkReservations && lineItemType == "DiscountedUsage" {
  151. amortizedCost, err = GetCSVRowValueFloat(row, columnIndexes, S3SelectRICost)
  152. if err != nil {
  153. log.Errorf(err.Error())
  154. continue
  155. }
  156. // If there is a lineItemType of SavingsPlanCoveredUsage use the awsSPPricingSUMColumn
  157. } else if checkSavingsPlan && lineItemType == "SavingsPlanCoveredUsage" {
  158. amortizedCost, err = GetCSVRowValueFloat(row, columnIndexes, S3SelectSPCost)
  159. if err != nil {
  160. log.Errorf(err.Error())
  161. continue
  162. }
  163. } else {
  164. // Default to listCost
  165. amortizedCost = listCost
  166. }
  167. category := SelectAWSCategory(itemProviderID, usageType, itemProductCode)
  168. // Retrieve final stanza of product code for ProviderID
  169. if itemProductCode == "AWSELB" || itemProductCode == "AmazonFSx" {
  170. itemProviderID = ParseARN(itemProviderID)
  171. }
  172. properties := opencost.CloudCostProperties{}
  173. properties.Provider = opencost.AWSProvider
  174. properties.AccountID = itemAccountID
  175. properties.Category = category
  176. properties.Service = itemProductCode
  177. properties.ProviderID = itemProviderID
  178. itemStart, err := time.Parse(S3SelectDateLayout, startStr)
  179. if err != nil {
  180. log.Infof(
  181. "Unable to parse '%s': '%s'",
  182. S3SelectStartDate,
  183. err.Error(),
  184. )
  185. itemStart = time.Now()
  186. }
  187. itemStart = itemStart.Truncate(time.Hour * 24)
  188. itemEnd := itemStart.AddDate(0, 0, 1)
  189. cc := &opencost.CloudCost{
  190. Properties: &properties,
  191. Window: opencost.NewWindow(&itemStart, &itemEnd),
  192. ListCost: opencost.CostMetric{
  193. Cost: listCost,
  194. },
  195. NetCost: opencost.CostMetric{
  196. Cost: netCost,
  197. },
  198. AmortizedNetCost: opencost.CostMetric{
  199. Cost: amortizedCost,
  200. },
  201. AmortizedCost: opencost.CostMetric{
  202. Cost: amortizedCost,
  203. },
  204. InvoicedCost: opencost.CostMetric{
  205. Cost: netCost,
  206. },
  207. }
  208. ccsr.LoadCloudCost(cc)
  209. }
  210. }
  211. err = s3si.Query(query, queryKeys, client, processResults)
  212. if err != nil {
  213. return nil, err
  214. }
  215. return ccsr, nil
  216. }
  217. func (s3si *S3SelectIntegration) GetHeaders(queryKeys []string, client *s3.Client) ([]string, error) {
  218. // Query to grab only header line from file
  219. query := "SELECT * FROM S3OBJECT LIMIT 1"
  220. var record []string
  221. proccessheaders := func(reader *csv.Reader) error {
  222. var err error
  223. record, err = reader.Read()
  224. if err != nil {
  225. return err
  226. }
  227. return nil
  228. }
  229. // Use only the first query key with assumption that files share schema
  230. err := s3si.Query(query, []string{queryKeys[0]}, client, proccessheaders)
  231. if err != nil {
  232. return nil, err
  233. }
  234. return record, nil
  235. }