Ver código fonte

Merge branch 'develop' into sean/aws-lb-alloc

Sean Holcomb 5 anos atrás
pai
commit
92a3a862cb

+ 2 - 0
README.md

@@ -2,6 +2,8 @@
 
 
 Kubecost models give teams visibility into current and historical Kubernetes spend and resource allocation. These models  provide cost transparency in Kubernetes environments that support multiple applications, teams, departments, etc.
 Kubecost models give teams visibility into current and historical Kubernetes spend and resource allocation. These models  provide cost transparency in Kubernetes environments that support multiple applications, teams, departments, etc.
 
 
+![Kubecost allocation UI](/allocation-drilldown.gif)
+
 To see more on the functionality of the full Kubecost product, please visit the [features page](https://kubecost.com/#features) on our website. 
 To see more on the functionality of the full Kubecost product, please visit the [features page](https://kubecost.com/#features) on our website. 
 Here is a summary of features enabled by this cost model:
 Here is a summary of features enabled by this cost model:
 
 

+ 4 - 0
pkg/cloud/csvprovider.go

@@ -69,6 +69,10 @@ func (c *CSVProvider) DownloadPricingData() error {
 	if strings.HasPrefix(c.CSVLocation, "s3://") {
 	if strings.HasPrefix(c.CSVLocation, "s3://") {
 		region := env.GetCSVRegion()
 		region := env.GetCSVRegion()
 		conf := aws.NewConfig().WithRegion(region).WithCredentialsChainVerboseErrors(true)
 		conf := aws.NewConfig().WithRegion(region).WithCredentialsChainVerboseErrors(true)
+		endpoint := env.GetCSVEndpoint()
+		if endpoint != "" {
+			conf = conf.WithEndpoint(endpoint)
+		}
 		s3Client := s3.New(session.New(conf))
 		s3Client := s3.New(session.New(conf))
 		bucketAndKey := strings.Split(strings.TrimPrefix(c.CSVLocation, "s3://"), "/")
 		bucketAndKey := strings.Split(strings.TrimPrefix(c.CSVLocation, "s3://"), "/")
 		if len(bucketAndKey) == 2 {
 		if len(bucketAndKey) == 2 {

+ 130 - 0
pkg/costmodel/aggregation.go

@@ -2114,6 +2114,136 @@ func (a *Accesses) AggregateCostModelHandler(w http.ResponseWriter, r *http.Requ
 	}
 	}
 }
 }
 
 
+// ParseAggregationProperties attempts to parse and return aggregation properties
+// encoded under the given key. If none exist, or if parsing fails, an error
+// is returned with empty Properties.
+func ParseAggregationProperties(qp util.QueryParams, key string) (kubecost.Properties, error) {
+	aggProps := kubecost.Properties{}
+
+	labelMap := make(map[string]string)
+	annotationMap := make(map[string]string)
+	for _, raw := range qp.GetList(key, ",") {
+		fields := strings.Split(raw, ":")
+
+		switch kubecost.ParseProperty(fields[0]) {
+		case kubecost.ClusterProp:
+			aggProps.SetCluster("")
+		case kubecost.NodeProp:
+			aggProps.SetNode("")
+		case kubecost.NamespaceProp:
+			aggProps.SetNamespace("")
+		case kubecost.ControllerKindProp:
+			aggProps.SetControllerKind("")
+		case kubecost.ControllerProp:
+			aggProps.SetController("")
+		case kubecost.PodProp:
+			aggProps.SetPod("")
+		case kubecost.ContainerProp:
+			aggProps.SetContainer("")
+		case kubecost.ServiceProp:
+			aggProps.SetServices([]string{})
+		case kubecost.LabelProp:
+			if len(fields) != 2 {
+				return kubecost.Properties{}, fmt.Errorf("illegal aggregate by label: %s", raw)
+			}
+			label := prom.SanitizeLabelName(strings.TrimSpace(fields[1]))
+			labelMap[label] = ""
+		case kubecost.AnnotationProp:
+			if len(fields) != 2 {
+				return kubecost.Properties{}, fmt.Errorf("illegal aggregate by annotation: %s", raw)
+			}
+			annotation := prom.SanitizeLabelName(strings.TrimSpace(fields[1]))
+			annotationMap[annotation] = ""
+		}
+
+	}
+
+	if len(labelMap) > 0 {
+		aggProps.SetLabels(labelMap)
+	}
+
+	if len(annotationMap) > 0 {
+		aggProps.SetAnnotations(annotationMap)
+	}
+
+	return aggProps, nil
+}
+
+// ComputeAllocationHandler computes an AllocationSetRange from the CostModel.
+func (a *Accesses) ComputeAllocationHandler(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
+	w.Header().Set("Content-Type", "application/json")
+
+	qp := util.NewQueryParams(r.URL.Query())
+
+	// Window is a required field describing the window of time over which to
+	// compute allocation data.
+	window, err := kubecost.ParseWindowWithOffset(qp.Get("window", ""), env.GetParsedUTCOffset())
+	if err != nil {
+		http.Error(w, fmt.Sprintf("Invalid 'window' parameter: %s", err), http.StatusBadRequest)
+	}
+
+	// Step is an optional parameter that defines the duration per-set, i.e.
+	// the window for an AllocationSet, of the AllocationSetRange to be
+	// computed. Defaults to the window size, making one set.
+	step := qp.GetDuration("step", window.Duration())
+
+	// Resolution is an optional parameter, defaulting to the configured ETL
+	// resolution.
+	resolution := qp.GetDuration("resolution", env.GetETLResolution())
+
+	// Aggregation is a required comma-separated list of fields by which to
+	// aggregate results. Some fields allow a sub-field, which is distinguished
+	// with a colon; e.g. "label:app".
+	// Examples: "namespace", "namespace,label:app"
+	aggregateBy, err := ParseAggregationProperties(qp, "aggregate")
+	if err != nil {
+		http.Error(w, fmt.Sprintf("Invalid 'aggregate' parameter: %s", err), http.StatusBadRequest)
+	}
+
+	// Accumulate is an optional parameter, defaulting to false, which if true
+	// sums each Set in the Range, producing one Set.
+	accumulate := qp.GetBool("accumulate", false)
+
+	// Query for AllocationSets in increments of the given step duration,
+	// appending each to the AllocationSetRange.
+	asr := kubecost.NewAllocationSetRange()
+	stepStart := *window.Start()
+	for window.End().After(stepStart) {
+		stepEnd := stepStart.Add(step)
+		stepWindow := kubecost.NewWindow(&stepStart, &stepEnd)
+
+		as, err := a.Model.ComputeAllocation(*stepWindow.Start(), *stepWindow.End(), resolution)
+		if err != nil {
+			WriteError(w, InternalServerError(err.Error()))
+			return
+		}
+		asr.Append(as)
+
+		stepStart = stepEnd
+	}
+
+	// Aggregate, if requested
+	if len(aggregateBy) > 0 {
+		err = asr.AggregateBy(aggregateBy, nil)
+		if err != nil {
+			WriteError(w, InternalServerError(err.Error()))
+			return
+		}
+	}
+
+	// Accumulate, if requested
+	if accumulate {
+		as, err := asr.Accumulate()
+		if err != nil {
+			WriteError(w, InternalServerError(err.Error()))
+			return
+		}
+		asr = kubecost.NewAllocationSetRange(as)
+	}
+
+	w.Write(WrapData(asr, nil))
+}
+
 // The below was transferred from a different package in order to maintain
 // The below was transferred from a different package in order to maintain
 // previous behavior. Ultimately, we should clean this up at some point.
 // previous behavior. Ultimately, we should clean this up at some point.
 // TODO move to util and/or standardize everything
 // TODO move to util and/or standardize everything

+ 1 - 0
pkg/costmodel/router.go

@@ -1070,6 +1070,7 @@ func Initialize(additionalConfigWatchers ...ConfigWatchers) *Accesses {
 	a.Router.GET("/costDataModel", a.CostDataModel)
 	a.Router.GET("/costDataModel", a.CostDataModel)
 	a.Router.GET("/costDataModelRange", a.CostDataModelRange)
 	a.Router.GET("/costDataModelRange", a.CostDataModelRange)
 	a.Router.GET("/aggregatedCostModel", a.AggregateCostModelHandler)
 	a.Router.GET("/aggregatedCostModel", a.AggregateCostModelHandler)
+	a.Router.GET("/allocation/compute", a.ComputeAllocationHandler)
 	a.Router.GET("/outOfClusterCosts", a.OutOfClusterCostsWithCache)
 	a.Router.GET("/outOfClusterCosts", a.OutOfClusterCostsWithCache)
 	a.Router.GET("/allNodePricing", a.GetAllNodePricing)
 	a.Router.GET("/allNodePricing", a.GetAllNodePricing)
 	a.Router.POST("/refreshPricing", a.RefreshPricingData)
 	a.Router.POST("/refreshPricing", a.RefreshPricingData)

+ 24 - 0
pkg/env/costmodelenv.go

@@ -30,6 +30,7 @@ const (
 	SQLAddressEnvVar               = "SQL_ADDRESS"
 	SQLAddressEnvVar               = "SQL_ADDRESS"
 	UseCSVProviderEnvVar           = "USE_CSV_PROVIDER"
 	UseCSVProviderEnvVar           = "USE_CSV_PROVIDER"
 	CSVRegionEnvVar                = "CSV_REGION"
 	CSVRegionEnvVar                = "CSV_REGION"
+	CSVEndpointEnvVar 			   = "CSV_ENDPOINT"
 	CSVPathEnvVar                  = "CSV_PATH"
 	CSVPathEnvVar                  = "CSV_PATH"
 	ConfigPathEnvVar               = "CONFIG_PATH"
 	ConfigPathEnvVar               = "CONFIG_PATH"
 	CloudProviderAPIKeyEnvVar      = "CLOUD_PROVIDER_API_KEY"
 	CloudProviderAPIKeyEnvVar      = "CLOUD_PROVIDER_API_KEY"
@@ -64,6 +65,7 @@ const (
 	CacheWarmingEnabledEnvVar    = "CACHE_WARMING_ENABLED"
 	CacheWarmingEnabledEnvVar    = "CACHE_WARMING_ENABLED"
 	ETLEnabledEnvVar             = "ETL_ENABLED"
 	ETLEnabledEnvVar             = "ETL_ENABLED"
 	ETLMaxBatchHours             = "ETL_MAX_BATCH_HOURS"
 	ETLMaxBatchHours             = "ETL_MAX_BATCH_HOURS"
+	ETLResolutionSeconds         = "ETL_RESOLUTION_SECONDS"
 	LegacyExternalAPIDisabledVar = "LEGACY_EXTERNAL_API_DISABLED"
 	LegacyExternalAPIDisabledVar = "LEGACY_EXTERNAL_API_DISABLED"
 )
 )
 
 
@@ -173,6 +175,12 @@ func GetCSVRegion() string {
 	return Get(CSVRegionEnvVar, "")
 	return Get(CSVRegionEnvVar, "")
 }
 }
 
 
+// GetCSVEndpoint returns the environment variable value for CSVEndpointEnvVar which represents the
+// endpoint configured for a S3 CSV provider another than AWS S3.
+func GetCSVEndpoint() string {
+	return Get(CSVEndpointEnvVar, "")
+}
+
 // GetCSVPath returns the environment variable value for CSVPathEnvVar which represents the key path
 // GetCSVPath returns the environment variable value for CSVPathEnvVar which represents the key path
 // configured for a CSV provider.
 // configured for a CSV provider.
 func GetCSVPath() string {
 func GetCSVPath() string {
@@ -349,6 +357,22 @@ func GetETLMaxBatchDuration() time.Duration {
 	return hrs * time.Hour
 	return hrs * time.Hour
 }
 }
 
 
+// GetETLResolution determines the resolution of ETL queries. The smaller the
+// duration, the higher the resolution; the higher the resolution, the more
+// accurate the query results, but the more computationally expensive. This
+// value is always 1m for Prometheus, but is configurable for Thanos.
+func GetETLResolution() time.Duration {
+	// If Thanos is not enabled, hard-code to 1m resolution
+	if !IsThanosEnabled() {
+		return 60 * time.Second
+	}
+
+	// Thanos is enabled, so use the configured ETL resolution, or default to
+	// 5m (i.e. 300s)
+	secs := time.Duration(GetInt64(ETLResolutionSeconds, 300))
+	return secs * time.Second
+}
+
 func LegacyExternalCostsAPIDisabled() bool {
 func LegacyExternalCostsAPIDisabled() bool {
 	return GetBool(LegacyExternalAPIDisabledVar, false)
 	return GetBool(LegacyExternalAPIDisabledVar, false)
 }
 }