| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261 |
- package aws
- import (
- "encoding/csv"
- "fmt"
- "io"
- "strings"
- "time"
- "github.com/aws/aws-sdk-go-v2/service/s3"
- "github.com/opencost/opencost/pkg/kubecost"
- "github.com/opencost/opencost/pkg/log"
- )
// S3SelectDateLayout is the time layout used to parse lineItem/UsageStartDate
// values returned by S3 Select queries.
const S3SelectDateLayout = "2006-01-02T15:04:05Z"

// Qualified CUR column references for S3 Select queries. Queries alias the
// S3 object as "s", so every column is written as s."<column name>".
const (
	// Identity and classification columns.
	S3SelectAccountID   = `s."bill/PayerAccountId"`
	S3SelectItemType    = `s."lineItem/LineItemType"`
	S3SelectStartDate   = `s."lineItem/UsageStartDate"`
	S3SelectProductCode = `s."lineItem/ProductCode"`
	S3SelectResourceID  = `s."lineItem/ResourceId"`
	S3SelectUsageType   = `s."lineItem/UsageType"`

	// List (unblended) and net (unblended) cost columns.
	S3SelectListCost = `s."lineItem/UnblendedCost"`
	S3SelectNetCost  = `s."lineItem/NetUnblendedCost"`

	// Effective costs for reservation- and savings-plan-covered usage; these
	// may be used for Amortized<Net>Cost.
	S3SelectRICost = `s."reservation/EffectiveCost"`
	S3SelectSPCost = `s."savingsPlan/SavingsPlanEffectiveCost"`
)
// S3SelectIntegration retrieves AWS cloud costs by running S3 Select queries
// over Cost and Usage Report (CUR) files. S3 access (client creation, query
// key lookup, query execution) presumably comes from the embedded
// S3SelectQuerier — confirm against its definition.
type S3SelectIntegration struct {
	S3SelectQuerier
}
- func (s3si *S3SelectIntegration) GetCloudCost(
- start,
- end time.Time,
- ) (*kubecost.CloudCostSetRange, error) {
- log.Infof(
- "S3SelectIntegration[%s]: GetCloudCost: %s",
- s3si.Key(),
- kubecost.NewWindow(&start, &end).String(),
- )
- // Set midnight yesterday as last point in time reconciliation data
- // can be pulled from to ensure complete days of data
- midnightYesterday := time.Now().In(
- time.UTC,
- ).Truncate(time.Hour*24).AddDate(0, 0, -1)
- if end.After(midnightYesterday) {
- end = midnightYesterday
- }
- // ccsr to populate with cloudcosts.
- ccsr, err := kubecost.NewCloudCostSetRange(
- start,
- end,
- kubecost.AccumulateOptionDay,
- s3si.Key(),
- )
- if err != nil {
- return nil, err
- }
- // acquire S3 client
- client, err := s3si.GetS3Client()
- if err != nil {
- return nil, err
- }
- // Acquire query keys
- queryKeys, err := s3si.GetQueryKeys(start, end, client)
- if err != nil {
- return nil, err
- }
- // Acquire headers
- headers, err := s3si.GetHeaders(queryKeys, client)
- if err != nil {
- return nil, err
- }
- // Exactly what it says on the tin. Though is there a set equivalent
- // in Go? This seems like a good use case for that.
- allColumns := map[string]bool{}
- for _, header := range headers {
- allColumns[header] = true
- }
- formattedStart := start.Format("2006-01-02")
- formattedEnd := end.Format("2006-01-02")
- selectColumns := []string{
- S3SelectStartDate,
- S3SelectAccountID,
- S3SelectResourceID,
- S3SelectItemType,
- S3SelectProductCode,
- S3SelectUsageType,
- S3SelectListCost,
- }
- // OC equivalent to KCM env flags relevant at all?
- // Check for Reservation columns in CUR and query if available
- checkReservations := allColumns[S3SelectRICost]
- if checkReservations {
- selectColumns = append(selectColumns, S3SelectRICost)
- }
- // Check for Savings Plan Columns in CUR and query if available
- checkSavingsPlan := allColumns[S3SelectSPCost]
- if checkSavingsPlan {
- selectColumns = append(selectColumns, S3SelectSPCost)
- }
- // Build map of query columns to use for parsing query
- columnIndexes := map[string]int{}
- for i, column := range selectColumns {
- columnIndexes[column] = i
- }
- // Build query
- selectStr := strings.Join(selectColumns, ", ")
- queryStr := `SELECT %s FROM s3object s
- WHERE (CAST(s."lineItem/UsageStartDate" AS TIMESTAMP) BETWEEN CAST('%s' AS TIMESTAMP) AND CAST('%s' AS TIMESTAMP))
- AND s."lineItem/ResourceId" <> ''
- AND (
- (
- s."lineItem/ProductCode" = 'AmazonEC2' AND (
- SUBSTRING(s."lineItem/ResourceId",1,2) = 'i-'
- OR SUBSTRING(s."lineItem/ResourceId",1,4) = 'vol-'
- )
- )
- OR s."lineItem/ProductCode" = 'AWSELB'
- OR s."lineItem/ProductCode" = 'AmazonFSx'
- )`
- query := fmt.Sprintf(queryStr, selectStr, formattedStart, formattedEnd)
- processResults := func(reader *csv.Reader) error {
- _, err2 := reader.Read()
- if err2 == io.EOF {
- return nil
- }
- for {
- row, err3 := reader.Read()
- if err3 == io.EOF {
- return nil
- }
- startStr := GetCSVRowValue(row, columnIndexes, S3SelectStartDate)
- itemAccountID := GetCSVRowValue(row, columnIndexes, S3SelectAccountID)
- itemProviderID := GetCSVRowValue(row, columnIndexes, S3SelectResourceID)
- lineItemType := GetCSVRowValue(row, columnIndexes, S3SelectItemType)
- itemProductCode := GetCSVRowValue(row, columnIndexes, S3SelectProductCode)
- usageType := GetCSVRowValue(row, columnIndexes, S3SelectUsageType)
- var (
- amortizedCost float64
- listCost float64
- netCost float64
- )
- // Get list and net costs
- listCost, err = GetCSVRowValueFloat(row, columnIndexes, S3SelectListCost)
- if err != nil {
- return err
- }
- netCost, err = GetCSVRowValueFloat(row, columnIndexes, S3SelectNetCost)
- if err != nil {
- return err
- }
- // If there is a reservation_reservation_a_r_n on the line item use the awsRIPricingSUMColumn as cost
- if checkReservations && lineItemType == "DiscountedUsage" {
- amortizedCost, err = GetCSVRowValueFloat(row, columnIndexes, S3SelectRICost)
- if err != nil {
- log.Errorf(err.Error())
- continue
- }
- // If there is a lineItemType of SavingsPlanCoveredUsage use the awsSPPricingSUMColumn
- } else if checkSavingsPlan && lineItemType == "SavingsPlanCoveredUsage" {
- amortizedCost, err = GetCSVRowValueFloat(row, columnIndexes, S3SelectSPCost)
- if err != nil {
- log.Errorf(err.Error())
- continue
- }
- } else {
- // Default to listCost
- amortizedCost = listCost
- }
- category := SelectAWSCategory(itemProviderID, usageType, itemProductCode)
- // Retrieve final stanza of product code for ProviderID
- if itemProductCode == "AWSELB" || itemProductCode == "AmazonFSx" {
- itemProviderID = ParseARN(itemProviderID)
- }
- properties := kubecost.CloudCostProperties{}
- properties.Provider = kubecost.AWSProvider
- properties.AccountID = itemAccountID
- properties.Category = category
- properties.Service = itemProductCode
- properties.ProviderID = itemProviderID
- itemStart, err := time.Parse(S3SelectDateLayout, startStr)
- if err != nil {
- log.Infof(
- "Unable to parse '%s': '%s'",
- S3SelectStartDate,
- err.Error(),
- )
- itemStart = time.Now()
- }
- itemStart = itemStart.Truncate(time.Hour * 24)
- itemEnd := itemStart.AddDate(0, 0, 1)
- cc := &kubecost.CloudCost{
- Properties: &properties,
- Window: kubecost.NewWindow(&itemStart, &itemEnd),
- ListCost: kubecost.CostMetric{
- Cost: listCost,
- },
- NetCost: kubecost.CostMetric{
- Cost: netCost,
- },
- AmortizedNetCost: kubecost.CostMetric{
- Cost: amortizedCost,
- },
- AmortizedCost: kubecost.CostMetric{
- Cost: amortizedCost,
- },
- InvoicedCost: kubecost.CostMetric{
- Cost: netCost,
- },
- }
- ccsr.LoadCloudCost(cc)
- }
- }
- err = s3si.Query(query, queryKeys, client, processResults)
- if err != nil {
- return nil, err
- }
- return ccsr, nil
- }
- func (s3si *S3SelectIntegration) GetHeaders(queryKeys []string, client *s3.Client) ([]string, error) {
- // Query to grab only header line from file
- query := "SELECT * FROM S3OBJECT LIMIT 1"
- var record []string
- proccessheaders := func(reader *csv.Reader) error {
- var err error
- record, err = reader.Read()
- if err != nil {
- return err
- }
- return nil
- }
- // Use only the first query key with assumption that files share schema
- err := s3si.Query(query, []string{queryKeys[0]}, client, proccessheaders)
- if err != nil {
- return nil, err
- }
- return record, nil
- }
|