Преглед на файлове

Cloud Integration Status Refresh (#3482)

Signed-off-by: Nik Willwerth <nwillwerth@kubecost.com>
nik-kc преди 5 месеца
родител
ревизия
534b98c101

+ 24 - 2
pkg/cloud/aws/athenaintegration.go

@@ -10,6 +10,7 @@ import (
 	"github.com/aws/aws-sdk-go-v2/service/athena/types"
 	"github.com/aws/aws-sdk-go-v2/service/athena/types"
 	"github.com/opencost/opencost/core/pkg/log"
 	"github.com/opencost/opencost/core/pkg/log"
 	"github.com/opencost/opencost/core/pkg/opencost"
 	"github.com/opencost/opencost/core/pkg/opencost"
+	"github.com/opencost/opencost/core/pkg/util/timeutil"
 	"github.com/opencost/opencost/pkg/cloud"
 	"github.com/opencost/opencost/pkg/cloud"
 )
 )
 
 
@@ -68,6 +69,24 @@ type AthenaIntegration struct {
 
 
 // Query Athena for CUR data and build a new CloudCostSetRange containing the info
 // Query Athena for CUR data and build a new CloudCostSetRange containing the info
 func (ai *AthenaIntegration) GetCloudCost(start, end time.Time) (*opencost.CloudCostSetRange, error) {
 func (ai *AthenaIntegration) GetCloudCost(start, end time.Time) (*opencost.CloudCostSetRange, error) {
+	return ai.getCloudCost(start, end, 0)
+}
+
+func (ai *AthenaIntegration) RefreshStatus() cloud.ConnectionStatus {
+	end := time.Now().UTC().Truncate(timeutil.Day)
+	start := end.Add(-7 * timeutil.Day)
+
+	// getCloudCost already sets ConnectionStatus in the event there is no error, so we don't need to handle the positive
+	// case here
+	_, err := ai.getCloudCost(start, end, 1)
+	if err != nil {
+		ai.ConnectionStatus = cloud.FailedConnection
+	}
+
+	return ai.ConnectionStatus
+}
+
+func (ai *AthenaIntegration) getCloudCost(start, end time.Time, limit int) (*opencost.CloudCostSetRange, error) {
 	log.Infof("AthenaIntegration[%s]: GetCloudCost: %s", ai.Key(), opencost.NewWindow(&start, &end).String())
 	log.Infof("AthenaIntegration[%s]: GetCloudCost: %s", ai.Key(), opencost.NewWindow(&start, &end).String())
 	// Query for all column names
 	// Query for all column names
 	allColumns, err := ai.GetColumns()
 	allColumns, err := ai.GetColumns()
@@ -161,6 +180,9 @@ func (ai *AthenaIntegration) GetCloudCost(start, end time.Time) (*opencost.Cloud
 		WHERE %s
 		WHERE %s
 		GROUP BY %s
 		GROUP BY %s
 	`
 	`
+	if limit > 0 {
+		queryStr = fmt.Sprintf("%s LIMIT %d", queryStr, limit)
+	}
 	aqi.Query = fmt.Sprintf(queryStr, columnStr, ai.Table, whereClause, groupByStr)
 	aqi.Query = fmt.Sprintf(queryStr, columnStr, ai.Table, whereClause, groupByStr)
 
 
 	ccsr, err := opencost.NewCloudCostSetRange(start, end, opencost.AccumulateOptionDay, ai.Key())
 	ccsr, err := opencost.NewCloudCostSetRange(start, end, opencost.AccumulateOptionDay, ai.Key())
@@ -328,7 +350,7 @@ func (ai *AthenaIntegration) GetPartitionWhere(start, end time.Time) string {
 	month := time.Date(start.Year(), start.Month(), 1, 0, 0, 0, 0, time.UTC)
 	month := time.Date(start.Year(), start.Month(), 1, 0, 0, 0, 0, time.UTC)
 	endMonth := time.Date(end.Year(), end.Month(), 1, 0, 0, 0, 0, time.UTC)
 	endMonth := time.Date(end.Year(), end.Month(), 1, 0, 0, 0, 0, time.UTC)
 	var disjuncts []string
 	var disjuncts []string
-	
+
 	// For CUR 2.0, check if billing_period partitions actually exist
 	// For CUR 2.0, check if billing_period partitions actually exist
 	useBillingPeriodPartitions := false
 	useBillingPeriodPartitions := false
 	if ai.CURVersion != "1.0" {
 	if ai.CURVersion != "1.0" {
@@ -337,7 +359,7 @@ func (ai *AthenaIntegration) GetPartitionWhere(start, end time.Time) string {
 			useBillingPeriodPartitions = true
 			useBillingPeriodPartitions = true
 		}
 		}
 	}
 	}
-	
+
 	for !month.After(endMonth) {
 	for !month.After(endMonth) {
 		if ai.CURVersion == "1.0" {
 		if ai.CURVersion == "1.0" {
 			// CUR 1.0 uses year and month columns for partitioning
 			// CUR 1.0 uses year and month columns for partitioning

+ 1 - 1
pkg/cloud/aws/athenaquerier.go

@@ -69,7 +69,7 @@ func (aq *AthenaQuerier) HasBillingPeriodPartitions() (bool, error) {
 	// Use SHOW PARTITIONS to check if billing_period partitions exist
 	// Use SHOW PARTITIONS to check if billing_period partitions exist
 	query := fmt.Sprintf("SHOW PARTITIONS \"%s\"", aq.Table)
 	query := fmt.Sprintf("SHOW PARTITIONS \"%s\"", aq.Table)
 	hasBillingPeriodPartition := false
 	hasBillingPeriodPartition := false
-	
+
 	athenaErr := aq.Query(context.TODO(), query, GetAthenaQueryFunc(func(row types.Row) {
 	athenaErr := aq.Query(context.TODO(), query, GetAthenaQueryFunc(func(row types.Row) {
 		if len(row.Data) > 0 && row.Data[0].VarCharValue != nil {
 		if len(row.Data) > 0 && row.Data[0].VarCharValue != nil {
 			partitionValue := *row.Data[0].VarCharValue
 			partitionValue := *row.Data[0].VarCharValue

+ 18 - 0
pkg/cloud/aws/s3selectintegration.go

@@ -9,6 +9,7 @@ import (
 
 
 	"github.com/opencost/opencost/core/pkg/log"
 	"github.com/opencost/opencost/core/pkg/log"
 	"github.com/opencost/opencost/core/pkg/opencost"
 	"github.com/opencost/opencost/core/pkg/opencost"
+	"github.com/opencost/opencost/pkg/cloud"
 )
 )
 
 
 const S3SelectDateLayout = "2006-01-02T15:04:05Z"
 const S3SelectDateLayout = "2006-01-02T15:04:05Z"
@@ -373,3 +374,20 @@ func hasK8sLabel(labels opencost.CloudCostLabels) bool {
 	}
 	}
 	return false
 	return false
 }
 }
+
+func (s3si *S3SelectIntegration) RefreshStatus() cloud.ConnectionStatus {
+	client, err := s3si.GetS3Client()
+	if err != nil {
+		s3si.ConnectionStatus = cloud.FailedConnection
+	}
+	objs, err := s3si.ListObjects(client)
+	if err != nil {
+		s3si.ConnectionStatus = cloud.FailedConnection
+	} else if len(objs.Contents) == 0 {
+		s3si.ConnectionStatus = cloud.MissingData
+	} else {
+		s3si.ConnectionStatus = cloud.SuccessfulConnection
+	}
+
+	return s3si.ConnectionStatus
+}

+ 47 - 8
pkg/cloud/azure/storagebillingparser.go

@@ -14,6 +14,7 @@ import (
 	"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob"
 	"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob"
 	"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/container"
 	"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/container"
 	"github.com/opencost/opencost/core/pkg/log"
 	"github.com/opencost/opencost/core/pkg/log"
+	"github.com/opencost/opencost/core/pkg/util/timeutil"
 	"github.com/opencost/opencost/pkg/cloud"
 	"github.com/opencost/opencost/pkg/cloud"
 	"github.com/opencost/opencost/pkg/env"
 	"github.com/opencost/opencost/pkg/env"
 )
 )
@@ -40,16 +41,10 @@ func (asbp *AzureStorageBillingParser) ParseBillingData(start, end time.Time, re
 		return err
 		return err
 	}
 	}
 
 
-	serviceURL := fmt.Sprintf(asbp.StorageConnection.getBlobURLTemplate(), asbp.Account, "")
-	client, err := asbp.Authorizer.GetBlobClient(serviceURL)
-	if err != nil {
-		asbp.ConnectionStatus = cloud.FailedConnection
-		return err
-	}
-	ctx := context.Background()
 	// most recent blob list contains information on blob including name and lastMod time
 	// most recent blob list contains information on blob including name and lastMod time
 	// Example blobNames: [ export/myExport/20240101-20240131/myExport_758a42af-0731-4edb-b498-1e523bb40f12.csv ]
 	// Example blobNames: [ export/myExport/20240101-20240131/myExport_758a42af-0731-4edb-b498-1e523bb40f12.csv ]
-	blobInfos, err := asbp.getMostRecentBlobs(start, end, client, ctx)
+	ctx := context.Background()
+	blobInfos, err := asbp.getBlobInfos(ctx, start, end)
 	if err != nil {
 	if err != nil {
 		asbp.ConnectionStatus = cloud.FailedConnection
 		asbp.ConnectionStatus = cloud.FailedConnection
 		return err
 		return err
@@ -60,6 +55,12 @@ func (asbp *AzureStorageBillingParser) ParseBillingData(start, end time.Time, re
 		return nil
 		return nil
 	}
 	}
 
 
+	client, err := asbp.getClient()
+	if err != nil {
+		asbp.ConnectionStatus = cloud.FailedConnection
+		return err
+	}
+
 	if env.IsAzureDownloadBillingDataToDisk() {
 	if env.IsAzureDownloadBillingDataToDisk() {
 		// clean up old files that have been saved to disk before downloading new ones
 		// clean up old files that have been saved to disk before downloading new ones
 		localPath := env.GetAzureDownloadBillingDataPath()
 		localPath := env.GetAzureDownloadBillingDataPath()
@@ -112,6 +113,44 @@ func (asbp *AzureStorageBillingParser) ParseBillingData(start, end time.Time, re
 	return nil
 	return nil
 }
 }
 
 
+func (asbp *AzureStorageBillingParser) getClient() (*azblob.Client, error) {
+	serviceURL := fmt.Sprintf(asbp.StorageConnection.getBlobURLTemplate(), asbp.Account, "")
+	client, err := asbp.Authorizer.GetBlobClient(serviceURL)
+	if err != nil {
+		return nil, err
+	}
+	return client, nil
+}
+
+func (asbp *AzureStorageBillingParser) getBlobInfos(ctx context.Context, start, end time.Time) ([]container.BlobItem, error) {
+	client, err := asbp.getClient()
+	if err != nil {
+		return nil, err
+	}
+	blobInfos, err := asbp.getMostRecentBlobs(start, end, client, ctx)
+	if err != nil {
+		return nil, err
+	}
+	return blobInfos, nil
+}
+
+func (asbp *AzureStorageBillingParser) RefreshStatus() cloud.ConnectionStatus {
+	end := time.Now().UTC().Truncate(timeutil.Day)
+	start := end.Add(-7 * timeutil.Day)
+
+	ctx := context.Background()
+	blobInfos, err := asbp.getBlobInfos(ctx, start, end)
+	if err != nil {
+		asbp.ConnectionStatus = cloud.FailedConnection
+	} else if len(blobInfos) == 0 {
+		asbp.ConnectionStatus = cloud.MissingData
+	} else {
+		asbp.ConnectionStatus = cloud.SuccessfulConnection
+	}
+
+	return asbp.ConnectionStatus
+}
+
 func (asbp *AzureStorageBillingParser) parseCSV(start, end time.Time, reader *csv.Reader, resultFn AzureBillingResultFunc) error {
 func (asbp *AzureStorageBillingParser) parseCSV(start, end time.Time, reader *csv.Reader, resultFn AzureBillingResultFunc) error {
 	headers, err := reader.Read()
 	headers, err := reader.Read()
 	if err != nil {
 	if err != nil {

+ 23 - 0
pkg/cloud/gcp/bigqueryintegration.go

@@ -9,6 +9,8 @@ import (
 
 
 	"github.com/opencost/opencost/core/pkg/log"
 	"github.com/opencost/opencost/core/pkg/log"
 	"github.com/opencost/opencost/core/pkg/opencost"
 	"github.com/opencost/opencost/core/pkg/opencost"
+	"github.com/opencost/opencost/core/pkg/util/timeutil"
+	"github.com/opencost/opencost/pkg/cloud"
 	"google.golang.org/api/iterator"
 	"google.golang.org/api/iterator"
 )
 )
 
 
@@ -38,6 +40,24 @@ const BiqQueryWherePartitionFmt = `DATE(_PARTITIONTIME) >= "%s" AND DATE(_PARTIT
 const BiqQueryWhereDateFmt = `usage_start_time >= "%s" AND usage_start_time < "%s"`
 const BiqQueryWhereDateFmt = `usage_start_time >= "%s" AND usage_start_time < "%s"`
 
 
 func (bqi *BigQueryIntegration) GetCloudCost(start time.Time, end time.Time) (*opencost.CloudCostSetRange, error) {
 func (bqi *BigQueryIntegration) GetCloudCost(start time.Time, end time.Time) (*opencost.CloudCostSetRange, error) {
+	return bqi.getCloudCost(start, end, 0)
+}
+
+func (bqi *BigQueryIntegration) RefreshStatus() cloud.ConnectionStatus {
+	end := time.Now().UTC().Truncate(timeutil.Day)
+	start := end.Add(-7 * timeutil.Day)
+
+	// the call to Query within getCloudCost already sets ConnectionStatus in the event there is no error, so we don't
+	// need to handle the positive case here
+	_, err := bqi.getCloudCost(start, end, 1)
+	if err != nil {
+		bqi.ConnectionStatus = cloud.FailedConnection
+	}
+
+	return bqi.ConnectionStatus
+}
+
+func (bqi *BigQueryIntegration) getCloudCost(start time.Time, end time.Time, limit int) (*opencost.CloudCostSetRange, error) {
 	cudRates, err := bqi.GetFlexibleCUDRates(start, end)
 	cudRates, err := bqi.GetFlexibleCUDRates(start, end)
 	if err != nil {
 	if err != nil {
 		return nil, fmt.Errorf("error retrieving CUD rates: %w", err)
 		return nil, fmt.Errorf("error retrieving CUD rates: %w", err)
@@ -87,6 +107,9 @@ func (bqi *BigQueryIntegration) GetCloudCost(start time.Time, end time.Time) (*o
 		WHERE %s
 		WHERE %s
 		GROUP BY %s
 		GROUP BY %s
 	`
 	`
+	if limit > 0 {
+		queryStr = fmt.Sprintf("%s LIMIT %d", queryStr, limit)
+	}
 
 
 	querystr := fmt.Sprintf(queryStr, columnStr, table, whereClause, groupByStr)
 	querystr := fmt.Sprintf(queryStr, columnStr, table, whereClause, groupByStr)
 
 

+ 6 - 0
pkg/cloud/oracle/usageapiintegration.go

@@ -6,6 +6,7 @@ import (
 	"strconv"
 	"strconv"
 	"time"
 	"time"
 
 
+	"github.com/opencost/opencost/core/pkg/log"
 	"github.com/opencost/opencost/core/pkg/opencost"
 	"github.com/opencost/opencost/core/pkg/opencost"
 	"github.com/opencost/opencost/pkg/cloud"
 	"github.com/opencost/opencost/pkg/cloud"
 	"github.com/oracle/oci-go-sdk/v65/common"
 	"github.com/oracle/oci-go-sdk/v65/common"
@@ -158,6 +159,11 @@ func (uai *UsageApiIntegration) GetStatus() cloud.ConnectionStatus {
 	return uai.ConnectionStatus
 	return uai.ConnectionStatus
 }
 }
 
 
+func (uai *UsageApiIntegration) RefreshStatus() cloud.ConnectionStatus {
+	log.Warn("status refresh is not supported for the Oracle provider")
+	return uai.ConnectionStatus
+}
+
 func SelectOCICategory(service string) string {
 func SelectOCICategory(service string) string {
 	if service == "Compute" {
 	if service == "Compute" {
 		return opencost.ComputeCategory
 		return opencost.ComputeCategory

+ 2 - 0
pkg/cloudcost/ingestionmanager.go

@@ -212,6 +212,8 @@ func (im *IngestionManager) createIngestor(config cloud.KeyedConfig) error {
 		return fmt.Errorf("IngestionManager: createIngestor: %w", err)
 		return fmt.Errorf("IngestionManager: createIngestor: %w", err)
 	}
 	}
 
 
+	ing.RefreshStatus()
+
 	ing.Start(false)
 	ing.Start(false)
 
 
 	im.ingestors[config.Key()] = ing
 	im.ingestors[config.Key()] = ing

+ 4 - 0
pkg/cloudcost/ingestor.go

@@ -340,3 +340,7 @@ func (ing *ingestor) expandCoverage(window opencost.Window) {
 
 
 	ing.coverage = coverage
 	ing.coverage = coverage
 }
 }
+
+func (ing *ingestor) RefreshStatus() cloud.ConnectionStatus {
+	return ing.integration.RefreshStatus()
+}

+ 1 - 0
pkg/cloudcost/integration.go

@@ -16,6 +16,7 @@ import (
 type CloudCostIntegration interface {
 type CloudCostIntegration interface {
 	GetCloudCost(time.Time, time.Time) (*opencost.CloudCostSetRange, error)
 	GetCloudCost(time.Time, time.Time) (*opencost.CloudCostSetRange, error)
 	GetStatus() cloud.ConnectionStatus
 	GetStatus() cloud.ConnectionStatus
+	RefreshStatus() cloud.ConnectionStatus
 }
 }
 
 
 // GetIntegrationFromConfig coverts any valid KeyedConfig into the appropriate BillingIntegration if possible
 // GetIntegrationFromConfig coverts any valid KeyedConfig into the appropriate BillingIntegration if possible