s3selectintegration.go 7.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269
  1. package aws
  2. import (
  3. "encoding/csv"
  4. "fmt"
  5. "io"
  6. "strconv"
  7. "strings"
  8. "time"
  9. "github.com/aws/aws-sdk-go-v2/service/s3"
  10. "github.com/opencost/opencost/pkg/kubecost"
  11. "github.com/opencost/opencost/pkg/log"
  12. "github.com/opencost/opencost/pkg/util/timeutil"
  13. )
  14. const s3SelectDateLayout = "2006-01-02T15:04:05Z"
  15. // S3Object is aliased as "s" in queries
  16. const s3SelectAccountID = `s."bill/PayerAccountId"`
  17. const s3SelectItemType = `s."lineItem/LineItemType"`
  18. const s3SelectStartDate = `s."lineItem/UsageStartDate"`
  19. const s3SelectProductCode = `s."lineItem/ProductCode"`
  20. const s3SelectResourceID = `s."lineItem/ResourceId"`
  21. const s3SelectIsNode = `SUBSTRING(s."lineItem/ResourceId",1,2) = 'i-'`
  22. const s3SelectIsVol = `SUBSTRING(s."lineItem/ResourceId", 1, 4) = 'vol-'`
  23. const s3SelectIsNetwork = `s."lineItem/UsageType" LIKE '%Bytes'`
  24. const s3SelectListCost = `s."lineItem/UnblendedCost"`
  25. const s3SelectNetCost = `s."lineItem/NetUnblendedCost"`
  26. // These two may be used for Amortized<Net>Cost
  27. const s3SelectRICost = `s."reservation/EffectiveCost"`
  28. const s3SelectSPCost = `s."savingsPlan/SavingsPlanEffectiveCost"`
  29. type S3SelectIntegration struct {
  30. S3SelectQuerier
  31. }
  32. func (s3si *S3SelectIntegration) GetCloudCost(
  33. start,
  34. end time.Time,
  35. ) (*kubecost.CloudCostSetRange, error) {
  36. log.Infof(
  37. "S3SelectIntegration[%s]: GetCloudCost: %s",
  38. s3si.Key(),
  39. kubecost.NewWindow(&start, &end).String(),
  40. )
  41. // Set midnight yesterday as last point in time reconciliation data
  42. // can be pulled from to ensure complete days of data
  43. midnightYesterday := time.Now().In(
  44. time.UTC,
  45. ).Truncate(time.Hour*24).AddDate(0, 0, -1)
  46. if end.After(midnightYesterday) {
  47. end = midnightYesterday
  48. }
  49. // ccsr to populate with cloudcosts.
  50. ccsr, err := kubecost.NewCloudCostSetRange(
  51. start,
  52. end,
  53. timeutil.Day,
  54. s3si.Key(),
  55. )
  56. if err != nil {
  57. return nil, err
  58. }
  59. // acquire S3 client
  60. client, err := s3si.GetS3Client()
  61. if err != nil {
  62. return nil, err
  63. }
  64. // Acquire query keys
  65. queryKeys, err := s3si.GetQueryKeys(start, end, client)
  66. if err != nil {
  67. return nil, err
  68. }
  69. // Acquire headers
  70. headers, err := s3si.GetHeaders(queryKeys, client)
  71. if err != nil {
  72. return nil, err
  73. }
  74. // Exactly what it says on the tin. Though is there a set equivalent
  75. // in Go? This seems like a good use case for that.
  76. allColumns := map[string]bool{}
  77. for _, header := range headers {
  78. allColumns[header] = true
  79. }
  80. formattedStart := start.Format("2006-01-02")
  81. formattedEnd := end.Format("2006-01-02")
  82. selectColumns := []string{
  83. s3SelectStartDate,
  84. s3SelectAccountID,
  85. s3SelectResourceID,
  86. s3SelectItemType,
  87. s3SelectProductCode,
  88. s3SelectIsNode,
  89. s3SelectIsVol,
  90. s3SelectIsNetwork,
  91. s3SelectListCost,
  92. }
  93. // OC equivalent to KCM env flags relevant at all?
  94. // Check for Reservation columns in CUR and query if available
  95. checkReservations := allColumns[s3SelectRICost]
  96. if checkReservations {
  97. selectColumns = append(selectColumns, s3SelectRICost)
  98. }
  99. // Check for Savings Plan Columns in CUR and query if available
  100. checkSavingsPlan := allColumns[s3SelectSPCost]
  101. if checkSavingsPlan {
  102. selectColumns = append(selectColumns, s3SelectSPCost)
  103. }
  104. // Build map of query columns to use for parsing query
  105. columnIndexes := map[string]int{}
  106. for i, column := range selectColumns {
  107. columnIndexes[column] = i
  108. }
  109. // Build query
  110. selectStr := strings.Join(selectColumns, ", ")
  111. queryStr := `SELECT %s FROM s3object s
  112. WHERE (CAST(s."lineItem/UsageStartDate" AS TIMESTAMP) BETWEEN CAST('%s' AS TIMESTAMP) AND CAST('%s' AS TIMESTAMP))
  113. AND s."lineItem/ResourceId" <> ''
  114. AND (
  115. (
  116. s."lineItem/ProductCode" = 'AmazonEC2' AND (
  117. SUBSTRING(s."lineItem/ResourceId",1,2) = 'i-'
  118. OR SUBSTRING(s."lineItem/ResourceId",1,4) = 'vol-'
  119. )
  120. )
  121. OR s."lineItem/ProductCode" = 'AWSELB'
  122. OR s."lineItem/ProductCode" = 'AmazonFSx'
  123. )`
  124. query := fmt.Sprintf(queryStr, selectStr, formattedStart, formattedEnd)
  125. processResults := func(reader *csv.Reader) error {
  126. _, err2 := reader.Read()
  127. if err2 == io.EOF {
  128. return nil
  129. }
  130. for {
  131. row, err3 := reader.Read()
  132. if err3 == io.EOF {
  133. return nil
  134. }
  135. startStr := GetCSVRowValue(row, columnIndexes, s3SelectStartDate)
  136. itemAccountID := GetCSVRowValue(row, columnIndexes, s3SelectAccountID)
  137. itemProviderID := GetCSVRowValue(row, columnIndexes, s3SelectResourceID)
  138. lineItemType := GetCSVRowValue(row, columnIndexes, s3SelectItemType)
  139. itemProductCode := GetCSVRowValue(row, columnIndexes, s3SelectProductCode)
  140. isNode, _ := strconv.ParseBool(GetCSVRowValue(row, columnIndexes, s3SelectIsNode))
  141. isVol, _ := strconv.ParseBool(GetCSVRowValue(row, columnIndexes, s3SelectIsVol))
  142. isNetwork, _ := strconv.ParseBool(GetCSVRowValue(row, columnIndexes, s3SelectIsNetwork))
  143. var (
  144. amortizedCost float64
  145. listCost float64
  146. netCost float64
  147. )
  148. // Get list and net costs
  149. listCost, err = GetCSVRowValueFloat(row, columnIndexes, s3SelectListCost)
  150. if err != nil {
  151. return err
  152. }
  153. netCost, err = GetCSVRowValueFloat(row, columnIndexes, s3SelectNetCost)
  154. if err != nil {
  155. return err
  156. }
  157. // If there is a reservation_reservation_a_r_n on the line item use the awsRIPricingSUMColumn as cost
  158. if checkReservations && lineItemType == "DiscountedUsage" {
  159. amortizedCost, err = GetCSVRowValueFloat(row, columnIndexes, s3SelectRICost)
  160. if err != nil {
  161. log.Errorf(err.Error())
  162. continue
  163. }
  164. // If there is a lineItemType of SavingsPlanCoveredUsage use the awsSPPricingSUMColumn
  165. } else if checkSavingsPlan && lineItemType == "SavingsPlanCoveredUsage" {
  166. amortizedCost, err = GetCSVRowValueFloat(row, columnIndexes, s3SelectSPCost)
  167. if err != nil {
  168. log.Errorf(err.Error())
  169. continue
  170. }
  171. } else {
  172. // Default to listCost
  173. amortizedCost = listCost
  174. }
  175. category := SelectAWSCategory(isNode, isVol, isNetwork, itemProductCode, "")
  176. // Retrieve final stanza of product code for ProviderID
  177. if itemProductCode == "AWSELB" || itemProductCode == "AmazonFSx" {
  178. itemProviderID = ParseARN(itemProviderID)
  179. }
  180. properties := kubecost.CloudCostProperties{}
  181. properties.Provider = kubecost.AWSProvider
  182. properties.AccountID = itemAccountID
  183. properties.Category = category
  184. properties.Service = itemProductCode
  185. properties.ProviderID = itemProviderID
  186. itemStart, err := time.Parse(s3SelectDateLayout, startStr)
  187. if err != nil {
  188. log.Infof(
  189. "Unable to parse '%s': '%s'",
  190. s3SelectStartDate,
  191. err.Error(),
  192. )
  193. itemStart = time.Now()
  194. }
  195. itemStart = itemStart.Truncate(time.Hour * 24)
  196. itemEnd := itemStart.AddDate(0, 0, 1)
  197. cc := &kubecost.CloudCost{
  198. Properties: &properties,
  199. Window: kubecost.NewWindow(&itemStart, &itemEnd),
  200. ListCost: kubecost.CostMetric{
  201. Cost: listCost,
  202. },
  203. NetCost: kubecost.CostMetric{
  204. Cost: netCost,
  205. },
  206. AmortizedNetCost: kubecost.CostMetric{
  207. Cost: amortizedCost,
  208. },
  209. AmortizedCost: kubecost.CostMetric{
  210. Cost: amortizedCost,
  211. },
  212. InvoicedCost: kubecost.CostMetric{
  213. Cost: netCost,
  214. },
  215. }
  216. ccsr.LoadCloudCost(cc)
  217. }
  218. }
  219. err = s3si.Query(query, queryKeys, client, processResults)
  220. if err != nil {
  221. return nil, err
  222. }
  223. return ccsr, nil
  224. }
  225. func (s3si *S3SelectIntegration) GetHeaders(queryKeys []string, client *s3.Client) ([]string, error) {
  226. // Query to grab only header line from file
  227. query := "SELECT * FROM S3OBJECT LIMIT 1"
  228. var record []string
  229. proccessheaders := func(reader *csv.Reader) error {
  230. var err error
  231. record, err = reader.Read()
  232. if err != nil {
  233. return err
  234. }
  235. return nil
  236. }
  237. // Use only the first query key with assumption that files share schema
  238. err := s3si.Query(query, []string{queryKeys[0]}, client, proccessheaders)
  239. if err != nil {
  240. return nil, err
  241. }
  242. return record, nil
  243. }