Selaa lähdekoodia

Issue #173 implement caching for aggregatedCostModel endpoint

Niko Kovacevic 6 vuotta sitten
vanhempi
sitoutus
42a31f1dc1
3 muutettua tiedostoa jossa 106 lisäystä ja 16 poistoa
  1. 3 0
      go.mod
  2. 2 0
      go.sum
  3. 101 16
      main.go

+ 3 - 0
go.mod

@@ -23,6 +23,7 @@ require (
 	github.com/julienschmidt/httprouter v1.2.0
 	github.com/lib/pq v1.2.0
 	github.com/mitchellh/go-homedir v1.1.0 // indirect
+	github.com/patrickmn/go-cache v2.1.0+incompatible
 	github.com/pkg/errors v0.8.1 // indirect
 	github.com/prometheus/client_golang v1.0.0
 	github.com/prometheus/client_model v0.0.0-20190129233127-fd36f4220a90
@@ -39,3 +40,5 @@ require (
 	k8s.io/client-go v0.0.0-20190620085101-78d2af792bab
 	k8s.io/klog v0.4.0
 )
+
+go 1.13

+ 2 - 0
go.sum

@@ -169,6 +169,8 @@ github.com/onsi/gomega v1.4.3/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1Cpa
 github.com/onsi/gomega v1.5.0/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY=
 github.com/openzipkin/zipkin-go v0.1.3/go.mod h1:NtoC/o8u3JlF1lSlyPNswIbeQH9bJTmOf0Erfk+hxe8=
 github.com/openzipkin/zipkin-go v0.1.6/go.mod h1:QgAqvLzwWbR/WpD4A3cGpPtJrZXNIiJc5AZX7/PBEpw=
+github.com/patrickmn/go-cache v2.1.0+incompatible h1:HRMgzkcYKYpi3C8ajMPV8OFXaaRUnok+kx1WdO15EQc=
+github.com/patrickmn/go-cache v2.1.0+incompatible/go.mod h1:3Qf8kWWT7OJRJbdiICTKqZju1ZixQ/KpMGzzAfe6+WQ=
 github.com/peterbourgon/diskv v2.0.1+incompatible/go.mod h1:uqqh8zWWbv1HBMNONnaR/tNboyR3/BZd58JJSHlUSCU=
 github.com/pierrec/lz4 v2.0.5+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY=
 github.com/pkg/errors v0.8.0 h1:WdK/asTD0HN+q6hsWO3/vpuAkAr+tw6aNJNDFFf0+qw=

+ 101 - 16
main.go

@@ -18,6 +18,7 @@ import (
 	"github.com/julienschmidt/httprouter"
 	costAnalyzerCloud "github.com/kubecost/cost-model/cloud"
 	costModel "github.com/kubecost/cost-model/costmodel"
+	"github.com/patrickmn/go-cache"
 	prometheusClient "github.com/prometheus/client_golang/api"
 	prometheusAPI "github.com/prometheus/client_golang/api/prometheus/v1"
 	v1 "k8s.io/api/core/v1"
@@ -61,6 +62,7 @@ type Accesses struct {
 	ServiceSelectorRecorder       *prometheus.GaugeVec
 	DeploymentSelectorRecorder    *prometheus.GaugeVec
 	Model                         *costModel.CostModel
+	Cache                         *cache.Cache
 }
 
 type DataEnvelope struct {
@@ -70,24 +72,50 @@ type DataEnvelope struct {
 	Message string      `json:"message,omitempty"`
 }
 
+func wrapDataWithMessage(data interface{}, err error, message string) []byte {
+	var resp []byte
+
+	if err != nil {
+		klog.V(1).Infof("Error returned to client: %s", err.Error())
+		resp, _ = json.Marshal(&DataEnvelope{
+			Code:    http.StatusInternalServerError,
+			Status:  "error",
+			Message: err.Error(),
+			Data:    data,
+		})
+	} else {
+		resp, _ = json.Marshal(&DataEnvelope{
+			Code:    http.StatusOK,
+			Status:  "success",
+			Data:    data,
+			Message: message,
+		})
+
+	}
+
+	return resp
+}
+
 func wrapData(data interface{}, err error) []byte {
 	var resp []byte
+
 	if err != nil {
 		klog.V(1).Infof("Error returned to client: %s", err.Error())
 		resp, _ = json.Marshal(&DataEnvelope{
-			Code:    500,
+			Code:    http.StatusInternalServerError,
 			Status:  "error",
 			Message: err.Error(),
 			Data:    data,
 		})
 	} else {
 		resp, _ = json.Marshal(&DataEnvelope{
-			Code:   200,
+			Code:   http.StatusOK,
 			Status: "success",
 			Data:   data,
 		})
 
 	}
+
 	return resp
 }
 
@@ -137,7 +165,7 @@ func (a *Accesses) CostDataModel(w http.ResponseWriter, r *http.Request, ps http
 	offset := r.URL.Query().Get("offset")
 	fields := r.URL.Query().Get("filterFields")
 	namespace := r.URL.Query().Get("namespace")
-	aggregation := r.URL.Query().Get("aggregation")
+	aggregationField := r.URL.Query().Get("aggregation")
 	aggregationSubField := r.URL.Query().Get("aggregationSubfield")
 
 	if offset != "" {
@@ -145,7 +173,7 @@ func (a *Accesses) CostDataModel(w http.ResponseWriter, r *http.Request, ps http
 	}
 
 	data, err := a.Model.ComputeCostData(a.PrometheusClient, a.KubeClientSet, a.Cloud, window, offset, namespace)
-	if aggregation != "" {
+	if aggregationField != "" {
 		c, err := a.Cloud.GetConfig()
 		if err != nil {
 			w.Write(wrapData(nil, err))
@@ -155,7 +183,7 @@ func (a *Accesses) CostDataModel(w http.ResponseWriter, r *http.Request, ps http
 			w.Write(wrapData(nil, err))
 		}
 
-		agg := costModel.AggregateCostModel(data, discount, aggregation, aggregationSubField)
+		agg := costModel.AggregateCostModel(data, discount, aggregationField, aggregationSubField)
 		w.Write(wrapData(agg, nil))
 	} else {
 		if fields != "" {
@@ -202,12 +230,25 @@ func (a *Accesses) ClusterCostsOverTime(w http.ResponseWriter, r *http.Request,
 func (a *Accesses) AggregateCostModel(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
 	w.Header().Set("Content-Type", "application/json")
 	w.Header().Set("Access-Control-Allow-Origin", "*")
+
 	window := r.URL.Query().Get("window")
 	offset := r.URL.Query().Get("offset")
-	aggregation := r.URL.Query().Get("aggregation")
 	namespace := r.URL.Query().Get("namespace")
+	aggregationField := r.URL.Query().Get("aggregation")
 	aggregationSubField := r.URL.Query().Get("aggregationSubfield")
 
+	disableCache := r.URL.Query().Get("disableCache") == "true"
+	clearCache := r.URL.Query().Get("clearCache") == "true"
+
+	// TODO nikovacevic-caching this should be required, right? Can we return 400 Bad Request if not set?
+	if aggregationField == "" {
+		w.WriteHeader(http.StatusBadRequest)
+		w.Write(wrapDataWithMessage(nil, fmt.Errorf("Invalid aggregation field"), fmt.Sprintf("cache miss")))
+		return
+	}
+
+	// endTime defaults to the current time, unless an offset is explicity declared,
+	// in which case it shifts endTime back by given duration
 	endTime := time.Now()
 	if offset != "" {
 		o, err := time.ParseDuration(offset)
@@ -219,6 +260,8 @@ func (a *Accesses) AggregateCostModel(w http.ResponseWriter, r *http.Request, ps
 		endTime = endTime.Add(-1 * o)
 	}
 
+	// if window is defined in terms of days, convert it to hours
+	// e.g. convert "2d" to "48h"
 	if window[len(window)-1:] == "d" {
 		count := window[:len(window)-1]
 		val, err := strconv.ParseInt(count, 10, 64)
@@ -230,6 +273,8 @@ func (a *Accesses) AggregateCostModel(w http.ResponseWriter, r *http.Request, ps
 		window = fmt.Sprintf("%dh", val)
 	}
 
+	// convert time window into start and end times, formatted
+	// as ISO datetime strings
 	d, err := time.ParseDuration(window)
 	if err != nil {
 		w.Write(wrapData(nil, err))
@@ -239,23 +284,52 @@ func (a *Accesses) AggregateCostModel(w http.ResponseWriter, r *http.Request, ps
 	layout := "2006-01-02T15:04:05.000Z"
 	start := startTime.Format(layout)
 	end := endTime.Format(layout)
+
+	// TODO nikovacevic-caching
+	// move clear/disable/key logic into a centralized component
+
+	// clear cache prior to checking the cache so that a clearCache=true
+	// request always returns a freshly computed value
+	if clearCache {
+		a.Cache.Flush()
+	}
+
+	aggKey := fmt.Sprintf("aggregate:%s:%s:%s:%s:%s", window, offset, namespace, aggregationField, aggregationSubField)
+
+	// check the cache for aggregated response; if cache is hit and not disabled, return response
+	if result, found := a.Cache.Get(aggKey); found && !disableCache {
+		// TODO send http.StatusNotModified when testing is complete
+		w.WriteHeader(http.StatusOK)
+		w.Write(wrapDataWithMessage(result, nil, fmt.Sprintf("cache hit: %s", aggKey)))
+		return
+	}
+
+	// TODO nikovacevic-caching cache raw cost data and unmarshal it
+	// rawKey := fmt.Sprintf("raw:%s:%s:%s:%s", start, end, "1h", namespace)
+	// data, found := a.Cache.Get(rawKey)
+	// if !found || disableCache ...
+
 	data, err := a.Model.ComputeCostDataRange(a.PrometheusClient, a.KubeClientSet, a.Cloud, start, end, "1h", namespace)
 	if err != nil {
-		w.Write(wrapData(nil, err))
+		w.Write(wrapDataWithMessage(nil, err, fmt.Sprintf("cache miss")))
 		return
 	}
 	c, err := a.Cloud.GetConfig()
 	if err != nil {
-		w.Write(wrapData(nil, err))
+		w.Write(wrapDataWithMessage(nil, err, fmt.Sprintf("cache miss")))
+		return
 	}
 	discount, err := strconv.ParseFloat(c.Discount[:len(c.Discount)-1], 64)
 	if err != nil {
-		w.Write(wrapData(nil, err))
-	}
-	if aggregation != "" {
-		agg := costModel.AggregateCostModel(data, discount*0.01, aggregation, aggregationSubField)
-		w.Write(wrapData(agg, nil))
+		w.Write(wrapDataWithMessage(nil, err, fmt.Sprintf("cache miss")))
+		return
 	}
+
+	// aggregate cost model data by given fields and cache the result for the default expiration
+	result := costModel.AggregateCostModel(data, discount*0.01, aggregationField, aggregationSubField)
+	a.Cache.Set(aggKey, result, cache.DefaultExpiration)
+
+	w.Write(wrapDataWithMessage(result, nil, fmt.Sprintf("cache miss")))
 }
 
 func (a *Accesses) CostDataModelRange(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
@@ -267,14 +341,14 @@ func (a *Accesses) CostDataModelRange(w http.ResponseWriter, r *http.Request, ps
 	window := r.URL.Query().Get("window")
 	fields := r.URL.Query().Get("filterFields")
 	namespace := r.URL.Query().Get("namespace")
-	aggregation := r.URL.Query().Get("aggregation")
+	aggregationField := r.URL.Query().Get("aggregation")
 	aggregationSubField := r.URL.Query().Get("aggregationSubfield")
 
 	data, err := a.Model.ComputeCostDataRange(a.PrometheusClient, a.KubeClientSet, a.Cloud, start, end, window, namespace)
 	if err != nil {
 		w.Write(wrapData(nil, err))
 	}
-	if aggregation != "" {
+	if aggregationField != "" {
 		c, err := a.Cloud.GetConfig()
 		if err != nil {
 			w.Write(wrapData(nil, err))
@@ -283,7 +357,7 @@ func (a *Accesses) CostDataModelRange(w http.ResponseWriter, r *http.Request, ps
 		if err != nil {
 			w.Write(wrapData(nil, err))
 		}
-		agg := costModel.AggregateCostModel(data, discount, aggregation, aggregationSubField)
+		agg := costModel.AggregateCostModel(data, discount, aggregationField, aggregationSubField)
 		w.Write(wrapData(agg, nil))
 	} else {
 		if fields != "" {
@@ -771,6 +845,9 @@ func main() {
 		KubeClientSet: kubeClientset,
 	})
 
+	// cache responses from model for a default of 2 minutes; clear expired responses every 10 minutes
+	modelCache := cache.New(time.Minute*2, time.Minute*10)
+
 	a := Accesses{
 		PrometheusClient:              promCli,
 		KubeClientSet:                 kubeClientset,
@@ -789,6 +866,7 @@ func main() {
 		NetworkInternetEgressRecorder: NetworkInternetEgressRecorder,
 		PersistentVolumePriceRecorder: pvGv,
 		Model:                         costModel.NewCostModel(kubeClientset),
+		Cache:                         modelCache,
 	}
 
 	remoteEnabled := os.Getenv(remoteEnabled)
@@ -832,6 +910,13 @@ func main() {
 	router.GET("/containerUptimes", a.ContainerUptimes)
 	router.GET("/aggregatedCostModel", a.AggregateCostModel)
 
+	router.GET("/image", func(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
+		w.Header().Set("Content-Type", "application/json")
+		w.Header().Set("Access-Control-Allow-Origin", "*")
+		w.Write([]byte("nikovacevic/kubecost-cost-model:caching-4"))
+		return
+	})
+
 	rootMux := http.NewServeMux()
 	rootMux.Handle("/", router)
 	rootMux.Handle("/metrics", promhttp.Handler())