Parcourir la source

Merge branch 'develop' into sean/aws-lb-alloc

Sean Holcomb il y a 5 ans
Parent
commit
2df5f54b6f

BIN
allocation-drilldown.gif


+ 1 - 0
go.mod

@@ -18,6 +18,7 @@ require (
 	github.com/google/uuid v1.1.2
 	github.com/googleapis/gax-go v2.0.2+incompatible // indirect
 	github.com/gophercloud/gophercloud v0.2.0 // indirect
+	github.com/json-iterator/go v1.1.10
 	github.com/jszwec/csvutil v1.2.1
 	github.com/julienschmidt/httprouter v1.2.0
 	github.com/lib/pq v1.2.0

+ 126 - 57
pkg/cloud/awsprovider.go

@@ -4,7 +4,6 @@ import (
 	"bytes"
 	"compress/gzip"
 	"encoding/csv"
-	"encoding/json"
 	"fmt"
 	"io"
 	"io/ioutil"
@@ -23,6 +22,7 @@ import (
 	"github.com/kubecost/cost-model/pkg/errors"
 	"github.com/kubecost/cost-model/pkg/log"
 	"github.com/kubecost/cost-model/pkg/util"
+	"github.com/kubecost/cost-model/pkg/util/json"
 
 	"github.com/aws/aws-sdk-go/aws"
 	"github.com/aws/aws-sdk-go/aws/awserr"
@@ -1789,14 +1789,20 @@ func (a *AWS) GetReservationDataFromAthena() error {
 	if cfg.AthenaBucketName == "" {
 		return fmt.Errorf("No Athena Bucket configured")
 	}
-	if a.RIPricingByInstanceID == nil {
-		a.RIPricingByInstanceID = make(map[string]*RIData)
-	}
-	tNow := time.Now()
-	tOneDayAgo := tNow.Add(time.Duration(-25) * time.Hour) // Also get files from one day ago to avoid boundary conditions
-	start := tOneDayAgo.Format("2006-01-02")
-	end := tNow.Format("2006-01-02")
-	q := `SELECT   
+
+	// Query for all column names in advance in order to validate configured
+	// label columns
+	columns, _ := a.ShowAthenaColumns()
+
+	if columns["reservation_reservation_a_r_n"] && columns["reservation_effective_cost"] {
+		if a.RIPricingByInstanceID == nil {
+			a.RIPricingByInstanceID = make(map[string]*RIData)
+		}
+		tNow := time.Now()
+		tOneDayAgo := tNow.Add(time.Duration(-25) * time.Hour) // Also get files from one day ago to avoid boundary conditions
+		start := tOneDayAgo.Format("2006-01-02")
+		end := tNow.Format("2006-01-02")
+		q := `SELECT   
 		line_item_usage_start_date,
 		reservation_reservation_a_r_n,
 		line_item_resource_id,
@@ -1805,47 +1811,95 @@ func (a *AWS) GetReservationDataFromAthena() error {
 	WHERE line_item_usage_start_date BETWEEN date '%s' AND date '%s'
 	AND reservation_reservation_a_r_n <> '' ORDER BY 
 	line_item_usage_start_date DESC`
-	query := fmt.Sprintf(q, cfg.AthenaTable, start, end)
-	op, err := a.QueryAthenaBillingData(query)
-	if err != nil {
-		a.RIPricingStatus = err.Error()
-		return fmt.Errorf("Error fetching Reserved Instance Data: %s", err)
-	}
-	a.RIPricingStatus = ""
-	klog.Infof("Fetching RI data...")
-	if len(op.ResultSet.Rows) > 1 {
-		a.RIDataLock.Lock()
-		mostRecentDate := ""
-		for _, r := range op.ResultSet.Rows[1:(len(op.ResultSet.Rows) - 1)] {
-			d := *r.Data[0].VarCharValue
-			if mostRecentDate == "" {
-				mostRecentDate = d
-			} else if mostRecentDate != d { // Get all most recent assignments
-				break
-			}
-			cost, err := strconv.ParseFloat(*r.Data[3].VarCharValue, 64)
-			if err != nil {
-				klog.Infof("Error converting `%s` from float ", *r.Data[3].VarCharValue)
+		query := fmt.Sprintf(q, cfg.AthenaTable, start, end)
+		op, err := a.QueryAthenaBillingData(query)
+		if err != nil {
+			a.RIPricingStatus = err.Error()
+			return fmt.Errorf("Error fetching Reserved Instance Data: %s", err)
+		}
+		a.RIPricingStatus = ""
+		klog.Infof("Fetching RI data...")
+		if len(op.ResultSet.Rows) > 1 {
+			a.RIDataLock.Lock()
+			mostRecentDate := ""
+			for _, r := range op.ResultSet.Rows[1:(len(op.ResultSet.Rows) - 1)] {
+				d := *r.Data[0].VarCharValue
+				if mostRecentDate == "" {
+					mostRecentDate = d
+				} else if mostRecentDate != d { // Get all most recent assignments
+					break
+				}
+				cost, err := strconv.ParseFloat(*r.Data[3].VarCharValue, 64)
+				if err != nil {
+					klog.Infof("Error converting `%s` from float ", *r.Data[3].VarCharValue)
+				}
+				r := &RIData{
+					ResourceID:     *r.Data[2].VarCharValue,
+					EffectiveCost:  cost,
+					ReservationARN: *r.Data[1].VarCharValue,
+					MostRecentDate: d,
+				}
+				a.RIPricingByInstanceID[r.ResourceID] = r
 			}
-			r := &RIData{
-				ResourceID:     *r.Data[2].VarCharValue,
-				EffectiveCost:  cost,
-				ReservationARN: *r.Data[1].VarCharValue,
-				MostRecentDate: d,
+			klog.V(1).Infof("Found %d reserved instances", len(a.RIPricingByInstanceID))
+			for k, r := range a.RIPricingByInstanceID {
+				log.DedupedInfof(5, "Reserved Instance Data found for node %s : %f at time %s", k, r.EffectiveCost, r.MostRecentDate)
 			}
-			a.RIPricingByInstanceID[r.ResourceID] = r
-		}
-		klog.V(1).Infof("Found %d reserved instances", len(a.RIPricingByInstanceID))
-		for k, r := range a.RIPricingByInstanceID {
-			log.DedupedInfof(5, "Reserved Instance Data found for node %s : %f at time %s", k, r.EffectiveCost, r.MostRecentDate)
+			a.RIDataLock.Unlock()
+		} else {
+			klog.Infof("No reserved instance data found")
 		}
-		a.RIDataLock.Unlock()
 	} else {
-		klog.Infof("No reserved instance data found")
+		klog.Infof("No reserved data available in Athena")
+		a.RIPricingStatus = ""
 	}
 	return nil
 }
 
+// ShowAthenaColumns returns a list of the names of all columns in the configured
+// Athena tables
+func (aws *AWS) ShowAthenaColumns() (map[string]bool, error) {
+	columnSet := map[string]bool{}
+	// Configure Athena query
+	cfg, err := aws.GetConfig()
+	if err != nil {
+		return nil, err
+	}
+	if cfg.AthenaTable == "" {
+		return nil, fmt.Errorf("AthenaTable not configured")
+	}
+	if cfg.AthenaBucketName == "" {
+		return nil, fmt.Errorf("AthenaBucketName not configured")
+	}
+
+	q := `SHOW COLUMNS IN  %s`
+	query := fmt.Sprintf(q, cfg.AthenaTable)
+	results, svc, err := aws.QueryAthenaPaginated(query)
+
+	columns := []string{}
+	pageNum := 0
+	athenaErr := svc.GetQueryResultsPages(results, func(page *athena.GetQueryResultsOutput, lastpage bool) bool {
+		for _, row := range page.ResultSet.Rows {
+			columns = append(columns, *row.Data[0].VarCharValue)
+		}
+
+		pageNum++
+
+		return true
+	})
+	if athenaErr != nil {
+		log.Warningf("Error getting Athena columns: %s", err)
+		return columnSet, athenaErr
+	}
+
+	for _, col := range columns {
+		columnSet[col] = true
+	}
+
+	return columnSet, nil
+}
+
+
 // ExternalAllocations represents tagged assets outside the scope of kubernetes.
 // "start" and "end" are dates of the format YYYY-MM-DD
 // "aggregator" is the tag used to determine how to allocate those assets, ie namespace, pod, etc.
@@ -1923,26 +1977,41 @@ func (a *AWS) ExternalAllocations(start string, end string, aggregators []string
 		}
 		return true
 	}
+	// Query for all column names in advance in order to validate configured
+	// label columns
+	columns, _ := a.ShowAthenaColumns()
 
-	klog.V(3).Infof("Running Query: %s", query)
-	ip, svc, err := a.QueryAthenaPaginated(query)
-
-	athenaErr := svc.GetQueryResultsPages(ip, processResults)
-	if athenaErr != nil {
-		klog.Infof("RETURNING ATHENA ERROR")
-		return nil, athenaErr
+	// Check for all aggregators being formatted into the query
+	containsColumns := true
+	for _, agg := range formattedAggregators {
+		if columns[agg] != true {
+			containsColumns = false
+			klog.Warningf("Athena missing column: %s", agg)
+		}
 	}
+	if containsColumns {
+		klog.V(3).Infof("Running Query: %s", query)
+		ip, svc, _ := a.QueryAthenaPaginated(query)
 
-	if customPricing.BillingDataDataset != "" && !crossCluster { // There is GCP data, meaning someone has tried to configure a GCP out-of-cluster allocation.
-		gcp, err := NewCrossClusterProvider("gcp", "aws.json", a.Clientset)
-		if err != nil {
-			klog.Infof("Could not instantiate cross-cluster provider %s", err.Error())
+		athenaErr := svc.GetQueryResultsPages(ip, processResults)
+		if athenaErr != nil {
+			klog.Infof("RETURNING ATHENA ERROR")
+			return nil, athenaErr
 		}
-		gcpOOC, err := gcp.ExternalAllocations(start, end, aggregators, filterType, filterValue, true)
-		if err != nil {
-			klog.Infof("Could not fetch cross-cluster costs %s", err.Error())
+
+		if customPricing.BillingDataDataset != "" && !crossCluster { // There is GCP data, meaning someone has tried to configure a GCP out-of-cluster allocation.
+			gcp, err := NewCrossClusterProvider("gcp", "aws.json", a.Clientset)
+			if err != nil {
+				klog.Infof("Could not instantiate cross-cluster provider %s", err.Error())
+			}
+			gcpOOC, err := gcp.ExternalAllocations(start, end, aggregators, filterType, filterValue, true)
+			if err != nil {
+				klog.Infof("Could not fetch cross-cluster costs %s", err.Error())
+			}
+			oocAllocs = append(oocAllocs, gcpOOC...)
 		}
-		oocAllocs = append(oocAllocs, gcpOOC...)
+	} else {
+		klog.Infof("External Allocations: Athena Query skipped due to missing columns")
 	}
 	return oocAllocs, nil
 }

+ 1 - 1
pkg/cloud/azureprovider.go

@@ -3,7 +3,6 @@ package cloud
 import (
 	"context"
 	"encoding/csv"
-	"encoding/json"
 	"fmt"
 	"github.com/kubecost/cost-model/pkg/kubecost"
 	"io"
@@ -17,6 +16,7 @@ import (
 	"github.com/kubecost/cost-model/pkg/clustercache"
 	"github.com/kubecost/cost-model/pkg/env"
 	"github.com/kubecost/cost-model/pkg/util"
+	"github.com/kubecost/cost-model/pkg/util/json"
 
 	"github.com/Azure/azure-sdk-for-go/services/compute/mgmt/2017-09-01/skus"
 	"github.com/Azure/azure-sdk-for-go/services/containerservice/mgmt/2018-03-31/containerservice"

+ 1 - 1
pkg/cloud/customprovider.go

@@ -1,7 +1,6 @@
 package cloud
 
 import (
-	"encoding/json"
 	"io"
 	"strconv"
 	"strings"
@@ -9,6 +8,7 @@ import (
 
 	"github.com/kubecost/cost-model/pkg/clustercache"
 	"github.com/kubecost/cost-model/pkg/env"
+	"github.com/kubecost/cost-model/pkg/util/json"
 
 	v1 "k8s.io/api/core/v1"
 )

+ 1 - 1
pkg/cloud/gcpprovider.go

@@ -2,7 +2,6 @@ package cloud
 
 import (
 	"context"
-	"encoding/json"
 	"fmt"
 	"io"
 	"io/ioutil"
@@ -23,6 +22,7 @@ import (
 	"github.com/kubecost/cost-model/pkg/env"
 	"github.com/kubecost/cost-model/pkg/log"
 	"github.com/kubecost/cost-model/pkg/util"
+	"github.com/kubecost/cost-model/pkg/util/json"
 
 	"golang.org/x/oauth2"
 	"golang.org/x/oauth2/google"

+ 1 - 1
pkg/cloud/providerconfig.go

@@ -1,7 +1,6 @@
 package cloud
 
 import (
-	"encoding/json"
 	"fmt"
 	"io/ioutil"
 	"reflect"
@@ -11,6 +10,7 @@ import (
 
 	"github.com/kubecost/cost-model/pkg/env"
 	"github.com/kubecost/cost-model/pkg/util"
+	"github.com/kubecost/cost-model/pkg/util/json"
 	"github.com/microcosm-cc/bluemonday"
 
 	"k8s.io/klog"

+ 1 - 1
pkg/clustermanager/clustermanager.go

@@ -2,7 +2,6 @@ package clustermanager
 
 import (
 	"encoding/base64"
-	"encoding/json"
 	"fmt"
 	"io/ioutil"
 	"strings"
@@ -10,6 +9,7 @@ import (
 	"github.com/google/uuid"
 
 	"github.com/kubecost/cost-model/pkg/util"
+	"github.com/kubecost/cost-model/pkg/util/json"
 
 	"k8s.io/klog"
 	"sigs.k8s.io/yaml"

+ 1 - 1
pkg/clustermanager/clustersendpoints.go

@@ -1,7 +1,6 @@
 package clustermanager
 
 import (
-	"encoding/json"
 	"errors"
 	"io/ioutil"
 	"net/http"
@@ -9,6 +8,7 @@ import (
 	"github.com/julienschmidt/httprouter"
 
 	"k8s.io/klog"
+	"github.com/kubecost/cost-model/pkg/util/json"
 )
 
 // DataEnvelope is a generic wrapper struct for http response data

+ 1 - 1
pkg/costmodel/aggregation.go

@@ -1,7 +1,6 @@
 package costmodel
 
 import (
-	"encoding/json"
 	"fmt"
 	"math"
 	"net/http"
@@ -20,6 +19,7 @@ import (
 	"github.com/kubecost/cost-model/pkg/prom"
 	"github.com/kubecost/cost-model/pkg/thanos"
 	"github.com/kubecost/cost-model/pkg/util"
+	"github.com/kubecost/cost-model/pkg/util/json"
 	"github.com/patrickmn/go-cache"
 	prometheusClient "github.com/prometheus/client_golang/api"
 	"k8s.io/klog"

+ 1 - 1
pkg/costmodel/router.go

@@ -2,7 +2,6 @@ package costmodel
 
 import (
 	"context"
-	"encoding/json"
 	"flag"
 	"fmt"
 	"net/http"
@@ -28,6 +27,7 @@ import (
 	"github.com/kubecost/cost-model/pkg/log"
 	"github.com/kubecost/cost-model/pkg/prom"
 	"github.com/kubecost/cost-model/pkg/thanos"
+	"github.com/kubecost/cost-model/pkg/util/json"
 	prometheus "github.com/prometheus/client_golang/api"
 	prometheusClient "github.com/prometheus/client_golang/api"
 	prometheusAPI "github.com/prometheus/client_golang/api/prometheus/v1"

+ 1 - 1
pkg/costmodel/sql.go

@@ -2,7 +2,6 @@ package costmodel
 
 import (
 	"database/sql"
-	"encoding/json"
 	"fmt"
 	"time"
 
@@ -11,6 +10,7 @@ import (
 	costAnalyzerCloud "github.com/kubecost/cost-model/pkg/cloud"
 	"github.com/kubecost/cost-model/pkg/env"
 	"github.com/kubecost/cost-model/pkg/util"
+	"github.com/kubecost/cost-model/pkg/util/json"
 
 	_ "github.com/lib/pq"
 )

+ 2 - 2
pkg/kubecost/allocation.go

@@ -2,7 +2,6 @@ package kubecost
 
 import (
 	"bytes"
-	"encoding/json"
 	"fmt"
 	"sort"
 	"strings"
@@ -11,6 +10,7 @@ import (
 
 	"github.com/kubecost/cost-model/pkg/log"
 	"github.com/kubecost/cost-model/pkg/util"
+	"github.com/kubecost/cost-model/pkg/util/json"
 )
 
 // TODO Clean-up use of IsEmpty; nil checks should be separated for safety.
@@ -257,7 +257,7 @@ func (a *Allocation) PVBytes() float64 {
 	return a.PVByteHours / (a.Minutes() / 60.0)
 }
 
-// MarshalJSON implements json.Marshal interface
+// MarshalJSON implements json.Marshaler interface
 func (a *Allocation) MarshalJSON() ([]byte, error) {
 	buffer := bytes.NewBufferString("{")
 	jsonEncodeString(buffer, "name", a.Name, ",")

+ 1 - 1
pkg/kubecost/asset.go

@@ -3,13 +3,13 @@ package kubecost
 import (
 	"bytes"
 	"encoding"
-	"encoding/json"
 	"fmt"
 	"strings"
 	"sync"
 	"time"
 
 	"github.com/kubecost/cost-model/pkg/log"
+	"github.com/kubecost/cost-model/pkg/util/json"
 )
 
 const timeFmt = "2006-01-02T15:04:05-0700"

+ 2 - 1
pkg/kubecost/json.go

@@ -2,9 +2,10 @@ package kubecost
 
 import (
 	"bytes"
-	"encoding/json"
 	"fmt"
 	"math"
+
+	"github.com/kubecost/cost-model/pkg/util/json"
 )
 
 // TODO move everything below to a separate package

+ 2 - 1
pkg/prom/metrics.go

@@ -1,12 +1,13 @@
 package prom
 
 import (
-	"encoding/json"
 	"fmt"
 	"reflect"
 	"regexp"
 	"sort"
 	"strings"
+
+	"github.com/kubecost/cost-model/pkg/util/json"
 )
 
 var invalidLabelCharRE = regexp.MustCompile(`[^a-zA-Z0-9_]`)

+ 1 - 1
pkg/prom/query.go

@@ -2,7 +2,6 @@ package prom
 
 import (
 	"context"
-	"encoding/json"
 	"fmt"
 	"net/http"
 	"net/url"
@@ -12,6 +11,7 @@ import (
 	"github.com/kubecost/cost-model/pkg/errors"
 	"github.com/kubecost/cost-model/pkg/log"
 	"github.com/kubecost/cost-model/pkg/util"
+	"github.com/kubecost/cost-model/pkg/util/json"
 	prometheus "github.com/prometheus/client_golang/api"
 )
 

+ 12 - 0
pkg/util/json/json.go

@@ -0,0 +1,12 @@
+package json
+
+import (
+    "encoding/json"
+
+    jsoniter "github.com/json-iterator/go"
+)
+
+var Marshal = jsoniter.ConfigCompatibleWithStandardLibrary.Marshal
+var Unmarshal = jsoniter.ConfigCompatibleWithStandardLibrary.Unmarshal
+type Marshaler json.Marshaler
+var NewDecoder = json.NewDecoder