Просмотр исходного кода

Merge branch 'niko/aggapi' of github.com:kubecost/cost-model into niko/aggapi

Matt Bolt 5 лет назад
Родитель
Сommit
556b4fbf76
3 измененных файлов с 131 добавлено и 37 удалено
  1. 23 20
      pkg/costmodel/aggregation.go
  2. 64 17
      pkg/costmodel/router.go
  3. 44 0
      pkg/env/costmodelenv.go

+ 23 - 20
pkg/costmodel/aggregation.go

@@ -12,6 +12,7 @@ import (
 
 	"github.com/julienschmidt/httprouter"
 	"github.com/kubecost/cost-model/pkg/cloud"
+	"github.com/kubecost/cost-model/pkg/env"
 	"github.com/kubecost/cost-model/pkg/kubecost"
 	"github.com/kubecost/cost-model/pkg/log"
 	"github.com/kubecost/cost-model/pkg/prom"
@@ -1613,7 +1614,7 @@ func (a *Accesses) AggregateCostModelHandler(w http.ResponseWriter, r *http.Requ
 	// determine duration and offset from query parameters
 	window, err := kubecost.ParseWindowWithOffset(windowStr, env.GetParsedUTCOffset())
 	if err != nil || window.Start() == nil {
-		WriteError(w, BadRequest(fmt.Sprintf("invalid window: %s", err)))
+		http.Error(w, fmt.Sprintf("invalid window: %s", err), http.StatusBadRequest)
 		return
 	}
 	duration, offset := window.ToDurationOffset()
@@ -1712,19 +1713,19 @@ func (a *Accesses) AggregateCostModelHandler(w http.ResponseWriter, r *http.Requ
 
 	// aggregation field is required
 	if field == "" {
-		WriteError(w, BadRequest("Missing aggregation field parameter"))
+		http.Error(w, "Missing aggregation field parameter", http.StatusBadRequest)
 		return
 	}
 
 	// aggregation subfield is required when aggregation field is "label"
 	if field == "label" && len(subfields) == 0 {
-		WriteError(w, BadRequest("Missing aggregation subfield parameter for aggregation by label"))
+		http.Error(w, "Missing aggregation subfield parameter for aggregation by label", http.StatusBadRequest)
 		return
 	}
 
 	// enforce one of four available rate options
 	if rate != "" && rate != "hourly" && rate != "daily" && rate != "monthly" {
-		WriteError(w, BadRequest("If set, rate parameter must be one of: 'hourly', 'daily', 'monthly'"))
+		http.Error(w, "If set, rate parameter must be one of: 'hourly', 'daily', 'monthly'", http.StatusBadRequest)
 		return
 	}
 
@@ -1750,7 +1751,7 @@ func (a *Accesses) AggregateCostModelHandler(w http.ResponseWriter, r *http.Requ
 		sln = strings.Split(sharedLabelNames, ",")
 		slv = strings.Split(sharedLabelValues, ",")
 		if len(sln) != len(slv) || slv[0] == "" {
-			WriteError(w, BadRequest("Supply exacly one shared label value per shared label name"))
+			http.Error(w, "Supply exacly one shared label value per shared label name", http.StatusBadRequest)
 			return
 		}
 	}
@@ -1772,14 +1773,16 @@ func (a *Accesses) AggregateCostModelHandler(w http.ResponseWriter, r *http.Requ
 	var data map[string]*Aggregation
 	var message string
 
-	etlEnabled := env.IsETLEnabled()
-	useETLAdapter := r.URL.Query().Get("etl") == "true"
-	if etlEnabled && useETLAdapter {
-		data, message, err = a.AdaptETLAggregateCostModel(window, field, subfields, rate, filters, sr, shared, allocateIdle, includeTimeSeries)
-	} else {
-		data, message, err = a.ComputeAggregateCostModel(promClient, duration, offset, field, subfields, rate, filters,
-			sr, shared, allocateIdle, includeTimeSeries, includeEfficiency, disableCache, clearCache, noCache, noExpireCache, remoteEnabled, false)
-	}
+	// etlEnabled := env.IsETLEnabled()
+	// useETLAdapter := r.URL.Query().Get("etl") == "true"
+	// if etlEnabled && useETLAdapter {
+	// 	data, message, err = a.AdaptETLAggregateCostModel(window, field, subfields, rate, filters, sr, shared, allocateIdle, includeTimeSeries)
+	// } else {
+	// 	data, message, err = a.ComputeAggregateCostModel(promClient, duration, offset, field, subfields, rate, filters,
+	// 		sr, shared, allocateIdle, includeTimeSeries, includeEfficiency, disableCache, clearCache, noCache, noExpireCache, remoteEnabled, false)
+	// }
+	data, message, err = a.ComputeAggregateCostModel(promClient, duration, offset, field, subfields, rate, filters,
+		sr, shared, allocateIdle, includeTimeSeries, includeEfficiency, disableCache, clearCache, noCache, noExpireCache, remoteEnabled, false)
 
 	// Find any warnings in http request context
 	warning, _ := product.GetWarning(r)
@@ -1787,9 +1790,9 @@ func (a *Accesses) AggregateCostModelHandler(w http.ResponseWriter, r *http.Requ
 	if err != nil {
 		if emptyErr, ok := err.(*EmptyDataError); ok {
 			if warning == "" {
-				WriteDataWithMessage(w, map[string]interface{}{}, emptyErr.Error())
+				w.Write(WrapData(map[string]interface{}{}, emptyErr))
 			} else {
-				WriteDataWithMessageAndWarning(w, map[string]interface{}{}, emptyErr.Error(), warning)
+				w.Write(WrapDataWithWarning(map[string]interface{}{}, emptyErr, warning))
 			}
 			return
 		}
@@ -1808,21 +1811,21 @@ func (a *Accesses) AggregateCostModelHandler(w http.ResponseWriter, r *http.Requ
 					}
 				}
 
-				WriteError(w, InternalServerError(msg))
+				http.Error(w, msg, http.StatusInternalServerError)
 			} else {
 				// Boundary error outside of 90 day period; may not be available
-				WriteError(w, InternalServerError(boundaryErr.Error()))
+				http.Error(w, boundaryErr.Error(), http.StatusInternalServerError)
 			}
 			return
 		}
 		errStr := fmt.Sprintf("error computing aggregate cost model: %s", err)
-		WriteError(w, InternalServerError(errStr))
+		http.Error(w, errStr, http.StatusInternalServerError)
 		return
 	}
 
 	if warning == "" {
-		WriteDataWithMessage(w, data, message)
+		w.Write(WrapDataWithMessage(data, nil, message))
 	} else {
-		WriteDataWithMessageAndWarning(w, data, message, warning)
+		w.Write(WrapDataWithMessageAndWarning(data, nil, message, warning))
 	}
 }

+ 64 - 17
pkg/costmodel/router.go

@@ -23,7 +23,6 @@ import (
 	"github.com/kubecost/cost-model/pkg/costmodel/clusters"
 	"github.com/kubecost/cost-model/pkg/env"
 	"github.com/kubecost/cost-model/pkg/errors"
-	"github.com/kubecost/cost-model/pkg/log"
 	"github.com/kubecost/cost-model/pkg/prom"
 	"github.com/kubecost/cost-model/pkg/thanos"
 	prometheusClient "github.com/prometheus/client_golang/api"
@@ -142,11 +141,12 @@ func (a *Accesses) ClusterCostsFromCacheHandler(w http.ResponseWriter, r *http.R
 	}
 }
 
-type DataEnvelope struct {
+type Response struct {
 	Code    int         `json:"code"`
 	Status  string      `json:"status"`
 	Data    interface{} `json:"data"`
 	Message string      `json:"message,omitempty"`
+	Warning string      `json:"warning,omitempty"`
 }
 
 // FilterFunc is a filter that returns true iff the given CostData should be filtered out, and the environment that was used as the filter criteria, if it was an aggregate
@@ -306,48 +306,95 @@ func ParseTimeRange(duration, offset string) (*time.Time, *time.Time, error) {
 	return &startTime, &endTime, nil
 }
 
+func WrapData(data interface{}, err error) []byte {
+	var resp []byte
+
+	if err != nil {
+		klog.V(1).Infof("Error returned to client: %s", err.Error())
+		resp, _ = json.Marshal(&Response{
+			Code:    http.StatusInternalServerError,
+			Status:  "error",
+			Message: err.Error(),
+			Data:    data,
+		})
+	} else {
+		resp, _ = json.Marshal(&Response{
+			Code:   http.StatusOK,
+			Status: "success",
+			Data:   data,
+		})
+	}
+
+	return resp
+}
+
 func WrapDataWithMessage(data interface{}, err error, message string) []byte {
 	var resp []byte
 
 	if err != nil {
 		klog.V(1).Infof("Error returned to client: %s", err.Error())
-		resp, _ = json.Marshal(&DataEnvelope{
+		resp, _ = json.Marshal(&Response{
 			Code:    http.StatusInternalServerError,
 			Status:  "error",
 			Message: err.Error(),
 			Data:    data,
 		})
 	} else {
-		resp, _ = json.Marshal(&DataEnvelope{
+		resp, _ = json.Marshal(&Response{
 			Code:    http.StatusOK,
 			Status:  "success",
 			Data:    data,
 			Message: message,
 		})
-
 	}
 
 	return resp
 }
 
-func WrapData(data interface{}, err error) []byte {
+func WrapDataWithWarning(data interface{}, err error, warning string) []byte {
 	var resp []byte
 
 	if err != nil {
 		klog.V(1).Infof("Error returned to client: %s", err.Error())
-		resp, _ = json.Marshal(&DataEnvelope{
+		resp, _ = json.Marshal(&Response{
 			Code:    http.StatusInternalServerError,
 			Status:  "error",
 			Message: err.Error(),
+			Warning: warning,
 			Data:    data,
 		})
 	} else {
-		resp, _ = json.Marshal(&DataEnvelope{
-			Code:   http.StatusOK,
-			Status: "success",
-			Data:   data,
+		resp, _ = json.Marshal(&Response{
+			Code:    http.StatusOK,
+			Status:  "success",
+			Data:    data,
+			Warning: warning,
 		})
+	}
+
+	return resp
+}
+
+func WrapDataWithMessageAndWarning(data interface{}, err error, message, warning string) []byte {
+	var resp []byte
 
+	if err != nil {
+		klog.V(1).Infof("Error returned to client: %s", err.Error())
+		resp, _ = json.Marshal(&Response{
+			Code:    http.StatusInternalServerError,
+			Status:  "error",
+			Message: err.Error(),
+			Warning: warning,
+			Data:    data,
+		})
+	} else {
+		resp, _ = json.Marshal(&Response{
+			Code:    http.StatusOK,
+			Status:  "success",
+			Data:    data,
+			Message: message,
+			Warning: warning,
+		})
 	}
 
 	return resp
@@ -1132,12 +1179,12 @@ func Initialize(additionalConfigWatchers ...ConfigWatchers) *Accesses {
 	}
 
 	// warm the cache unless explicitly set to false
-	if env.IsCacheWarmingEnabled() {
-		log.Infof("Init: AggregateCostModel cache warming enabled")
-		warmAggregateCostModelCache()
-	} else {
-		log.Infof("Init: AggregateCostModel cache warming disabled")
-	}
+	// if env.IsCacheWarmingEnabled() {
+	// 	log.Infof("Init: AggregateCostModel cache warming enabled")
+	// 	warmAggregateCostModelCache()
+	// } else {
+	// 	log.Infof("Init: AggregateCostModel cache warming disabled")
+	// }
 
 	a := Accesses{
 		Router:                        httprouter.New(),

+ 44 - 0
pkg/env/costmodelenv.go

@@ -1,5 +1,13 @@
 package env
 
+import (
+	"regexp"
+	"strconv"
+	"time"
+
+	"github.com/kubecost/cost-model/pkg/log"
+)
+
 const (
 	AppVersionEnvVar = "APP_VERSION"
 
@@ -46,6 +54,8 @@ const (
 	InsecureSkipVerify = "INSECURE_SKIP_VERIFY"
 
 	KubeConfigPathEnvVar = "KUBECONFIG_PATH"
+
+	UTCOffsetEnvVar = "UTC_OFFSET"
 )
 
 // GetAWSAccessKeyID returns the environment variable value for AWSAccessKeyIDEnvVar which represents
@@ -265,3 +275,37 @@ func GetMultiClusterBearerToken() string {
 func GetKubeConfigPath() string {
 	return Get(KubeConfigPathEnvVar, "")
 }
+
+// GetUTCOffset returns the environemnt variable value for UTCOffset
+func GetUTCOffset() string {
+	return Get(UTCOffsetEnvVar, "")
+}
+
+// GetParsedUTCOffset returns the duration of the configured UTC offset
+func GetParsedUTCOffset() time.Duration {
+	offset := time.Duration(0)
+
+	if offsetStr := GetUTCOffset(); offsetStr != "" {
+		regex := regexp.MustCompile(`^(\+|-)(\d\d):(\d\d)$`)
+		match := regex.FindStringSubmatch(offsetStr)
+		if match == nil {
+			log.Warningf("Illegal UTC offset: %s", offsetStr)
+			return offset
+		}
+
+		sig := 1
+		if match[1] == "-" {
+			sig = -1
+		}
+
+		hrs64, _ := strconv.ParseInt(match[2], 10, 64)
+		hrs := sig * int(hrs64)
+
+		mins64, _ := strconv.ParseInt(match[3], 10, 64)
+		mins := sig * int(mins64)
+
+		offset = time.Duration(hrs)*time.Hour + time.Duration(mins)
+	}
+
+	return offset
+}