Kaynağa Gözat

Remove external allocation and azure storage code from cloud pkg

Sean Holcomb 4 yıl önce
ebeveyn
işleme
1b568c8f18

+ 71 - 361
pkg/cloud/awsprovider.go

@@ -25,7 +25,6 @@ import (
 	"github.com/kubecost/cost-model/pkg/errors"
 	"github.com/kubecost/cost-model/pkg/log"
 	"github.com/kubecost/cost-model/pkg/util"
-	"github.com/kubecost/cost-model/pkg/util/cloudutil"
 	"github.com/kubecost/cost-model/pkg/util/fileutil"
 	"github.com/kubecost/cost-model/pkg/util/json"
 
@@ -1570,10 +1569,10 @@ func generateAWSGroupBy(lastIdx int) string {
 	return strings.Join(sequence, ",")
 }
 
-func (a *AWS) QueryAthenaPaginated(query string) (*athena.GetQueryResultsInput, *athena.Athena, error) {
+func (a *AWS) QueryAthenaPaginated(query string, fn func(*athena.GetQueryResultsOutput, bool) bool) error {
 	customPricing, err := a.GetConfig()
 	if err != nil {
-		return nil, nil, err
+		return err
 	}
 	a.ConfigureAuthWith(customPricing)
 	region := aws.String(customPricing.AthenaRegion)
@@ -1606,7 +1605,7 @@ func (a *AWS) QueryAthenaPaginated(query string) (*athena.GetQueryResultsInput,
 
 	res, err := svc.StartQueryExecution(&e)
 	if err != nil {
-		return nil, svc, err
+		return err
 	}
 
 	klog.V(2).Infof("StartQueryExecution result:")
@@ -1621,7 +1620,7 @@ func (a *AWS) QueryAthenaPaginated(query string) (*athena.GetQueryResultsInput,
 	for {
 		qrop, err = svc.GetQueryExecution(&qri)
 		if err != nil {
-			return nil, svc, err
+			return err
 		}
 		if *qrop.QueryExecution.Status.State != "RUNNING" && *qrop.QueryExecution.Status.State != "QUEUED" {
 			break
@@ -1632,80 +1631,13 @@ func (a *AWS) QueryAthenaPaginated(query string) (*athena.GetQueryResultsInput,
 
 		var ip athena.GetQueryResultsInput
 		ip.SetQueryExecutionId(*res.QueryExecutionId)
-		return &ip, svc, nil
-	} else {
-		return nil, svc, fmt.Errorf("No results available for %s", query)
-	}
-}
-
-func (a *AWS) QueryAthenaBillingData(query string) (*athena.GetQueryResultsOutput, error) {
-	customPricing, err := a.GetConfig()
-	if err != nil {
-		return nil, err
-	}
-
-	a.ConfigureAuthWith(customPricing) // load aws authentication from configuration or secret
-
-	region := aws.String(customPricing.AthenaRegion)
-	resultsBucket := customPricing.AthenaBucketName
-	database := customPricing.AthenaDatabase
-	c := &aws.Config{
-		Region:              region,
-		STSRegionalEndpoint: endpoints.RegionalSTSEndpoint,
-	}
-	s := session.Must(session.NewSession(c))
-	svc := athena.New(s)
-	if customPricing.MasterPayerARN != "" {
-		creds := stscreds.NewCredentials(s, customPricing.MasterPayerARN)
-		svc = athena.New(s, &aws.Config{
-			Region:      region,
-			Credentials: creds,
-		})
-	}
-
-	var e athena.StartQueryExecutionInput
-
-	var r athena.ResultConfiguration
-	r.SetOutputLocation(resultsBucket)
-	e.SetResultConfiguration(&r)
-
-	e.SetQueryString(query)
-	var q athena.QueryExecutionContext
-	q.SetDatabase(database)
-	e.SetQueryExecutionContext(&q)
-
-	res, err := svc.StartQueryExecution(&e)
-	if err != nil {
-		return nil, err
-	}
-
-	klog.V(2).Infof("StartQueryExecution result:")
-	klog.V(2).Infof(res.GoString())
-
-	var qri athena.GetQueryExecutionInput
-	qri.SetQueryExecutionId(*res.QueryExecutionId)
-
-	var qrop *athena.GetQueryExecutionOutput
-	duration := time.Duration(2) * time.Second // Pause for 2 seconds
-
-	for {
-		qrop, err = svc.GetQueryExecution(&qri)
+		err = svc.GetQueryResultsPages(&ip, fn)
 		if err != nil {
-			return nil, err
+			return fmt.Errorf("queryAthenaPaginated: error getting query resultsPages from athena service %s", err)
 		}
-		if *qrop.QueryExecution.Status.State != "RUNNING" && *qrop.QueryExecution.Status.State != "QUEUED" {
-			break
-		}
-		time.Sleep(duration)
-	}
-	if *qrop.QueryExecution.Status.State == "SUCCEEDED" {
-
-		var ip athena.GetQueryResultsInput
-		ip.SetQueryExecutionId(*res.QueryExecutionId)
-
-		return svc.GetQueryResults(&ip)
+		return nil
 	} else {
-		return nil, fmt.Errorf("No results available for %s", query)
+		return fmt.Errorf("No results available for %s", query)
 	}
 }
 
@@ -1784,14 +1716,11 @@ func (a *AWS) GetSavingsPlanDataFromAthena() error {
 
 	klog.V(3).Infof("Running Query: %s", query)
 
-	ip, svc, err := a.QueryAthenaPaginated(query)
+	err = a.QueryAthenaPaginated(query, processResults)
 	if err != nil {
 		return fmt.Errorf("Error fetching Savings Plan Data: %s", err)
 	}
-	athenaErr := svc.GetQueryResultsPages(ip, processResults)
-	if athenaErr != nil {
-		return athenaErr
-	}
+
 	return nil
 }
 
@@ -1815,15 +1744,18 @@ func (a *AWS) GetReservationDataFromAthena() error {
 	// label columns
 	columns, _ := a.ShowAthenaColumns()
 
-	if columns["reservation_reservation_a_r_n"] && columns["reservation_effective_cost"] {
-		if a.RIPricingByInstanceID == nil {
-			a.RIPricingByInstanceID = make(map[string]*RIData)
-		}
-		tNow := time.Now()
-		tOneDayAgo := tNow.Add(time.Duration(-25) * time.Hour) // Also get files from one day ago to avoid boundary conditions
-		start := tOneDayAgo.Format("2006-01-02")
-		end := tNow.Format("2006-01-02")
-		q := `SELECT
+	if !columns["reservation_reservation_a_r_n"] || !columns["reservation_effective_cost"] {
+		klog.Infof("No reserved data available in Athena")
+		a.RIPricingError = nil
+	}
+	if a.RIPricingByInstanceID == nil {
+		a.RIPricingByInstanceID = make(map[string]*RIData)
+	}
+	tNow := time.Now()
+	tOneDayAgo := tNow.Add(time.Duration(-25) * time.Hour) // Also get files from one day ago to avoid boundary conditions
+	start := tOneDayAgo.Format("2006-01-02")
+	end := tNow.Format("2006-01-02")
+	q := `SELECT
 		line_item_usage_start_date,
 		reservation_reservation_a_r_n,
 		line_item_resource_id,
@@ -1832,48 +1764,54 @@ func (a *AWS) GetReservationDataFromAthena() error {
 	WHERE line_item_usage_start_date BETWEEN date '%s' AND date '%s'
 	AND reservation_reservation_a_r_n <> '' ORDER BY
 	line_item_usage_start_date DESC`
-		query := fmt.Sprintf(q, cfg.AthenaTable, start, end)
-		op, err := a.QueryAthenaBillingData(query)
-		if err != nil {
-			a.RIPricingError = err
-			return fmt.Errorf("Error fetching Reserved Instance Data: %s", err)
+
+	page := 0
+	processResults := func(op *athena.GetQueryResultsOutput, lastpage bool) bool {
+		a.RIDataLock.Lock()
+		a.RIPricingByInstanceID = make(map[string]*RIData) // Clean out the old data and only report a RI price if it's in the most recent run.
+		mostRecentDate := ""
+		iter := op.ResultSet.Rows
+		if page == 0 && len(iter) > 0 {
+			iter = op.ResultSet.Rows[1:len(op.ResultSet.Rows)]
 		}
-		a.RIPricingError = nil
-		klog.Infof("Fetching RI data...")
-		if len(op.ResultSet.Rows) > 1 {
-			a.RIDataLock.Lock()
-			mostRecentDate := ""
-			for _, r := range op.ResultSet.Rows[1:(len(op.ResultSet.Rows) - 1)] {
-				d := *r.Data[0].VarCharValue
-				if mostRecentDate == "" {
-					mostRecentDate = d
-				} else if mostRecentDate != d { // Get all most recent assignments
-					break
-				}
-				cost, err := strconv.ParseFloat(*r.Data[3].VarCharValue, 64)
-				if err != nil {
-					klog.Infof("Error converting `%s` from float ", *r.Data[3].VarCharValue)
-				}
-				r := &RIData{
-					ResourceID:     *r.Data[2].VarCharValue,
-					EffectiveCost:  cost,
-					ReservationARN: *r.Data[1].VarCharValue,
-					MostRecentDate: d,
-				}
-				a.RIPricingByInstanceID[r.ResourceID] = r
+		page++
+		for _, r := range iter {
+			d := *r.Data[0].VarCharValue
+			if mostRecentDate == "" {
+				mostRecentDate = d
+			} else if mostRecentDate != d { // Get all most recent assignments
+				break
 			}
-			klog.V(1).Infof("Found %d reserved instances", len(a.RIPricingByInstanceID))
-			for k, r := range a.RIPricingByInstanceID {
-				log.DedupedInfof(5, "Reserved Instance Data found for node %s : %f at time %s", k, r.EffectiveCost, r.MostRecentDate)
+			cost, err := strconv.ParseFloat(*r.Data[3].VarCharValue, 64)
+			if err != nil {
+				klog.Infof("Error converting `%s` from float ", *r.Data[3].VarCharValue)
 			}
-			a.RIDataLock.Unlock()
-		} else {
-			klog.Infof("No reserved instance data found")
+			r := &RIData{
+				ResourceID:     *r.Data[2].VarCharValue,
+				EffectiveCost:  cost,
+				ReservationARN: *r.Data[1].VarCharValue,
+				MostRecentDate: d,
+			}
+			a.RIPricingByInstanceID[r.ResourceID] = r
 		}
-	} else {
-		klog.Infof("No reserved data available in Athena")
-		a.RIPricingError = nil
+		klog.V(1).Infof("Found %d reserved instances", len(a.RIPricingByInstanceID))
+		for k, r := range a.RIPricingByInstanceID {
+			log.DedupedInfof(5, "Reserved Instance Data found for node %s : %f at time %s", k, r.EffectiveCost, r.MostRecentDate)
+		}
+		a.RIDataLock.Unlock()
+		return true
 	}
+
+	query := fmt.Sprintf(q, cfg.AthenaTable, start, end)
+
+	klog.V(3).Infof("Running Query: %s", query)
+
+	err = a.QueryAthenaPaginated(query, processResults)
+	if err != nil {
+		a.RIPricingError = err
+		return fmt.Errorf("Error fetching Reserved Instance Data: %s", err)
+	}
+	a.RIPricingError = nil
 	return nil
 }
 
@@ -1895,11 +1833,11 @@ func (aws *AWS) ShowAthenaColumns() (map[string]bool, error) {
 
 	q := `SHOW COLUMNS IN  %s`
 	query := fmt.Sprintf(q, cfg.AthenaTable)
-	results, svc, err := aws.QueryAthenaPaginated(query)
+
 
 	columns := []string{}
 	pageNum := 0
-	athenaErr := svc.GetQueryResultsPages(results, func(page *athena.GetQueryResultsOutput, lastpage bool) bool {
+	processResults := func(page *athena.GetQueryResultsOutput, lastpage bool) bool {
 		for _, row := range page.ResultSet.Rows {
 			columns = append(columns, *row.Data[0].VarCharValue)
 		}
@@ -1907,10 +1845,11 @@ func (aws *AWS) ShowAthenaColumns() (map[string]bool, error) {
 		pageNum++
 
 		return true
-	})
-	if athenaErr != nil {
+	}
+	err = aws.QueryAthenaPaginated(query, processResults)
+	if err != nil {
 		log.Warningf("Error getting Athena columns: %s", err)
-		return columnSet, athenaErr
+		return columnSet, err
 	}
 
 	for _, col := range columns {
@@ -1920,208 +1859,6 @@ func (aws *AWS) ShowAthenaColumns() (map[string]bool, error) {
 	return columnSet, nil
 }
 
-// ExternalAllocations represents tagged assets outside the scope of kubernetes.
-// "start" and "end" are dates of the format YYYY-MM-DD
-// "aggregator" is the tag used to determine how to allocate those assets, ie namespace, pod, etc.
-func (a *AWS) ExternalAllocations(start string, end string, aggregators []string, filterType string, filterValue string, crossCluster bool) ([]*OutOfClusterAllocation, error) {
-	customPricing, err := a.GetConfig()
-	if err != nil {
-		return nil, err
-	}
-	formattedAggregators := []string{}
-	for _, agg := range aggregators {
-		aggregator_column_name := "resource_tags_user_" + agg
-		aggregator_column_name = cloudutil.ConvertToGlueColumnFormat(aggregator_column_name)
-		formattedAggregators = append(formattedAggregators, aggregator_column_name)
-	}
-	aggregatorNames := strings.Join(formattedAggregators, ",")
-	aggregatorOr := strings.Join(formattedAggregators, " <> '' OR ")
-	aggregatorOr = aggregatorOr + " <> ''"
-
-	filter_column_name := "resource_tags_user_" + filterType
-	filter_column_name = cloudutil.ConvertToGlueColumnFormat(filter_column_name)
-
-	var query string
-	var lastIdx int
-	if filterType != "kubernetes_" { // This gets appended upstream and is equivalent to no filter.
-		lastIdx = len(formattedAggregators) + 3
-		groupby := generateAWSGroupBy(lastIdx)
-		query = fmt.Sprintf(`SELECT
-			CAST(line_item_usage_start_date AS DATE) as start_date,
-			%s,
-			line_item_product_code,
-			%s,
-			SUM(line_item_blended_cost) as blended_cost
-		FROM %s as cost_data
-		WHERE (%s='%s') AND line_item_usage_start_date BETWEEN date '%s' AND date '%s' AND (%s)
-		GROUP BY %s`, aggregatorNames, filter_column_name, customPricing.AthenaTable, filter_column_name, filterValue, start, end, aggregatorOr, groupby)
-	} else {
-		lastIdx = len(formattedAggregators) + 2
-		groupby := generateAWSGroupBy(lastIdx)
-		query = fmt.Sprintf(`SELECT
-			CAST(line_item_usage_start_date AS DATE) as start_date,
-			%s,
-			line_item_product_code,
-			SUM(line_item_blended_cost) as blended_cost
-		FROM %s as cost_data
-		WHERE line_item_usage_start_date BETWEEN date '%s' AND date '%s' AND (%s)
-		GROUP BY %s`, aggregatorNames, customPricing.AthenaTable, start, end, aggregatorOr, groupby)
-	}
-	var oocAllocs []*OutOfClusterAllocation
-	page := 0
-	processResults := func(op *athena.GetQueryResultsOutput, lastpage bool) bool {
-		iter := op.ResultSet.Rows
-		if page == 0 && len(iter) > 0 {
-			iter = op.ResultSet.Rows[1:len(op.ResultSet.Rows)]
-		}
-		page++
-		for _, r := range iter {
-			cost, err := strconv.ParseFloat(*r.Data[lastIdx].VarCharValue, 64)
-			if err != nil {
-				klog.Infof("Error converting cost `%s` from float ", *r.Data[lastIdx].VarCharValue)
-			}
-			environment := ""
-			for _, d := range r.Data[1 : len(formattedAggregators)+1] {
-				if *d.VarCharValue != "" {
-					environment = *d.VarCharValue // just set to the first nonempty match
-				}
-				break
-			}
-			ooc := &OutOfClusterAllocation{
-				Aggregator:  strings.Join(aggregators, ","),
-				Environment: environment,
-				Service:     *r.Data[len(formattedAggregators)+1].VarCharValue,
-				Cost:        cost,
-			}
-			oocAllocs = append(oocAllocs, ooc)
-		}
-		return true
-	}
-	// Query for all column names in advance in order to validate configured
-	// label columns
-	columns, _ := a.ShowAthenaColumns()
-
-	// Check for all aggregators being formatted into the query
-	containsColumns := true
-	for _, agg := range formattedAggregators {
-		if columns[agg] != true {
-			containsColumns = false
-			klog.Warningf("Athena missing column: %s", agg)
-		}
-	}
-	if containsColumns {
-		klog.V(3).Infof("Running Query: %s", query)
-		ip, svc, _ := a.QueryAthenaPaginated(query)
-
-		athenaErr := svc.GetQueryResultsPages(ip, processResults)
-		if athenaErr != nil {
-			klog.Infof("RETURNING ATHENA ERROR")
-			return nil, athenaErr
-		}
-
-		if customPricing.BillingDataDataset != "" && !crossCluster { // There is GCP data, meaning someone has tried to configure a GCP out-of-cluster allocation.
-			gcp, err := NewCrossClusterProvider("gcp", a.Config.ConfigFileManager(), "aws.json", a.Clientset)
-			if err != nil {
-				klog.Infof("Could not instantiate cross-cluster provider %s", err.Error())
-			}
-			gcpOOC, err := gcp.ExternalAllocations(start, end, aggregators, filterType, filterValue, true)
-			if err != nil {
-				klog.Infof("Could not fetch cross-cluster costs %s", err.Error())
-			}
-			oocAllocs = append(oocAllocs, gcpOOC...)
-		}
-	} else {
-		klog.Infof("External Allocations: Athena Query skipped due to missing columns")
-	}
-	return oocAllocs, nil
-}
-
-// QuerySQL can query a properly configured Athena database.
-// Used to fetch billing data.
-// Requires a json config in /var/configs with key region, output, and database.
-func (a *AWS) QuerySQL(query string) ([]byte, error) {
-	customPricing, err := a.GetConfig()
-	if err != nil {
-		return nil, err
-	}
-
-	a.ConfigureAuthWith(customPricing) // load aws authentication from configuration or secret
-
-	athenaConfigs, err := os.Open("/var/configs/athena.json")
-	if err != nil {
-		return nil, err
-	}
-	defer athenaConfigs.Close()
-	b, err := ioutil.ReadAll(athenaConfigs)
-	if err != nil {
-		return nil, err
-	}
-	var athenaConf map[string]string
-	json.Unmarshal([]byte(b), &athenaConf)
-	region := aws.String(customPricing.AthenaRegion)
-	resultsBucket := customPricing.AthenaBucketName
-	database := customPricing.AthenaDatabase
-
-	c := &aws.Config{
-		Region: region,
-	}
-	s := session.Must(session.NewSession(c))
-	svc := athena.New(s)
-
-	var e athena.StartQueryExecutionInput
-
-	var r athena.ResultConfiguration
-	r.SetOutputLocation(resultsBucket)
-	e.SetResultConfiguration(&r)
-
-	e.SetQueryString(query)
-	var q athena.QueryExecutionContext
-	q.SetDatabase(database)
-	e.SetQueryExecutionContext(&q)
-
-	res, err := svc.StartQueryExecution(&e)
-	if err != nil {
-		return nil, err
-	}
-
-	klog.V(2).Infof("StartQueryExecution result:")
-	klog.V(2).Infof(res.GoString())
-
-	var qri athena.GetQueryExecutionInput
-	qri.SetQueryExecutionId(*res.QueryExecutionId)
-
-	var qrop *athena.GetQueryExecutionOutput
-	duration := time.Duration(2) * time.Second // Pause for 2 seconds
-
-	for {
-		qrop, err = svc.GetQueryExecution(&qri)
-		if err != nil {
-			return nil, err
-		}
-		if *qrop.QueryExecution.Status.State != "RUNNING" && *qrop.QueryExecution.Status.State != "QUEUED" {
-			break
-		}
-		time.Sleep(duration)
-	}
-	if *qrop.QueryExecution.Status.State == "SUCCEEDED" {
-
-		var ip athena.GetQueryResultsInput
-		ip.SetQueryExecutionId(*res.QueryExecutionId)
-
-		op, err := svc.GetQueryResults(&ip)
-		if err != nil {
-			return nil, err
-		}
-		b, err := json.Marshal(op.ResultSet)
-		if err != nil {
-			return nil, err
-		}
-
-		return b, nil
-	}
-	return nil, fmt.Errorf("Error getting query results : %s", *qrop.QueryExecution.Status.State)
-}
-
 type spotInfo struct {
 	Timestamp   string `csv:"Timestamp"`
 	UsageType   string `csv:"UsageType"`
@@ -2134,33 +1871,6 @@ type spotInfo struct {
 	Version     string `csv:"Version"`
 }
 
-type fnames []*string
-
-func (f fnames) Len() int {
-	return len(f)
-}
-
-func (f fnames) Swap(i, j int) {
-	f[i], f[j] = f[j], f[i]
-}
-
-func (f fnames) Less(i, j int) bool {
-	key1 := strings.Split(*f[i], ".")
-	key2 := strings.Split(*f[j], ".")
-
-	t1, err := time.Parse("2006-01-02-15", key1[1])
-	if err != nil {
-		klog.V(1).Info("Unable to parse timestamp" + key1[1])
-		return false
-	}
-	t2, err := time.Parse("2006-01-02-15", key2[1])
-	if err != nil {
-		klog.V(1).Info("Unable to parse timestamp" + key2[1])
-		return false
-	}
-	return t1.Before(t2)
-}
-
 func (a *AWS) parseSpotData(bucket string, prefix string, projectID string, region string) (map[string]*spotInfo, error) {
 	if a.ServiceAccountChecks == nil { // Set up checks to store error/success states
 		a.ServiceAccountChecks = make(map[string]*ServiceAccountCheck)

+ 0 - 156
pkg/cloud/azureprovider.go

@@ -2,7 +2,6 @@ package cloud
 
 import (
 	"context"
-	"encoding/csv"
 	"fmt"
 	"io"
 	"io/ioutil"
@@ -18,7 +17,6 @@ import (
 
 	"github.com/kubecost/cost-model/pkg/clustercache"
 	"github.com/kubecost/cost-model/pkg/env"
-	"github.com/kubecost/cost-model/pkg/kubecost"
 	"github.com/kubecost/cost-model/pkg/util"
 	"github.com/kubecost/cost-model/pkg/util/fileutil"
 	"github.com/kubecost/cost-model/pkg/util/json"
@@ -154,9 +152,6 @@ var azureRegions = []string{
 	"brazilsoutheast",
 }
 
-const AzureLayout = "2006-01-02"
-
-var HeaderStrings = []string{"MeterCategory", "UsageDateTime", "InstanceId", "AdditionalInfo", "Tags", "PreTaxCost", "SubscriptionGuid", "ConsumedService", "ResourceGroup", "ResourceType"}
 
 type regionParts []string
 
@@ -1244,157 +1239,6 @@ func (az *Azure) GetConfig() (*CustomPricing, error) {
 	return c, nil
 }
 
-// ExternalAllocations represents tagged assets outside the scope of kubernetes.
-// "start" and "end" are dates of the format YYYY-MM-DD
-// "aggregator" is the tag used to determine how to allocate those assets, ie namespace, pod, etc.
-func (az *Azure) ExternalAllocations(start string, end string, aggregators []string, filterType string, filterValue string, crossCluster bool) ([]*OutOfClusterAllocation, error) {
-	var csvRetriever CSVRetriever = AzureCSVRetriever{}
-	err := az.ConfigureAzureStorage() // load Azure Storage config
-	if err != nil {
-		return nil, err
-	}
-	return getExternalAllocations(start, end, aggregators, filterType, filterValue, crossCluster, csvRetriever)
-}
-
-func getExternalAllocations(start string, end string, aggregators []string, filterType string, filterValue string, crossCluster bool, csvRetriever CSVRetriever) ([]*OutOfClusterAllocation, error) {
-	dateFormat := "2006-1-2"
-	startTime, err := time.Parse(dateFormat, start)
-	if err != nil {
-		return nil, err
-	}
-	endTime, err := time.Parse(dateFormat, end)
-	if err != nil {
-		return nil, err
-	}
-	readers, err := csvRetriever.GetCSVReaders(startTime, endTime)
-	if err != nil {
-		return nil, err
-	}
-	oocAllocs := make(map[string]*OutOfClusterAllocation)
-	for _, reader := range readers {
-		err = parseCSV(reader, startTime, endTime, oocAllocs, aggregators, filterType, filterValue, crossCluster)
-		if err != nil {
-			return nil, err
-		}
-	}
-	var oocAllocsArr []*OutOfClusterAllocation
-	for _, alloc := range oocAllocs {
-		oocAllocsArr = append(oocAllocsArr, alloc)
-	}
-	return oocAllocsArr, nil
-}
-
-func parseCSV(reader *csv.Reader, start, end time.Time, oocAllocs map[string]*OutOfClusterAllocation, aggregators []string, filterType string, filterValue string, crossCluster bool) error {
-	headers, _ := reader.Read()
-	headerMap := createHeaderMap(headers)
-
-	for {
-		var record, err = reader.Read()
-		if err == io.EOF {
-			break
-		}
-		if err != nil {
-			return err
-		}
-
-		meterCategory := record[headerMap["MeterCategory"]]
-		category := selectCategory(meterCategory)
-		usageDateTime, err := time.Parse(AzureLayout, record[headerMap["UsageDateTime"]])
-		if err != nil {
-			klog.Errorf("failed to parse usage date: '%s'", record[headerMap["UsageDateTime"]])
-			continue
-		}
-		// Ignore VM's and Storage Items for now
-		if category == kubecost.ComputeCategory || category == kubecost.StorageCategory || !isValidUsageDateTime(start, end, usageDateTime) {
-			continue
-		}
-
-		itemCost, err := strconv.ParseFloat(record[headerMap["PreTaxCost"]], 64)
-		if err != nil {
-			klog.Infof("failed to parse cost: '%s'", record[headerMap["PreTaxCost"]])
-			continue
-		}
-
-		itemTags := make(map[string]string)
-		itemTagJson := makeValidJSON(record[headerMap["Tags"]])
-		if itemTagJson != "" {
-			err = json.Unmarshal([]byte(itemTagJson), &itemTags)
-			if err != nil {
-				klog.Infof("Could not parse item tags %v", err)
-			}
-		}
-
-		if filterType != "kubernetes_" {
-			if value, ok := itemTags[filterType]; !ok || value != filterValue {
-				continue
-			}
-		}
-		environment := ""
-		for _, agg := range aggregators {
-			if tag, ok := itemTags[agg]; ok {
-				environment = tag // just set to the first nonempty match
-				break
-			}
-		}
-		key := environment + record[headerMap["ConsumedService"]]
-		if alloc, ok := oocAllocs[key]; ok {
-			alloc.Cost += itemCost
-		} else {
-			ooc := &OutOfClusterAllocation{
-				Aggregator:  strings.Join(aggregators, ","),
-				Environment: environment,
-				Service:     record[headerMap["ConsumedService"]],
-				Cost:        itemCost,
-			}
-			oocAllocs[key] = ooc
-		}
-
-	}
-	return nil
-}
-
-func createHeaderMap(headers []string) map[string]int {
-	headerMap := make(map[string]int)
-	for i, header := range headers {
-		for _, headerString := range HeaderStrings {
-			if strings.Contains(header, headerString) {
-				headerMap[headerString] = i
-			}
-		}
-	}
-	return headerMap
-}
-
-func makeValidJSON(jsonString string) string {
-	if jsonString == "" || (jsonString[0] == '{' && jsonString[len(jsonString)-1] == '}') {
-		return jsonString
-	}
-	return fmt.Sprintf("{%v}", jsonString)
-}
-
-// UsageDateTime only contains date information and not time because of this filtering usageDate time is inclusive on start and exclusive on end
-func isValidUsageDateTime(start, end, usageDateTime time.Time) bool {
-	return (usageDateTime.After(start) || usageDateTime.Equal(start)) && usageDateTime.Before(end)
-}
-
-func getStartAndEndTimes(usageDateTime time.Time) (time.Time, time.Time) {
-	start := time.Date(usageDateTime.Year(), usageDateTime.Month(), usageDateTime.Day(), 0, 0, 0, 0, usageDateTime.Location())
-	end := time.Date(usageDateTime.Year(), usageDateTime.Month(), usageDateTime.Day(), 23, 59, 59, 999999999, usageDateTime.Location())
-	return start, end
-}
-
-func selectCategory(meterCategory string) string {
-	if meterCategory == "Virtual Machines" {
-		return kubecost.ComputeCategory
-	} else if meterCategory == "Storage" {
-		return kubecost.StorageCategory
-	} else if meterCategory == "Load Balancer" || meterCategory == "Bandwidth" {
-		return kubecost.NetworkCategory
-	} else {
-		return kubecost.OtherCategory
-	}
-}
-
 func (az *Azure) ApplyReservedInstancePricing(nodes map[string]*Node) {
 
 }

+ 0 - 156
pkg/cloud/csvretriever.go

@@ -1,156 +0,0 @@
-package cloud
-
-import (
-	"bytes"
-	"context"
-	"encoding/csv"
-	"fmt"
-	"github.com/Azure/azure-storage-blob-go/azblob"
-	"github.com/kubecost/cost-model/pkg/env"
-	"net/url"
-	"strings"
-	"time"
-)
-
-type CSVRetriever interface {
-	GetCSVReaders(start, end time.Time) ([]*csv.Reader, error)
-}
-
-type AzureCSVRetriever struct {
-}
-
-func (acr AzureCSVRetriever) GetCSVReaders(start, end time.Time) ([]*csv.Reader, error) {
-
-	containerURL, err := acr.getContainer()
-	if err != nil {
-		return nil, err
-	}
-	return acr.getMostRecentFiles(start, end, containerURL)
-}
-
-func (acr AzureCSVRetriever) getMostRecentFiles(start, end time.Time, containerURL *azblob.ContainerURL) ([]*csv.Reader, error) {
-	ctx := context.Background()
-	blobNames, err := acr.getMostResentBlobNames(start, end, ctx, containerURL)
-	if err != nil {
-		return nil, err
-	}
-	var readers []*csv.Reader
-	for _, blobName := range blobNames {
-		blobURL := containerURL.NewBlobURL(blobName)
-
-		downloadResponse, err := blobURL.Download(ctx, 0, azblob.CountToEnd, azblob.BlobAccessConditions{}, false, azblob.ClientProvidedKeyOptions{})
-		if err != nil {
-			return nil, err
-		}
-		// NOTE: automatically retries are performed if the connection fails
-		bodyStream := downloadResponse.Body(azblob.RetryReaderOptions{MaxRetryRequests: 20})
-
-		// read the body into a buffer
-		downloadedData := bytes.Buffer{}
-		_, err = downloadedData.ReadFrom(bodyStream)
-		if err != nil {
-			return nil, err
-		}
-		reader := csv.NewReader(bytes.NewReader(downloadedData.Bytes()))
-		readers = append(readers, reader)
-	}
-	return readers, nil
-}
-
-func (acr AzureCSVRetriever) getContainer() (*azblob.ContainerURL, error) {
-	accountName := env.Get(env.AzureStorageAccountNameEnvVar, "")
-	accountKey := env.Get(env.AzureStorageAccessKeyEnvVar, "")
-	containerName := env.Get(env.AzureStorageContainerNameEnvVar, "")
-	if accountName == "" || accountKey == "" || containerName == "" {
-		return nil, fmt.Errorf("set up Azure storage config to access out of cluster costs")
-	}
-
-	// Create a default request pipeline using your storage account name and account key.
-	credential, err := azblob.NewSharedKeyCredential(accountName, accountKey)
-	if err != nil {
-		return nil, err
-	}
-
-	p := azblob.NewPipeline(credential, azblob.PipelineOptions{})
-
-	// From the Azure portal, get your storage account blob service URL endpoint.
-	URL, _ := url.Parse(
-		fmt.Sprintf("https://%s.blob.core.windows.net/%s", accountName, containerName))
-
-	// Create a ContainerURL object that wraps the container URL and a request
-	// pipeline to make requests.
-	containerURL := azblob.NewContainerURL(*URL, p)
-	return &containerURL, nil
-}
-
-func (acr AzureCSVRetriever) getMostResentBlobNames(start, end time.Time, ctx context.Context, containerURL *azblob.ContainerURL) ([]string, error) {
-	// Get list of month substrings for months contained in the start to end range
-	monthStrs, err := acr.getMonthStrings(start, end)
-	if err != nil {
-		return nil, err
-	}
-	mostResentBlobs := make(map[string]azblob.BlobItemInternal)
-	for marker := (azblob.Marker{}); marker.NotDone(); {
-		// Get a result segment starting with the blob indicated by the current Marker.
-		listBlob, err := containerURL.ListBlobsFlatSegment(ctx, marker, azblob.ListBlobsSegmentOptions{})
-		if err != nil {
-			return nil, err
-		}
-
-		// ListBlobs returns the start of the next segment; you MUST use this to get
-		// the next segment (after processing the current result segment).
-		marker = listBlob.NextMarker
-
-		// Using the list of months strings find the most resent blob for each month in the range
-		for _, blobInfo := range listBlob.Segment.BlobItems {
-			for _, month := range monthStrs {
-				if strings.Contains(blobInfo.Name, month) {
-					if prevBlob, ok := mostResentBlobs[month]; ok {
-						if prevBlob.Properties.CreationTime.After(*blobInfo.Properties.CreationTime) {
-							continue
-						}
-					}
-					mostResentBlobs[month] = blobInfo
-				}
-			}
-		}
-	}
-
-	// move the blobs names from map into ordered list of blob names
-	var blobNames []string
-	for _, month := range monthStrs {
-		if blob, ok := mostResentBlobs[month]; ok {
-			blobNames = append(blobNames, blob.Name)
-		}
-	}
-	return blobNames, nil
-}
-
-func (acr AzureCSVRetriever) getMonthStrings(start, end time.Time) ([]string, error) {
-	if end.After(time.Now()) {
-		end = time.Now()
-	}
-	if start.After(end) {
-		return []string{}, fmt.Errorf("start date must be before end date")
-	}
-
-	var monthStrs []string
-	monthStr := acr.timeToMonthString(start)
-	endStr := acr.timeToMonthString(end)
-	monthStrs = append(monthStrs, monthStr)
-	currMonth := start.AddDate(0, 0, -start.Day()+1)
-	for monthStr != endStr {
-		currMonth = currMonth.AddDate(0, 1, 0)
-		monthStr = acr.timeToMonthString(currMonth)
-		monthStrs = append(monthStrs, monthStr)
-	}
-
-	return monthStrs, nil
-}
-
-func (acr AzureCSVRetriever) timeToMonthString(input time.Time) string {
-	format := "20060102"
-	startOfMonth := input.AddDate(0, 0, -input.Day()+1)
-	endOfMonth := input.AddDate(0, 1, -input.Day())
-	return startOfMonth.Format(format) + "-" + endOfMonth.Format(format)
-}

+ 0 - 216
pkg/cloud/gcpprovider.go

@@ -26,7 +26,6 @@ import (
 	"golang.org/x/oauth2"
 	"golang.org/x/oauth2/google"
 	compute "google.golang.org/api/compute/v1"
-	"google.golang.org/api/iterator"
 	v1 "k8s.io/api/core/v1"
 	"k8s.io/klog"
 )
@@ -111,53 +110,6 @@ type multiKeyGCPAllocation struct {
 	Cost    float64
 }
 
-func multiKeyGCPAllocationToOutOfClusterAllocation(gcpAlloc multiKeyGCPAllocation, aggregatorNames []string) *OutOfClusterAllocation {
-	var keys []map[string]string
-	var environment string
-	var usedAggregatorName string
-	if gcpAlloc.Keys.Valid {
-		err := json.Unmarshal([]byte(gcpAlloc.Keys.StringVal), &keys)
-		if err != nil {
-			klog.Infof("Invalid unmarshaling response from BigQuery filtered query: %s", err.Error())
-		}
-	keyloop:
-		for _, label := range keys {
-			for _, aggregatorName := range aggregatorNames {
-				if label["key"] == aggregatorName {
-					environment = label["value"]
-					usedAggregatorName = label["key"]
-					break keyloop
-				}
-			}
-		}
-	}
-	return &OutOfClusterAllocation{
-		Aggregator:  usedAggregatorName,
-		Environment: environment,
-		Service:     gcpAlloc.Service,
-		Cost:        gcpAlloc.Cost,
-	}
-}
-
-func gcpAllocationToOutOfClusterAllocation(gcpAlloc gcpAllocation) *OutOfClusterAllocation {
-	var aggregator string
-	if gcpAlloc.Aggregator.Valid {
-		aggregator = gcpAlloc.Aggregator.StringVal
-	}
-
-	var environment string
-	if gcpAlloc.Environment.Valid {
-		environment = gcpAlloc.Environment.StringVal
-	}
-
-	return &OutOfClusterAllocation{
-		Aggregator:  aggregator,
-		Environment: environment,
-		Service:     gcpAlloc.Service,
-		Cost:        gcpAlloc.Cost,
-	}
-}
-
 // GetLocalStorageQuery returns the cost of local storage for the given window. Setting rate=true
 // returns hourly spend. Setting used=true only tracks used storage, not total.
 func (gcp *GCP) GetLocalStorageQuery(window, offset time.Duration, rate bool, used bool) string {
@@ -337,174 +289,6 @@ func (gcp *GCP) UpdateConfig(r io.Reader, updateType string) (*CustomPricing, er
 	})
 }
 
-// ExternalAllocations represents tagged assets outside the scope of kubernetes.
-// "start" and "end" are dates of the format YYYY-MM-DD
-// "aggregator" is the tag used to determine how to allocate those assets, ie namespace, pod, etc.
-func (gcp *GCP) ExternalAllocations(start string, end string, aggregators []string, filterType string, filterValue string, crossCluster bool) ([]*OutOfClusterAllocation, error) {
-	if env.LegacyExternalCostsAPIDisabled() {
-		return nil, fmt.Errorf("Legacy External Allocations API disabled.")
-	}
-
-	c, err := gcp.Config.GetCustomPricingData()
-	if err != nil {
-		return nil, err
-	}
-
-	var s []*OutOfClusterAllocation
-	if c.ServiceKeyName != "" && c.ServiceKeySecret != "" && !crossCluster {
-		aws, err := NewCrossClusterProvider("aws", gcp.Config.ConfigFileManager(), "gcp.json", gcp.Clientset)
-		if err != nil {
-			klog.Infof("Could not instantiate cross-cluster provider %s", err.Error())
-		}
-		awsOOC, err := aws.ExternalAllocations(start, end, aggregators, filterType, filterValue, true)
-		if err != nil {
-			klog.Infof("Could not fetch cross-cluster costs %s", err.Error())
-		}
-		s = append(s, awsOOC...)
-	}
-
-	formattedAggregators := []string{}
-	for _, a := range aggregators {
-		formattedAggregators = append(formattedAggregators, strconv.Quote(a))
-	}
-
-	aggregator := strings.Join(formattedAggregators, ",")
-
-	var qerr error
-	if filterType == "kubernetes_" {
-		// start, end formatted like: "2019-04-20 00:00:00"
-		/* OLD METHOD: supported getting all data, including unaggregated.
-		queryString := fmt.Sprintf(`SELECT
-						service,
-						labels.key as aggregator,
-						labels.value as environment,
-						SUM(cost) as cost
-						FROM  (SELECT
-								service.description as service,
-								labels,
-								cost
-							FROM %s
-							WHERE usage_start_time >= "%s" AND usage_start_time < "%s")
-							LEFT JOIN UNNEST(labels) as labels
-							ON labels.key = "%s"
-					GROUP BY aggregator, environment, service;`, c.BillingDataDataset, start, end, aggregator) // For example, "billing_data.gcp_billing_export_v1_01AC9F_74CF1D_5565A2"
-		klog.V(3).Infof("Querying \"%s\" with : %s", c.ProjectID, queryString)
-		gcpOOC, err := gcp.QuerySQL(queryString)
-		s = append(s, gcpOOC...)
-		qerr = err
-		*/
-		queryString := `(
-			SELECT
-				service.description as service,
-				TO_JSON_STRING(labels) as keys,
-				SUM(cost) as cost
-			FROM` +
-			fmt.Sprintf(" `%s` ", c.BillingDataDataset) +
-			fmt.Sprintf(`WHERE EXISTS (SELECT * FROM UNNEST(labels) AS l2 WHERE l2.key IN (%s))
-			AND usage_start_time >= "%s" AND usage_start_time < "%s"
-			GROUP BY service, keys
-		)`, aggregator, start, end)
-		klog.V(3).Infof("Querying \"%s\" with : %s", c.ProjectID, queryString)
-		gcpOOC, err := gcp.multiLabelQuery(queryString, aggregators)
-		s = append(s, gcpOOC...)
-		qerr = err
-	} else {
-		if filterType == "kubernetes_labels" {
-			fvs := strings.Split(filterValue, "=")
-			if len(fvs) == 2 {
-				// if we are given "app=myapp" then look for label "kubernetes_label_app=myapp"
-				filterType = fmt.Sprintf("kubernetes_label_%s", fvs[0])
-				filterValue = fvs[1]
-			} else {
-				klog.V(2).Infof("[Warning] illegal kubernetes_labels filterValue: %s", filterValue)
-			}
-		}
-
-		queryString := `(
-			SELECT
-				service.description as service,
-				TO_JSON_STRING(labels) as keys,
-				SUM(cost) as cost
-		  	FROM` +
-			fmt.Sprintf(" `%s` ", c.BillingDataDataset) +
-			fmt.Sprintf(`WHERE EXISTS (SELECT * FROM UNNEST(labels) AS l2 WHERE l2.key IN (%s))
-			AND EXISTS (SELECT * FROM UNNEST(labels) AS l WHERE l.key = "%s" AND l.value = "%s")
-			AND usage_start_time >= "%s" AND usage_start_time < "%s"
-			GROUP BY service, keys
-		)`, aggregator, filterType, filterValue, start, end)
-		klog.V(4).Infof("Querying \"%s\" with : %s", c.ProjectID, queryString)
-		gcpOOC, err := gcp.multiLabelQuery(queryString, aggregators)
-		s = append(s, gcpOOC...)
-		qerr = err
-	}
-	if qerr != nil && gcp.ServiceKeyProvided {
-		klog.Infof("Error querying gcp: %s", qerr)
-	}
-	return s, qerr
-}
-
-func (gcp *GCP) multiLabelQuery(query string, aggregators []string) ([]*OutOfClusterAllocation, error) {
-	c, err := gcp.Config.GetCustomPricingData()
-	if err != nil {
-		return nil, err
-	}
-	ctx := context.Background()
-	client, err := bigquery.NewClient(ctx, c.ProjectID) // For example, "guestbook-227502"
-	if err != nil {
-		return nil, err
-	}
-
-	q := client.Query(query)
-	it, err := q.Read(ctx)
-	if err != nil {
-		return nil, err
-	}
-	var allocations []*OutOfClusterAllocation
-	for {
-		var a multiKeyGCPAllocation
-		err := it.Next(&a)
-		if err == iterator.Done {
-			break
-		}
-		if err != nil {
-			return nil, err
-		}
-		allocations = append(allocations, multiKeyGCPAllocationToOutOfClusterAllocation(a, aggregators))
-	}
-	return allocations, nil
-}
-
-// QuerySQL should query BigQuery for billing data for out of cluster costs.
-func (gcp *GCP) QuerySQL(query string) ([]*OutOfClusterAllocation, error) {
-	c, err := gcp.Config.GetCustomPricingData()
-	if err != nil {
-		return nil, err
-	}
-	ctx := context.Background()
-	client, err := bigquery.NewClient(ctx, c.ProjectID) // For example, "guestbook-227502"
-	if err != nil {
-		return nil, err
-	}
-
-	q := client.Query(query)
-	it, err := q.Read(ctx)
-	if err != nil {
-		return nil, err
-	}
-	var allocations []*OutOfClusterAllocation
-	for {
-		var a gcpAllocation
-		err := it.Next(&a)
-		if err == iterator.Done {
-			break
-		}
-		if err != nil {
-			return nil, err
-		}
-		allocations = append(allocations, gcpAllocationToOutOfClusterAllocation(a))
-	}
-	return allocations, nil
-}
 
 // ClusterName returns the name of a GKE cluster, as provided by metadata.
 func (gcp *GCP) ClusterInfo() (map[string]string, error) {

+ 0 - 1
pkg/cloud/provider.go

@@ -262,7 +262,6 @@ type Provider interface {
 	GetConfig() (*CustomPricing, error)
 	GetManagementPlatform() (string, error)
 	GetLocalStorageQuery(time.Duration, time.Duration, bool, bool) string
-	ExternalAllocations(string, string, []string, string, string, bool) ([]*OutOfClusterAllocation, error)
 	ApplyReservedInstancePricing(map[string]*Node)
 	ServiceAccountStatus() *ServiceAccountStatus
 	PricingSourceStatus() map[string]*PricingSource

+ 0 - 66
pkg/costmodel/router.go

@@ -555,71 +555,6 @@ func parseAggregations(customAggregation, aggregator, filterType string) (string
 	return key, val, filter
 }
 
-func (a *Accesses) OutofClusterCosts(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
-	w.Header().Set("Content-Type", "application/json")
-	w.Header().Set("Access-Control-Allow-Origin", "*")
-
-	start := r.URL.Query().Get("start")
-	end := r.URL.Query().Get("end")
-	aggregator := r.URL.Query().Get("aggregator")
-	customAggregation := r.URL.Query().Get("customAggregation")
-	filterType := r.URL.Query().Get("filterType")
-	filterValue := r.URL.Query().Get("filterValue")
-	var data []*cloud.OutOfClusterAllocation
-	var err error
-	_, aggregations, filter := parseAggregations(customAggregation, aggregator, filterType)
-	data, err = a.CloudProvider.ExternalAllocations(start, end, aggregations, filter, filterValue, false)
-	w.Write(WrapData(data, err))
-}
-
-func (a *Accesses) OutOfClusterCostsWithCache(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
-	w.Header().Set("Content-Type", "application/json")
-	w.Header().Set("Access-Control-Allow-Origin", "*")
-
-	// start date for which to query costs, inclusive; format YYYY-MM-DD
-	start := r.URL.Query().Get("start")
-	// end date for which to query costs, inclusive; format YYYY-MM-DD
-	end := r.URL.Query().Get("end")
-	// aggregator sets the field by which to aggregate; default, prepended by "kubernetes_"
-	kubernetesAggregation := r.URL.Query().Get("aggregator")
-	// customAggregation allows full customization of aggregator w/o prepending
-	customAggregation := r.URL.Query().Get("customAggregation")
-	// disableCache, if set to "true", tells this function to recompute and
-	// cache the requested data
-	disableCache := r.URL.Query().Get("disableCache") == "true"
-	// clearCache, if set to "true", tells this function to flush the cache,
-	// then recompute and cache the requested data
-	clearCache := r.URL.Query().Get("clearCache") == "true"
-
-	filterType := r.URL.Query().Get("filterType")
-	filterValue := r.URL.Query().Get("filterValue")
-
-	aggregationkey, aggregation, filter := parseAggregations(customAggregation, kubernetesAggregation, filterType)
-
-	// clear cache prior to checking the cache so that a clearCache=true
-	// request always returns a freshly computed value
-	if clearCache {
-		a.OutOfClusterCache.Flush()
-	}
-
-	// attempt to retrieve cost data from cache
-	key := fmt.Sprintf(`%s:%s:%s:%s:%s`, start, end, aggregationkey, filter, filterValue)
-	if value, found := a.OutOfClusterCache.Get(key); found && !disableCache {
-		if data, ok := value.([]*cloud.OutOfClusterAllocation); ok {
-			w.Write(WrapDataWithMessage(data, nil, fmt.Sprintf("out of cluster cache hit: %s", key)))
-			return
-		}
-		klog.Errorf("caching error: failed to type cast data: %s", key)
-	}
-
-	data, err := a.CloudProvider.ExternalAllocations(start, end, aggregation, filter, filterValue, false)
-	if err == nil {
-		a.OutOfClusterCache.Set(key, data, cache.DefaultExpiration)
-	}
-
-	w.Write(WrapDataWithMessage(data, err, fmt.Sprintf("out of cluser cache miss: %s", key)))
-}
-
 func (a *Accesses) GetAllNodePricing(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
 	w.Header().Set("Content-Type", "application/json")
 	w.Header().Set("Access-Control-Allow-Origin", "*")
@@ -1630,7 +1565,6 @@ func Initialize(additionalConfigWatchers ...*watcher.ConfigMapWatcher) *Accesses
 	a.Router.GET("/costDataModelRange", a.CostDataModelRange)
 	a.Router.GET("/aggregatedCostModel", a.AggregateCostModelHandler)
 	a.Router.GET("/allocation/compute", a.ComputeAllocationHandler)
-	a.Router.GET("/outOfClusterCosts", a.OutOfClusterCostsWithCache)
 	a.Router.GET("/allNodePricing", a.GetAllNodePricing)
 	a.Router.POST("/refreshPricing", a.RefreshPricingData)
 	a.Router.GET("/clusterCostsOverTime", a.ClusterCostsOverTime)