|
@@ -5,6 +5,7 @@ import (
|
|
|
"compress/gzip"
|
|
"compress/gzip"
|
|
|
"context"
|
|
"context"
|
|
|
"encoding/csv"
|
|
"encoding/csv"
|
|
|
|
|
+ "errors"
|
|
|
"fmt"
|
|
"fmt"
|
|
|
"io"
|
|
"io"
|
|
|
"net/http"
|
|
"net/http"
|
|
@@ -19,7 +20,7 @@ import (
|
|
|
|
|
|
|
|
"github.com/opencost/opencost/pkg/clustercache"
|
|
"github.com/opencost/opencost/pkg/clustercache"
|
|
|
"github.com/opencost/opencost/pkg/env"
|
|
"github.com/opencost/opencost/pkg/env"
|
|
|
- "github.com/opencost/opencost/pkg/errors"
|
|
|
|
|
|
|
+ errs "github.com/opencost/opencost/pkg/errors"
|
|
|
"github.com/opencost/opencost/pkg/log"
|
|
"github.com/opencost/opencost/pkg/log"
|
|
|
"github.com/opencost/opencost/pkg/util"
|
|
"github.com/opencost/opencost/pkg/util"
|
|
|
"github.com/opencost/opencost/pkg/util/fileutil"
|
|
"github.com/opencost/opencost/pkg/util/fileutil"
|
|
@@ -766,6 +767,10 @@ func (aws *AWS) getRegionPricing(nodeList []*v1.Node) (*http.Response, string, e
|
|
|
|
|
|
|
|
pricingURL += "index.json"
|
|
pricingURL += "index.json"
|
|
|
|
|
|
|
|
|
|
+ if env.GetAWSPricingURL() != "" { // Allow override of pricing URL
|
|
|
|
|
+ pricingURL = env.GetAWSPricingURL()
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
log.Infof("starting download of \"%s\", which is quite large ...", pricingURL)
|
|
log.Infof("starting download of \"%s\", which is quite large ...", pricingURL)
|
|
|
resp, err := http.Get(pricingURL)
|
|
resp, err := http.Get(pricingURL)
|
|
|
if err != nil {
|
|
if err != nil {
|
|
@@ -855,7 +860,7 @@ func (aws *AWS) DownloadPricingData() error {
|
|
|
log.Errorf("Failed to lookup reserved instance data: %s", err.Error())
|
|
log.Errorf("Failed to lookup reserved instance data: %s", err.Error())
|
|
|
} else { // If we make one successful run, check on new reservation data every hour
|
|
} else { // If we make one successful run, check on new reservation data every hour
|
|
|
go func() {
|
|
go func() {
|
|
|
- defer errors.HandlePanic()
|
|
|
|
|
|
|
+ defer errs.HandlePanic()
|
|
|
aws.RIDataRunning = true
|
|
aws.RIDataRunning = true
|
|
|
|
|
|
|
|
for {
|
|
for {
|
|
@@ -875,7 +880,7 @@ func (aws *AWS) DownloadPricingData() error {
|
|
|
log.Errorf("Failed to lookup savings plan data: %s", err.Error())
|
|
log.Errorf("Failed to lookup savings plan data: %s", err.Error())
|
|
|
} else {
|
|
} else {
|
|
|
go func() {
|
|
go func() {
|
|
|
- defer errors.HandlePanic()
|
|
|
|
|
|
|
+ defer errs.HandlePanic()
|
|
|
aws.SavingsPlanDataRunning = true
|
|
aws.SavingsPlanDataRunning = true
|
|
|
for {
|
|
for {
|
|
|
log.Infof("Savings Plan watcher running... next update in 1h")
|
|
log.Infof("Savings Plan watcher running... next update in 1h")
|
|
@@ -1052,7 +1057,7 @@ func (aws *AWS) DownloadPricingData() error {
|
|
|
aws.SpotRefreshRunning = true
|
|
aws.SpotRefreshRunning = true
|
|
|
|
|
|
|
|
go func() {
|
|
go func() {
|
|
|
- defer errors.HandlePanic()
|
|
|
|
|
|
|
+ defer errs.HandlePanic()
|
|
|
|
|
|
|
|
for {
|
|
for {
|
|
|
log.Infof("Spot Pricing Refresh scheduled in %.2f minutes.", SpotRefreshDuration.Minutes())
|
|
log.Infof("Spot Pricing Refresh scheduled in %.2f minutes.", SpotRefreshDuration.Minutes())
|
|
@@ -1463,7 +1468,7 @@ func (aws *AWS) GetAddresses() ([]byte, error) {
|
|
|
// respective channels
|
|
// respective channels
|
|
|
go func(region string) {
|
|
go func(region string) {
|
|
|
defer wg.Done()
|
|
defer wg.Done()
|
|
|
- defer errors.HandlePanic()
|
|
|
|
|
|
|
+ defer errs.HandlePanic()
|
|
|
|
|
|
|
|
// Query for first page of volume results
|
|
// Query for first page of volume results
|
|
|
resp, err := aws.getAddressesForRegion(context.TODO(), region)
|
|
resp, err := aws.getAddressesForRegion(context.TODO(), region)
|
|
@@ -1477,7 +1482,7 @@ func (aws *AWS) GetAddresses() ([]byte, error) {
|
|
|
|
|
|
|
|
// Close the result channels after everything has been sent
|
|
// Close the result channels after everything has been sent
|
|
|
go func() {
|
|
go func() {
|
|
|
- defer errors.HandlePanic()
|
|
|
|
|
|
|
+ defer errs.HandlePanic()
|
|
|
|
|
|
|
|
wg.Wait()
|
|
wg.Wait()
|
|
|
close(errorCh)
|
|
close(errorCh)
|
|
@@ -1546,7 +1551,7 @@ func (aws *AWS) GetDisks() ([]byte, error) {
|
|
|
// respective channels
|
|
// respective channels
|
|
|
go func(region string) {
|
|
go func(region string) {
|
|
|
defer wg.Done()
|
|
defer wg.Done()
|
|
|
- defer errors.HandlePanic()
|
|
|
|
|
|
|
+ defer errs.HandlePanic()
|
|
|
|
|
|
|
|
// Query for first page of volume results
|
|
// Query for first page of volume results
|
|
|
resp, err := aws.getDisksForRegion(context.TODO(), region, 1000, nil)
|
|
resp, err := aws.getDisksForRegion(context.TODO(), region, 1000, nil)
|
|
@@ -1571,7 +1576,7 @@ func (aws *AWS) GetDisks() ([]byte, error) {
|
|
|
|
|
|
|
|
// Close the result channels after everything has been sent
|
|
// Close the result channels after everything has been sent
|
|
|
go func() {
|
|
go func() {
|
|
|
- defer errors.HandlePanic()
|
|
|
|
|
|
|
+ defer errs.HandlePanic()
|
|
|
|
|
|
|
|
wg.Wait()
|
|
wg.Wait()
|
|
|
close(errorCh)
|
|
close(errorCh)
|
|
@@ -1605,6 +1610,10 @@ func (aws *AWS) GetDisks() ([]byte, error) {
|
|
|
})
|
|
})
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
|
|
+func (*AWS) GetOrphanedResources() ([]OrphanedResource, error) {
|
|
|
|
|
+ return nil, errors.New("not implemented")
|
|
|
|
|
+}
|
|
|
|
|
+
|
|
|
// QueryAthenaPaginated executes athena query and processes results.
|
|
// QueryAthenaPaginated executes athena query and processes results.
|
|
|
func (aws *AWS) QueryAthenaPaginated(ctx context.Context, query string, fn func(*athena.GetQueryResultsOutput) bool) error {
|
|
func (aws *AWS) QueryAthenaPaginated(ctx context.Context, query string, fn func(*athena.GetQueryResultsOutput) bool) error {
|
|
|
awsAthenaInfo, err := aws.GetAWSAthenaInfo()
|
|
awsAthenaInfo, err := aws.GetAWSAthenaInfo()
|