Explorar o código

Merge pull request #1957 from opencost/sean/open-source-big-query

Sean/open source big query
Sean Holcomb hai 3 anos
pai
achega
0195d83081

+ 8 - 9
pkg/cloud/aws/athenaintegration.go

@@ -69,7 +69,7 @@ type AthenaIntegration struct {
 
 // Query Athena for CUR data and build a new CloudCostSetRange containing the info
 func (ai *AthenaIntegration) GetCloudCost(start, end time.Time) (*kubecost.CloudCostSetRange, error) {
-	log.Infof("AthenaIntegration[%s]: StoreCloudCost: %s", ai.Key(), kubecost.NewWindow(&start, &end).String())
+	log.Infof("AthenaIntegration[%s]: GetCloudCost: %s", ai.Key(), kubecost.NewWindow(&start, &end).String())
 	// Query for all column names
 	allColumns, err := ai.GetColumns()
 	if err != nil {
@@ -185,31 +185,30 @@ func (ai *AthenaIntegration) GetCloudCost(start, end time.Time) (*kubecost.Cloud
 	`
 	aqi.Query = fmt.Sprintf(queryStr, columnStr, ai.Table, whereClause, groupByStr)
 
-	CCSR, err := kubecost.NewCloudCostSetRange(start, end, timeutil.Day, ai.Key())
+	ccsr, err := kubecost.NewCloudCostSetRange(start, end, timeutil.Day, ai.Key())
 	if err != nil {
 		return nil, err
 	}
 
 	// Generate row handling function.
 	rowHandler := func(row types.Row) {
-		err2 := ai.RowToCloudCost(row, aqi, CCSR)
+		err2 := ai.RowToCloudCost(row, aqi, ccsr)
 		if err2 != nil {
-			log.Errorf("AthenaIntegration: queryCloudCostCompute: error while parsing row: %s", err2.Error())
+			log.Errorf("AthenaIntegration: GetCloudCost: error while parsing row: %s", err2.Error())
 		}
 	}
-	log.Debugf("AthenaIntegration[%s]: queryCloudCostCompute: querying: %s", ai.Key(), aqi.Query)
+	log.Debugf("AthenaIntegration[%s]: GetCloudCost: querying: %s", ai.Key(), aqi.Query)
 	// Query CUR data and fill out CCSR
 	err = ai.Query(context.TODO(), aqi.Query, GetAthenaQueryFunc(rowHandler))
 	if err != nil {
 		return nil, err
 	}
 
-	// TODO: May not be needed anymore?
-	for _, ccs := range CCSR.CloudCostSets {
-		log.Debugf("AthenaIntegration[%s]: queryCloudCostCompute: writing compute items for window %s: %d", ai.Key(), ccs.Window, len(ccs.CloudCosts))
+	for _, ccs := range ccsr.CloudCostSets {
+		log.Debugf("AthenaIntegration[%s]: GetCloudCost: writing compute items for window %s: %d", ai.Key(), ccs.Window, len(ccs.CloudCosts))
 		ai.ConnectionStatus = ai.GetConnectionStatusFromResult(ccs, ai.ConnectionStatus)
 	}
-	return CCSR, nil
+	return ccsr, nil
 
 }
 

+ 1 - 1
pkg/cloud/aws/athenaintegration_test.go

@@ -9,7 +9,7 @@ import (
 	"github.com/opencost/opencost/pkg/util/timeutil"
 )
 
-func GetCloudCost_Test(t *testing.T) {
+func TestAthenaIntegration_GetCloudCost(t *testing.T) {
 	athenaConfigPath := os.Getenv("ATHENA_CONFIGURATION")
 	if athenaConfigPath == "" {
 		t.Skip("skipping integration test, set environment variable ATHENA_CONFIGURATION")

+ 369 - 0
pkg/cloud/gcp/bigqueryintegration.go

@@ -0,0 +1,369 @@
+package gcp
+
+import (
+	"context"
+	"encoding/json"
+	"fmt"
+	"regexp"
+	"strings"
+	"time"
+
+	"cloud.google.com/go/bigquery"
+	"github.com/opencost/opencost/pkg/kubecost"
+	"github.com/opencost/opencost/pkg/log"
+	"github.com/opencost/opencost/pkg/util/timeutil"
+	"google.golang.org/api/iterator"
+)
+
+// BigQueryIntegration embeds BigQueryQuerier and uses it to run a billing
+// query whose rows are turned into CloudCost data (see GetCloudCost).
+type BigQueryIntegration struct {
+	BigQueryQuerier
+}
+
+// Column aliases assigned in the SELECT clause built by GetCloudCost. The same
+// names are matched against the result schema in CloudCostLoader.Load, so a
+// change here affects both the query and the row parsing.
+const (
+	UsageDateColumnName          = "usage_date"
+	BillingAccountIDColumnName   = "billing_id"
+	ProjectIDColumnName          = "project_id"
+	ServiceDescriptionColumnName = "service"
+	SKUDescriptionColumnName     = "description"
+	LabelsColumnName             = "labels"
+	ResourceNameColumnName       = "resource"
+	CostColumnName               = "cost"
+	CreditsColumnName            = "credits"
+)
+
+// BiqQueryWherePartitionFmt is the WHERE conjunct that bounds the scanned
+// partitions; it takes an inclusive start date and an exclusive end date, both
+// formatted by GetCloudCost as "2006-01-02".
+const BiqQueryWherePartitionFmt = `DATE(_PARTITIONTIME) >= "%s" AND DATE(_PARTITIONTIME) < "%s"`
+
+// BiqQueryWhereDateFmt is the WHERE conjunct that bounds usage_start_time; it
+// takes the same kind of start (inclusive) and end (exclusive) date strings.
+const BiqQueryWhereDateFmt = `usage_start_time >= "%s" AND usage_start_time < "%s"`
+
+// GetCloudCost queries the BigQuery billing data for usage that started in
+// [start, end), aggregated per day, billing account, project, service, SKU,
+// labels and resource, and loads every result row into a new CloudCostSetRange.
+//
+// On failure it returns a nil range together with an error that wraps the
+// underlying cause, so callers can inspect it with errors.Is / errors.As.
+func (bqi *BigQueryIntegration) GetCloudCost(start time.Time, end time.Time) (*kubecost.CloudCostSetRange, error) {
+	const dateFmt = "2006-01-02"
+
+	// Build Query
+
+	selectColumns := []string{
+		fmt.Sprintf("TIMESTAMP_TRUNC(usage_start_time, day) as %s", UsageDateColumnName),
+		fmt.Sprintf("billing_account_id as %s", BillingAccountIDColumnName),
+		fmt.Sprintf("project.id as %s", ProjectIDColumnName),
+		fmt.Sprintf("service.description as %s", ServiceDescriptionColumnName),
+		fmt.Sprintf("sku.description as %s", SKUDescriptionColumnName),
+		fmt.Sprintf("resource.name as %s", ResourceNameColumnName),
+		fmt.Sprintf("TO_JSON_STRING(labels) as %s", LabelsColumnName),
+		fmt.Sprintf("SUM(cost) as %s", CostColumnName),
+		fmt.Sprintf("IFNULL(SUM((Select SUM(amount) FROM bd.credits)),0) as %s", CreditsColumnName),
+	}
+
+	// Every non-aggregated column selected above must be grouped on.
+	groupByColumns := []string{
+		UsageDateColumnName,
+		BillingAccountIDColumnName,
+		ProjectIDColumnName,
+		ServiceDescriptionColumnName,
+		SKUDescriptionColumnName,
+		LabelsColumnName,
+		ResourceNameColumnName,
+	}
+
+	// The partition filter extends two days past end.
+	// NOTE(review): presumably because rows can land in a partition later than
+	// their usage_start_time — confirm against the billing export behavior.
+	partitionStart := start
+	partitionEnd := end.AddDate(0, 0, 2)
+	wherePartition := fmt.Sprintf(BiqQueryWherePartitionFmt, partitionStart.Format(dateFmt), partitionEnd.Format(dateFmt))
+	whereDate := fmt.Sprintf(BiqQueryWhereDateFmt, start.Format(dateFmt), end.Format(dateFmt))
+
+	whereConjuncts := []string{
+		wherePartition,
+		whereDate,
+	}
+
+	columnStr := strings.Join(selectColumns, ", ")
+	table := fmt.Sprintf(" `%s` bd ", bqi.GetBillingDataDataset())
+	whereClause := strings.Join(whereConjuncts, " AND ")
+	groupByStr := strings.Join(groupByColumns, ", ")
+	queryTemplate := `
+		SELECT %s
+		FROM %s
+		WHERE %s
+		GROUP BY %s
+	`
+
+	query := fmt.Sprintf(queryTemplate, columnStr, table, whereClause, groupByStr)
+
+	// Perform Query and parse values
+
+	ccsr, err := kubecost.NewCloudCostSetRange(start, end, timeutil.Day, bqi.Key())
+	if err != nil {
+		return nil, fmt.Errorf("error creating new CloudCostSetRange: %w", err)
+	}
+
+	iter, err := bqi.Query(context.Background(), query)
+	if err != nil {
+		return nil, fmt.Errorf("error querying: %w", err)
+	}
+
+	// Parse query into CloudCostSetRange
+	for {
+		// A fresh loader per row so a previous row's CloudCost is never reused.
+		var ccl CloudCostLoader
+		err = iter.Next(&ccl)
+		if err == iterator.Done {
+			break
+		}
+		if err != nil {
+			return nil, fmt.Errorf("error reading query results: %w", err)
+		}
+		if ccl.CloudCost == nil {
+			continue
+		}
+		ccsr.LoadCloudCost(ccl.CloudCost)
+	}
+
+	return ccsr, nil
+}
+
+// CloudCostLoader receives a single BigQuery result row via its Load method
+// and holds the CloudCost built from it. CloudCost stays nil until Load
+// completes successfully; GetCloudCost skips loaders whose CloudCost is nil.
+type CloudCostLoader struct {
+	CloudCost *kubecost.CloudCost
+}
+
+// Load populates the CloudCost of a CloudCostLoader from one BigQuery row. It
+// walks schema, converts each recognized column of values into the matching
+// CloudCost property or cost figure, and stores the assembled CloudCost on the
+// loader. It is the method the row iterator calls when a *CloudCostLoader is
+// passed to Next.
+//
+// It returns an error when values has fewer entries than schema, when the
+// usage date column is not a time.Time, or when no closed window could be
+// built. Any other column that fails to parse is logged and falls back to its
+// zero value, so a single bad field does not drop the whole row.
+func (ccl *CloudCostLoader) Load(values []bigquery.Value, schema bigquery.Schema) error {
+	// Guard the values[i] lookups below against a row shorter than the schema.
+	if len(values) < len(schema) {
+		return fmt.Errorf("GCP: BigQuery: row has %d values but schema has %d fields", len(values), len(schema))
+	}
+
+	// Create Cloud Cost Properties
+	properties := kubecost.CloudCostProperties{
+		Provider: kubecost.GCPProvider,
+	}
+	var window kubecost.Window
+	var description string
+	var listCost float64
+	var credits float64
+
+	for i, field := range schema {
+		if field == nil {
+			log.DedupedErrorf(5, "GCP: BigQuery: found nil field in schema")
+			continue
+		}
+
+		switch field.Name {
+		case UsageDateColumnName:
+			usageDate, ok := values[i].(time.Time)
+			if !ok {
+				// It would be very surprising if an unparsable time came back from the API, so it should be ok to return here.
+				return fmt.Errorf("error parsing usage date: %v", values[i])
+			}
+			// start and end will be the day that the usage occurred on
+			s := usageDate
+			e := s.Add(timeutil.Day)
+			window = kubecost.NewWindow(&s, &e)
+		case BillingAccountIDColumnName:
+			invoiceEntityID, ok := values[i].(string)
+			if !ok {
+				log.DedupedErrorf(5, "error parsing GCP CloudCost %s: %v", BillingAccountIDColumnName, values[i])
+				invoiceEntityID = ""
+			}
+			properties.InvoiceEntityID = invoiceEntityID
+		case ProjectIDColumnName:
+			accountID, ok := values[i].(string)
+			if !ok {
+				log.DedupedErrorf(5, "error parsing GCP CloudCost %s: %v", ProjectIDColumnName, values[i])
+				accountID = ""
+			}
+			properties.AccountID = accountID
+		case ServiceDescriptionColumnName:
+			service, ok := values[i].(string)
+			if !ok {
+				log.DedupedErrorf(5, "error parsing GCP CloudCost %s: %v", ServiceDescriptionColumnName, values[i])
+				service = ""
+			}
+			properties.Service = service
+		case SKUDescriptionColumnName:
+			d, ok := values[i].(string)
+			if !ok {
+				log.DedupedErrorf(5, "error parsing GCP CloudCost %s: %v", SKUDescriptionColumnName, values[i])
+				d = ""
+			}
+			description = d
+		case LabelsColumnName:
+			labels := map[string]string{}
+			labelJSON, ok := values[i].(string)
+			if !ok {
+				// Nothing to decode; keep the empty label map rather than
+				// feeding an empty string to the JSON decoder.
+				log.DedupedErrorf(5, "error parsing GCP CloudCost %s: %v", LabelsColumnName, values[i])
+			} else {
+				// The query selects TO_JSON_STRING(labels), which is decoded here
+				// as a JSON array of objects carrying "key" and "value" entries.
+				var labelList []map[string]string
+				if err := json.Unmarshal([]byte(labelJSON), &labelList); err != nil {
+					log.Warnf("GCP: BigQuery: error unmarshaling GCP CloudCost labels: %s", err)
+				}
+				for _, pair := range labelList {
+					labels[pair["key"]] = pair["value"]
+				}
+			}
+			properties.Labels = labels
+		case ResourceNameColumnName:
+			resourceNameValue := values[i]
+			if resourceNameValue == nil {
+				// A nil resource name is not an error; leave the provider ID empty.
+				properties.ProviderID = ""
+				continue
+			}
+			resource, ok := resourceNameValue.(string)
+			if !ok {
+				log.DedupedErrorf(5, "error parsing GCP CloudCost %s: %v", ResourceNameColumnName, values[i])
+				properties.ProviderID = ""
+				continue
+			}
+
+			properties.ProviderID = ParseProviderID(resource)
+		case CostColumnName:
+			cost, ok := values[i].(float64)
+			if !ok {
+				log.DedupedErrorf(5, "error parsing GCP CloudCost %s: %v", CostColumnName, values[i])
+				cost = 0.0
+			}
+			listCost = cost
+		case CreditsColumnName:
+			creditSum, ok := values[i].(float64)
+			if !ok {
+				log.DedupedErrorf(5, "error parsing GCP CloudCost %s: %v", CreditsColumnName, values[i])
+				creditSum = 0.0
+			}
+			credits = creditSum
+		default:
+			log.DedupedErrorf(5, "GCP: BigQuery: found unrecognized column name %s", field.Name)
+		}
+	}
+
+	// Check required Fields
+	if window.IsOpen() {
+		return fmt.Errorf("GCP: BigQuery: error parsing, item had invalid window")
+	}
+
+	// Determine Category
+	properties.Category = SelectCategory(properties.Service, description)
+
+	// sum credit and cost for NetCost
+	// NOTE(review): presumably credit amounts are negative, so adding them
+	// reduces the cost — confirm against the billing export schema.
+	netCost := listCost + credits
+
+	// percent k8s is determined by the presence of labels
+	k8sPercent := 0.0
+	if IsK8s(properties.Labels) {
+		k8sPercent = 1.0
+	}
+
+	// metric builds a CostMetric that shares this row's Kubernetes percentage.
+	metric := func(cost float64) kubecost.CostMetric {
+		return kubecost.CostMetric{
+			Cost:              cost,
+			KubernetesPercent: k8sPercent,
+		}
+	}
+
+	// Using the NetCost as a 'placeholder' for the amortized and invoiced costs now,
+	// until we can revisit and spend the time to do the calculations correctly
+	ccl.CloudCost = &kubecost.CloudCost{
+		Properties:       &properties,
+		Window:           window,
+		ListCost:         metric(listCost),
+		AmortizedCost:    metric(netCost),
+		AmortizedNetCost: metric(netCost),
+		InvoicedCost:     metric(netCost),
+		NetCost:          metric(netCost),
+	}
+
+	return nil
+}
+
+// IsK8s reports whether labels contains at least one of the label keys this
+// package treats as a marker of Kubernetes usage. Only the presence of the
+// key matters; the label value is ignored.
+func IsK8s(labels map[string]string) bool {
+	markerKeys := [...]string{
+		"goog-gke-volume",
+		"goog-gke-node",
+		"goog-k8s-cluster-name",
+	}
+	for _, key := range markerKeys {
+		if _, found := labels[key]; found {
+			return true
+		}
+	}
+	return false
+}
+
+// parseProviderIDRx captures whatever follows the last "/" of an id that has at
+// least one character before that slash, e.g. "gke-cluster-3-default-pool-xxxx-yy"
+// from "projects/###/instances/gke-cluster-3-default-pool-xxxx-yy".
+var parseProviderIDRx = regexp.MustCompile("^.+\\/(.+)?")
+
+// ParseProviderID returns the trailing path segment of id as captured by
+// parseProviderIDRx (an empty string when id ends in "/"). When the pattern
+// does not match, id is returned unchanged.
+func ParseProviderID(id string) string {
+	groups := parseProviderIDRx.FindStringSubmatch(id)
+	if n := len(groups); n > 0 {
+		return groups[n-1]
+	}
+	return id
+}
+
+// SelectCategory maps a billing row to a cost category using case-insensitive
+// substring matches. The SKU description is consulted first (network keywords,
+// then the licensing-fee exception, then storage keywords); only when none of
+// those match is the service name consulted. Rows matching nothing fall into
+// the Other category. The order of the checks is significant.
+func SelectCategory(service, description string) string {
+	svc := strings.ToLower(service)
+	desc := strings.ToLower(description)
+
+	// containsAny reports whether text contains at least one of the keywords.
+	containsAny := func(text string, keywords ...string) bool {
+		for _, kw := range keywords {
+			if strings.Contains(text, kw) {
+				return true
+			}
+		}
+		return false
+	}
+
+	// Description-based rules take precedence over the service name.
+	switch {
+	case containsAny(desc, "download", "network", "ingress", "egress", "static ip", "external ip", "load balanced"):
+		return kubecost.NetworkCategory
+	case containsAny(desc, "licensing fee"):
+		return kubecost.OtherCategory
+	case containsAny(desc, "storage", "pd capacity", "pd iops", "pd snapshot"):
+		return kubecost.StorageCategory
+	}
+
+	// Service defaults; "compute" must be tested before "sql" and "bigquery".
+	switch {
+	case containsAny(svc, "storage"):
+		return kubecost.StorageCategory
+	case containsAny(svc, "compute"):
+		return kubecost.ComputeCategory
+	case containsAny(svc, "sql", "bigquery"):
+		return kubecost.StorageCategory
+	case containsAny(svc, "kubernetes"):
+		return kubecost.ManagementCategory
+	case containsAny(svc, "pub/sub"):
+		return kubecost.NetworkCategory
+	}
+
+	return kubecost.OtherCategory
+}

+ 58 - 0
pkg/cloud/gcp/bigqueryintegration_test.go

@@ -0,0 +1,58 @@
+package gcp
+
+import (
+	"encoding/json"
+	"os"
+	"testing"
+	"time"
+
+	"github.com/opencost/opencost/pkg/kubecost"
+	"github.com/opencost/opencost/pkg/util/timeutil"
+)
+
+// TestBigQueryIntegration_GetCloudCost is an integration test that runs
+// GetCloudCost with the configuration file named by the BIGQUERY_CONFIGURATION
+// environment variable. It is skipped when that variable is unset.
+func TestBigQueryIntegration_GetCloudCost(t *testing.T) {
+	bigQueryConfigPath := os.Getenv("BIGQUERY_CONFIGURATION")
+	if bigQueryConfigPath == "" {
+		t.Skip("skipping integration test, set environment variable BIGQUERY_CONFIGURATION")
+	}
+	bigQueryConfigBin, err := os.ReadFile(bigQueryConfigPath)
+	if err != nil {
+		t.Fatalf("failed to read config file: %s", err.Error())
+	}
+	var bigQueryConfig BigQueryConfiguration
+	err = json.Unmarshal(bigQueryConfigBin, &bigQueryConfig)
+	if err != nil {
+		t.Fatalf("failed to unmarshal config from JSON: %s", err.Error())
+	}
+
+	today := kubecost.RoundBack(time.Now().UTC(), timeutil.Day)
+
+	// expected is the emptiness the returned range should report.
+	testCases := map[string]struct {
+		integration *BigQueryIntegration
+		start       time.Time
+		end         time.Time
+		expected    bool
+	}{
+
+		"last week window": {
+			integration: &BigQueryIntegration{
+				BigQueryQuerier: BigQueryQuerier{
+					BigQueryConfiguration: bigQueryConfig,
+				},
+			},
+			end:      today.Add(-7 * timeutil.Day),
+			start:    today.Add(-8 * timeutil.Day),
+			expected: false,
+		},
+	}
+	for name, testCase := range testCases {
+		t.Run(name, func(t *testing.T) {
+			actual, err := testCase.integration.GetCloudCost(testCase.start, testCase.end)
+			if err != nil {
+				t.Errorf("Other error during testing %s", err)
+			} else if actual.IsEmpty() != testCase.expected {
+				t.Errorf("Incorrect result, actual emptiness: %t, expected: %t", actual.IsEmpty(), testCase.expected)
+			}
+		})
+	}
+}

+ 11 - 80
pkg/cloud/gcp/bigqueryquerier.go

@@ -2,16 +2,15 @@ package gcp
 
 import (
 	"context"
-	"regexp"
-	"strings"
 
 	"cloud.google.com/go/bigquery"
+	"github.com/opencost/opencost/pkg/cloud"
 	cloudconfig "github.com/opencost/opencost/pkg/cloud/config"
-	"github.com/opencost/opencost/pkg/kubecost"
 )
 
 type BigQueryQuerier struct {
 	BigQueryConfiguration
+	ConnectionStatus cloud.ConnectionStatus
 }
 
 func (bqq *BigQueryQuerier) Equals(config cloudconfig.Config) bool {
@@ -23,88 +22,20 @@ func (bqq *BigQueryQuerier) Equals(config cloudconfig.Config) bool {
 	return bqq.BigQueryConfiguration.Equals(&thatConfig.BigQueryConfiguration)
 }
 
-func (bqq *BigQueryQuerier) QueryBigQuery(ctx context.Context, queryStr string) (*bigquery.RowIterator, error) {
+func (bqq *BigQueryQuerier) Query(ctx context.Context, queryStr string) (*bigquery.RowIterator, error) {
+	err := bqq.Validate()
+
+	if err != nil {
+		bqq.ConnectionStatus = cloud.InvalidConfiguration
+		return nil, err
+	}
+
 	client, err := bqq.GetBigQueryClient(ctx)
 	if err != nil {
+		bqq.ConnectionStatus = cloud.FailedConnection
 		return nil, err
 	}
 
 	query := client.Query(queryStr)
 	return query.Read(ctx)
 }
-
-func GCPSelectCategory(service, description string) string {
-	s := strings.ToLower(service)
-	d := strings.ToLower(description)
-
-	// Network descriptions
-	if strings.Contains(d, "download") {
-		return kubecost.NetworkCategory
-	}
-	if strings.Contains(d, "network") {
-		return kubecost.NetworkCategory
-	}
-	if strings.Contains(d, "ingress") {
-		return kubecost.NetworkCategory
-	}
-	if strings.Contains(d, "egress") {
-		return kubecost.NetworkCategory
-	}
-	if strings.Contains(d, "static ip") {
-		return kubecost.NetworkCategory
-	}
-	if strings.Contains(d, "external ip") {
-		return kubecost.NetworkCategory
-	}
-	if strings.Contains(d, "load balanced") {
-		return kubecost.NetworkCategory
-	}
-	if strings.Contains(d, "licensing fee") {
-		return kubecost.OtherCategory
-	}
-
-	// Storage Descriptions
-	if strings.Contains(d, "storage") {
-		return kubecost.StorageCategory
-	}
-	if strings.Contains(d, "pd capacity") {
-		return kubecost.StorageCategory
-	}
-	if strings.Contains(d, "pd iops") {
-		return kubecost.StorageCategory
-	}
-	if strings.Contains(d, "pd snapshot") {
-		return kubecost.StorageCategory
-	}
-
-	// Service Defaults
-	if strings.Contains(s, "storage") {
-		return kubecost.StorageCategory
-	}
-	if strings.Contains(s, "compute") {
-		return kubecost.ComputeCategory
-	}
-	if strings.Contains(s, "sql") {
-		return kubecost.StorageCategory
-	}
-	if strings.Contains(s, "bigquery") {
-		return kubecost.StorageCategory
-	}
-	if strings.Contains(s, "kubernetes") {
-		return kubecost.ManagementCategory
-	} else if strings.Contains(s, "pub/sub") {
-		return kubecost.NetworkCategory
-	}
-
-	return kubecost.OtherCategory
-}
-
-var parseProviderIDRx = regexp.MustCompile("^.+\\/(.+)?") // Capture "gke-cluster-3-default-pool-xxxx-yy" from "projects/###/instances/gke-cluster-3-default-pool-xxxx-yy"
-
-func GCPParseProviderID(id string) string {
-	match := parseProviderIDRx.FindStringSubmatch(id)
-	if len(match) == 0 {
-		return id
-	}
-	return match[len(match)-1]
-}