Sfoglia il codice sorgente

Merge pull request #176 from kubecost/nikovacevic-caching

Issue #173 implement caching for aggregatedCostModel endpoint
Niko Kovacevic 6 anni fa
parent
commit
22b4148c29
3 file modificati con 100 aggiunte e 32 eliminazioni
  1. 1 0
      go.mod
  2. 2 0
      go.sum
  3. 97 32
      main.go

+ 1 - 0
go.mod

@@ -23,6 +23,7 @@ require (
 	github.com/julienschmidt/httprouter v1.2.0
 	github.com/lib/pq v1.2.0
 	github.com/mitchellh/go-homedir v1.1.0 // indirect
+	github.com/patrickmn/go-cache v2.1.0+incompatible
 	github.com/pkg/errors v0.8.1 // indirect
 	github.com/prometheus/client_golang v1.0.0
 	github.com/prometheus/client_model v0.0.0-20190129233127-fd36f4220a90

+ 2 - 0
go.sum

@@ -169,6 +169,8 @@ github.com/onsi/gomega v1.4.3/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1Cpa
 github.com/onsi/gomega v1.5.0/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY=
 github.com/openzipkin/zipkin-go v0.1.3/go.mod h1:NtoC/o8u3JlF1lSlyPNswIbeQH9bJTmOf0Erfk+hxe8=
 github.com/openzipkin/zipkin-go v0.1.6/go.mod h1:QgAqvLzwWbR/WpD4A3cGpPtJrZXNIiJc5AZX7/PBEpw=
+github.com/patrickmn/go-cache v2.1.0+incompatible h1:HRMgzkcYKYpi3C8ajMPV8OFXaaRUnok+kx1WdO15EQc=
+github.com/patrickmn/go-cache v2.1.0+incompatible/go.mod h1:3Qf8kWWT7OJRJbdiICTKqZju1ZixQ/KpMGzzAfe6+WQ=
 github.com/peterbourgon/diskv v2.0.1+incompatible/go.mod h1:uqqh8zWWbv1HBMNONnaR/tNboyR3/BZd58JJSHlUSCU=
 github.com/pierrec/lz4 v2.0.5+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY=
 github.com/pkg/errors v0.8.0 h1:WdK/asTD0HN+q6hsWO3/vpuAkAr+tw6aNJNDFFf0+qw=

+ 97 - 32
main.go

@@ -18,13 +18,12 @@ import (
 	"github.com/julienschmidt/httprouter"
 	costAnalyzerCloud "github.com/kubecost/cost-model/cloud"
 	costModel "github.com/kubecost/cost-model/costmodel"
+	"github.com/patrickmn/go-cache"
 	prometheusClient "github.com/prometheus/client_golang/api"
 	prometheusAPI "github.com/prometheus/client_golang/api/prometheus/v1"
-	v1 "k8s.io/api/core/v1"
-
 	"github.com/prometheus/client_golang/prometheus"
-
 	"github.com/prometheus/client_golang/prometheus/promhttp"
+	v1 "k8s.io/api/core/v1"
 
 	"k8s.io/client-go/kubernetes"
 	"k8s.io/client-go/rest"
@@ -63,6 +62,7 @@ type Accesses struct {
 	ServiceSelectorRecorder       *prometheus.GaugeVec
 	DeploymentSelectorRecorder    *prometheus.GaugeVec
 	Model                         *costModel.CostModel
+	Cache                         *cache.Cache
 }
 
 type DataEnvelope struct {
@@ -72,24 +72,50 @@ type DataEnvelope struct {
 	Message string      `json:"message,omitempty"`
 }
 
+func wrapDataWithMessage(data interface{}, err error, message string) []byte {
+	var resp []byte
+
+	if err != nil {
+		klog.V(1).Infof("Error returned to client: %s", err.Error())
+		resp, _ = json.Marshal(&DataEnvelope{
+			Code:    http.StatusInternalServerError,
+			Status:  "error",
+			Message: err.Error(),
+			Data:    data,
+		})
+	} else {
+		resp, _ = json.Marshal(&DataEnvelope{
+			Code:    http.StatusOK,
+			Status:  "success",
+			Data:    data,
+			Message: message,
+		})
+
+	}
+
+	return resp
+}
+
 func wrapData(data interface{}, err error) []byte {
 	var resp []byte
+
 	if err != nil {
 		klog.V(1).Infof("Error returned to client: %s", err.Error())
 		resp, _ = json.Marshal(&DataEnvelope{
-			Code:    500,
+			Code:    http.StatusInternalServerError,
 			Status:  "error",
 			Message: err.Error(),
 			Data:    data,
 		})
 	} else {
 		resp, _ = json.Marshal(&DataEnvelope{
-			Code:   200,
+			Code:   http.StatusOK,
 			Status: "success",
 			Data:   data,
 		})
 
 	}
+
 	return resp
 }
 
@@ -139,7 +165,7 @@ func (a *Accesses) CostDataModel(w http.ResponseWriter, r *http.Request, ps http
 	offset := r.URL.Query().Get("offset")
 	fields := r.URL.Query().Get("filterFields")
 	namespace := r.URL.Query().Get("namespace")
-	aggregation := r.URL.Query().Get("aggregation")
+	aggregationField := r.URL.Query().Get("aggregation")
 	aggregationSubField := r.URL.Query().Get("aggregationSubfield")
 
 	if offset != "" {
@@ -147,7 +173,7 @@ func (a *Accesses) CostDataModel(w http.ResponseWriter, r *http.Request, ps http
 	}
 
 	data, err := a.Model.ComputeCostData(a.PrometheusClient, a.KubeClientSet, a.Cloud, window, offset, namespace)
-	if aggregation != "" {
+	if aggregationField != "" {
 		c, err := a.Cloud.GetConfig()
 		if err != nil {
 			w.Write(wrapData(nil, err))
@@ -157,7 +183,7 @@ func (a *Accesses) CostDataModel(w http.ResponseWriter, r *http.Request, ps http
 			w.Write(wrapData(nil, err))
 		}
 		discount = discount * 0.01
-		agg := costModel.AggregateCostModel(data, discount, 1.0, nil, aggregation, aggregationSubField)
+		agg := costModel.AggregateCostModel(data, discount, 1.0, nil, aggregationField, aggregationSubField)
 		w.Write(wrapData(agg, nil))
 	} else {
 		if fields != "" {
@@ -204,16 +230,28 @@ func (a *Accesses) ClusterCostsOverTime(w http.ResponseWriter, r *http.Request,
 func (a *Accesses) AggregateCostModel(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
 	w.Header().Set("Content-Type", "application/json")
 	w.Header().Set("Access-Control-Allow-Origin", "*")
+
 	window := r.URL.Query().Get("window")
 	offset := r.URL.Query().Get("offset")
-	aggregation := r.URL.Query().Get("aggregation")
 	namespace := r.URL.Query().Get("namespace")
+	aggregationField := r.URL.Query().Get("aggregation")
 	aggregationSubField := r.URL.Query().Get("aggregationSubfield")
 	allocateIdle := r.URL.Query().Get("allocateIdle")
 	sharedNamespaces := r.URL.Query().Get("sharedNamespaces")
 	sharedLabelNames := r.URL.Query().Get("sharedLabelNames")
 	sharedLabelValues := r.URL.Query().Get("sharedLabelValues")
 
+	disableCache := r.URL.Query().Get("disableCache") == "true"
+	clearCache := r.URL.Query().Get("clearCache") == "true"
+
+	if aggregationField == "" {
+		w.WriteHeader(http.StatusBadRequest)
+		w.Write(wrapData(nil, fmt.Errorf("Missing aggregation parameter")))
+		return
+	}
+
+	// endTime defaults to the current time, unless an offset is explicitly declared,
+	// in which case it shifts endTime back by given duration
 	endTime := time.Now()
 	if offset != "" {
 		o, err := time.ParseDuration(offset)
@@ -225,6 +263,8 @@ func (a *Accesses) AggregateCostModel(w http.ResponseWriter, r *http.Request, ps
 		endTime = endTime.Add(-1 * o)
 	}
 
+	// if window is defined in terms of days, convert it to hours
+	// e.g. convert "2d" to "48h"
 	if window[len(window)-1:] == "d" {
 		count := window[:len(window)-1]
 		val, err := strconv.ParseInt(count, 10, 64)
@@ -236,6 +276,8 @@ func (a *Accesses) AggregateCostModel(w http.ResponseWriter, r *http.Request, ps
 		window = fmt.Sprintf("%dh", val)
 	}
 
+	// convert time window into start and end times, formatted
+	// as ISO datetime strings
 	d, err := time.ParseDuration(window)
 	if err != nil {
 		w.Write(wrapData(nil, err))
@@ -247,6 +289,22 @@ func (a *Accesses) AggregateCostModel(w http.ResponseWriter, r *http.Request, ps
 	start := startTime.Format(layout)
 	end := endTime.Format(layout)
 
+	// clear cache prior to checking the cache so that a clearCache=true
+	// request always returns a freshly computed value
+	if clearCache {
+		a.Cache.Flush()
+	}
+
+	aggKey := fmt.Sprintf("aggregate:%s:%s:%s:%s:%s", window, offset, namespace, aggregationField, aggregationSubField)
+
+	// check the cache for aggregated response; if cache is hit and not disabled, return response
+	if result, found := a.Cache.Get(aggKey); found && !disableCache {
+		// TODO send http.StatusNotModified when testing is complete
+		w.WriteHeader(http.StatusOK)
+		w.Write(wrapDataWithMessage(result, nil, fmt.Sprintf("cache hit: %s", aggKey)))
+		return
+	}
+
 	remote := r.URL.Query().Get("remote")
 
 	remoteAvailable := os.Getenv(remoteEnabled)
@@ -255,6 +313,7 @@ func (a *Accesses) AggregateCostModel(w http.ResponseWriter, r *http.Request, ps
 		remoteEnabled = true
 	}
 	klog.Infof("REMOTE ENABLED: %t", remoteEnabled)
+
 	data, err := a.Model.ComputeCostDataRange(a.PrometheusClient, a.KubeClientSet, a.Cloud, start, end, "1h", namespace, remoteEnabled)
 	if err != nil {
 		w.Write(wrapData(nil, err))
@@ -281,28 +340,30 @@ func (a *Accesses) AggregateCostModel(w http.ResponseWriter, r *http.Request, ps
 		}
 	}
 
-	if aggregation != "" {
-		sn := []string{}
-		sln := []string{}
-		slv := []string{}
-		if sharedNamespaces != "" {
-			sn = strings.Split(sharedNamespaces, ",")
-		}
-		if sharedLabelNames != "" {
-			sln = strings.Split(sharedLabelNames, ",")
-			slv = strings.Split(sharedLabelValues, ",")
-			if len(sln) != len(slv) || slv[0] == "" {
-				w.Write(wrapData(nil, fmt.Errorf("Supply exacly one label value per label name")))
-				return
-			}
-		}
-		var s *costModel.SharedResourceInfo
-		if len(sn) > 0 || len(sln) > 0 {
-			s = costModel.NewSharedResourceInfo(true, sn, sln, slv)
+	sn := []string{}
+	sln := []string{}
+	slv := []string{}
+	if sharedNamespaces != "" {
+		sn = strings.Split(sharedNamespaces, ",")
+	}
+	if sharedLabelNames != "" {
+		sln = strings.Split(sharedLabelNames, ",")
+		slv = strings.Split(sharedLabelValues, ",")
+		if len(sln) != len(slv) || slv[0] == "" {
+			w.Write(wrapData(nil, fmt.Errorf("Supply exacly one label value per label name")))
+			return
 		}
-		agg := costModel.AggregateCostModel(data, discount, idleCoefficient, s, aggregation, aggregationSubField)
-		w.Write(wrapData(agg, nil))
 	}
+	var s *costModel.SharedResourceInfo
+	if len(sn) > 0 || len(sln) > 0 {
+		s = costModel.NewSharedResourceInfo(true, sn, sln, slv)
+	}
+
+	// aggregate cost model data by given fields and cache the result for the default expiration
+	result := costModel.AggregateCostModel(data, discount, idleCoefficient, s, aggregationField, aggregationSubField)
+	a.Cache.Set(aggKey, result, cache.DefaultExpiration)
+
+	w.Write(wrapDataWithMessage(result, nil, fmt.Sprintf("cache miss: %s", aggKey)))
 }
 
 func (a *Accesses) CostDataModelRange(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
@@ -314,7 +375,7 @@ func (a *Accesses) CostDataModelRange(w http.ResponseWriter, r *http.Request, ps
 	window := r.URL.Query().Get("window")
 	fields := r.URL.Query().Get("filterFields")
 	namespace := r.URL.Query().Get("namespace")
-	aggregation := r.URL.Query().Get("aggregation")
+	aggregationField := r.URL.Query().Get("aggregation")
 	aggregationSubField := r.URL.Query().Get("aggregationSubfield")
 	remote := r.URL.Query().Get("remote")
 
@@ -327,7 +388,7 @@ func (a *Accesses) CostDataModelRange(w http.ResponseWriter, r *http.Request, ps
 	if err != nil {
 		w.Write(wrapData(nil, err))
 	}
-	if aggregation != "" {
+	if aggregationField != "" {
 		c, err := a.Cloud.GetConfig()
 		if err != nil {
 			w.Write(wrapData(nil, err))
@@ -337,7 +398,7 @@ func (a *Accesses) CostDataModelRange(w http.ResponseWriter, r *http.Request, ps
 			w.Write(wrapData(nil, err))
 		}
 		discount = discount * 0.01
-		agg := costModel.AggregateCostModel(data, discount, 1.0, nil, aggregation, aggregationSubField)
+		agg := costModel.AggregateCostModel(data, discount, 1.0, nil, aggregationField, aggregationSubField)
 		w.Write(wrapData(agg, nil))
 	} else {
 		if fields != "" {
@@ -824,6 +885,9 @@ func main() {
 		KubeClientSet: kubeClientset,
 	})
 
+	// cache responses from model for a default of 2 minutes; clear expired responses every 10 minutes
+	modelCache := cache.New(time.Minute*2, time.Minute*10)
+
 	a := Accesses{
 		PrometheusClient:              promCli,
 		KubeClientSet:                 kubeClientset,
@@ -842,6 +906,7 @@ func main() {
 		NetworkInternetEgressRecorder: NetworkInternetEgressRecorder,
 		PersistentVolumePriceRecorder: pvGv,
 		Model:                         costModel.NewCostModel(kubeClientset),
+		Cache:                         modelCache,
 	}
 
 	remoteEnabled := os.Getenv(remoteEnabled)