bigqueryquerier.go 1.6 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162
  1. package gcp
  2. import (
  3. "context"
  4. "fmt"
  5. "cloud.google.com/go/bigquery"
  6. "github.com/opencost/opencost/pkg/cloud"
  7. cloudconfig "github.com/opencost/opencost/pkg/cloud/config"
  8. )
  9. type BigQueryQuerier struct {
  10. BigQueryConfiguration
  11. ConnectionStatus cloud.ConnectionStatus
  12. }
  13. func (bqq *BigQueryQuerier) GetStatus() cloud.ConnectionStatus {
  14. // initialize status if it has not done so; this can happen if the integration is inactive
  15. if bqq.ConnectionStatus.String() == "" {
  16. bqq.ConnectionStatus = cloud.InitialStatus
  17. }
  18. return bqq.ConnectionStatus
  19. }
  20. func (bqq *BigQueryQuerier) Equals(config cloudconfig.Config) bool {
  21. thatConfig, ok := config.(*BigQueryQuerier)
  22. if !ok {
  23. return false
  24. }
  25. return bqq.BigQueryConfiguration.Equals(&thatConfig.BigQueryConfiguration)
  26. }
  27. func (bqq *BigQueryQuerier) Query(ctx context.Context, queryStr string) (*bigquery.RowIterator, error) {
  28. err := bqq.Validate()
  29. if err != nil {
  30. bqq.ConnectionStatus = cloud.InvalidConfiguration
  31. return nil, err
  32. }
  33. client, err := bqq.GetBigQueryClient(ctx)
  34. if err != nil {
  35. bqq.ConnectionStatus = cloud.FailedConnection
  36. return nil, err
  37. }
  38. query := client.Query(queryStr)
  39. iter, err := query.Read(ctx)
  40. // If result is empty and connection status is not already successful update status to missing data
  41. if iter == nil && bqq.ConnectionStatus != cloud.SuccessfulConnection {
  42. bqq.ConnectionStatus = cloud.MissingData
  43. } else {
  44. bqq.ConnectionStatus = cloud.SuccessfulConnection
  45. }
  46. if err != nil {
  47. return iter, fmt.Errorf("BigQueryQuerier: Query: error reading query results: %w", err)
  48. }
  49. return iter, nil
  50. }