Bläddra i källkod

Migrated proxy query and query range endpoints from /api to /model. This will allow better visibility on frontend queries made through our product.

Matt Bolt 4 år sedan
förälder
incheckning
cf2142ebcd
3 ändrade filer med 197 tillägg och 15 borttagningar
  1. 161 3
      pkg/costmodel/router.go
  2. 3 0
      pkg/prom/contextnames.go
  3. 33 12
      pkg/prom/query.go

+ 161 - 3
pkg/costmodel/router.go

@@ -11,6 +11,7 @@ import (
 	"sync"
 	"sync"
 	"time"
 	"time"
 
 
+	"github.com/kubecost/cost-model/pkg/util/httputil"
 	"github.com/kubecost/cost-model/pkg/util/timeutil"
 	"github.com/kubecost/cost-model/pkg/util/timeutil"
 
 
 	"k8s.io/klog"
 	"k8s.io/klog"
@@ -722,14 +723,165 @@ func (a *Accesses) GetPrometheusMetadata(w http.ResponseWriter, _ *http.Request,
 	w.Write(WrapData(prom.Validate(a.PrometheusClient)))
 	w.Write(WrapData(prom.Validate(a.PrometheusClient)))
 }
 }
 
 
+func (a *Accesses) PrometheusQuery(w http.ResponseWriter, r *http.Request, _ httprouter.Params) {
+	w.Header().Set("Content-Type", "application/json")
+	w.Header().Set("Access-Control-Allow-Origin", "*")
+
+	qp := httputil.NewQueryParams(r.URL.Query())
+	query := qp.Get("query", "")
+	if query == "" {
+		w.Write(WrapData(nil, fmt.Errorf("Query Parameter 'query' is unset'")))
+		return
+	}
+
+	ctx := prom.NewNamedContext(a.PrometheusClient, prom.FrontendContextName)
+	body, err := ctx.RawQuery(query)
+	if err != nil {
+		w.Write(WrapData(nil, fmt.Errorf("Error running query %s. Error: %s", query, err)))
+		return
+	}
+
+	w.Write(body)
+}
+
+func (a *Accesses) PrometheusQueryRange(w http.ResponseWriter, r *http.Request, _ httprouter.Params) {
+	w.Header().Set("Content-Type", "application/json")
+	w.Header().Set("Access-Control-Allow-Origin", "*")
+
+	qp := httputil.NewQueryParams(r.URL.Query())
+	query := qp.Get("query", "")
+	if query == "" {
+		fmt.Fprintf(w, "Error parsing query from request parameters.")
+		return
+	}
+
+	start, end, duration, err := toStartEndStep(qp)
+	if err != nil {
+		fmt.Fprintf(w, err.Error())
+		return
+	}
+
+	ctx := prom.NewNamedContext(a.PrometheusClient, prom.FrontendContextName)
+	body, err := ctx.RawQueryRange(query, start, end, duration)
+	if err != nil {
+		fmt.Fprintf(w, "Error running query %s. Error: %s", query, err)
+		return
+	}
+
+	w.Write(body)
+}
+
+func (a *Accesses) ThanosQuery(w http.ResponseWriter, r *http.Request, _ httprouter.Params) {
+	w.Header().Set("Content-Type", "application/json")
+	w.Header().Set("Access-Control-Allow-Origin", "*")
+
+	if !thanos.IsEnabled() {
+		w.Write(WrapData(nil, fmt.Errorf("ThanosDisabled")))
+		return
+	}
+
+	qp := httputil.NewQueryParams(r.URL.Query())
+	query := qp.Get("query", "")
+	if query == "" {
+		w.Write(WrapData(nil, fmt.Errorf("Query Parameter 'query' is unset'")))
+		return
+	}
+
+	ctx := prom.NewNamedContext(a.ThanosClient, prom.FrontendContextName)
+	body, err := ctx.RawQuery(query)
+	if err != nil {
+		w.Write(WrapData(nil, fmt.Errorf("Error running query %s. Error: %s", query, err)))
+		return
+	}
+
+	w.Write(body)
+}
+
+func (a *Accesses) ThanosQueryRange(w http.ResponseWriter, r *http.Request, _ httprouter.Params) {
+	w.Header().Set("Content-Type", "application/json")
+	w.Header().Set("Access-Control-Allow-Origin", "*")
+
+	if !thanos.IsEnabled() {
+		w.Write(WrapData(nil, fmt.Errorf("ThanosDisabled")))
+		return
+	}
+
+	qp := httputil.NewQueryParams(r.URL.Query())
+	query := qp.Get("query", "")
+	if query == "" {
+		fmt.Fprintf(w, "Error parsing query from request parameters.")
+		return
+	}
+
+	start, end, duration, err := toStartEndStep(qp)
+	if err != nil {
+		fmt.Fprintf(w, err.Error())
+		return
+	}
+
+	ctx := prom.NewNamedContext(a.ThanosClient, prom.FrontendContextName)
+	body, err := ctx.RawQueryRange(query, start, end, duration)
+	if err != nil {
+		fmt.Fprintf(w, "Error running query %s. Error: %s", query, err)
+		return
+	}
+
+	w.Write(body)
+}
+
+// helper for query range proxy requests
+func toStartEndStep(qp httputil.QueryParams) (start, end time.Time, step time.Duration, err error) {
+	var e error
+
+	ss := qp.Get("start", "")
+	es := qp.Get("end", "")
+	ds := qp.Get("duration", "")
+	layout := "2006-01-02T15:04:05.000Z"
+
+	start, e = time.Parse(layout, ss)
+	if e != nil {
+		err = fmt.Errorf("Error parsing time %s. Error: %s", ss, err)
+		return
+	}
+	end, e = time.Parse(layout, es)
+	if e != nil {
+		err = fmt.Errorf("Error parsing time %s. Error: %s", es, err)
+		return
+	}
+	step, e = time.ParseDuration(ds)
+	if e != nil {
+		err = fmt.Errorf("Error parsing duration %s. Error: %s", ds, err)
+		return
+	}
+	err = nil
+
+	return
+}
+
 func (a *Accesses) GetPrometheusQueueState(w http.ResponseWriter, _ *http.Request, _ httprouter.Params) {
 func (a *Accesses) GetPrometheusQueueState(w http.ResponseWriter, _ *http.Request, _ httprouter.Params) {
 	w.Header().Set("Content-Type", "application/json")
 	w.Header().Set("Content-Type", "application/json")
 	w.Header().Set("Access-Control-Allow-Origin", "*")
 	w.Header().Set("Access-Control-Allow-Origin", "*")
 
 
-	promClient := a.GetPrometheusClient(true)
-	queueState, err := prom.GetPrometheusQueueState(promClient)
+	promQueueState, err := prom.GetPrometheusQueueState(a.PrometheusClient)
+	if err != nil {
+		w.Write(WrapData(nil, err))
+		return
+	}
+
+	result := map[string]*prom.PrometheusQueueState{
+		"prometheus": promQueueState,
+	}
 
 
-	w.Write(WrapData(queueState, err))
+	if thanos.IsEnabled() {
+		thanosQueueState, err := prom.GetPrometheusQueueState(a.ThanosClient)
+		if err != nil {
+			log.Warningf("Error getting Thanos queue state: %s", err)
+		} else {
+			result["thanos"] = thanosQueueState
+		}
+	}
+
+	w.Write(WrapData(result, nil))
 }
 }
 
 
 // Creates a new ClusterManager instance using a boltdb storage. If that fails,
 // Creates a new ClusterManager instance using a boltdb storage. If that fails,
@@ -1083,6 +1235,12 @@ func Initialize(additionalConfigWatchers ...ConfigWatchers) *Accesses {
 	a.Router.GET("/pricingSourceStatus", a.GetPricingSourceStatus)
 	a.Router.GET("/pricingSourceStatus", a.GetPricingSourceStatus)
 	a.Router.GET("/pricingSourceCounts", a.GetPricingSourceCounts)
 	a.Router.GET("/pricingSourceCounts", a.GetPricingSourceCounts)
 
 
+	// prom query proxies
+	a.Router.GET("/prometheusQuery", a.PrometheusQuery)
+	a.Router.GET("/prometheusQueryRange", a.PrometheusQueryRange)
+	a.Router.GET("/thanosQuery", a.ThanosQuery)
+	a.Router.GET("/thanosQueryRange", a.ThanosQueryRange)
+
 	// diagnostics
 	// diagnostics
 	a.Router.GET("/diagnostics/requestQueue", a.GetPrometheusQueueState)
 	a.Router.GET("/diagnostics/requestQueue", a.GetPrometheusQueueState)
 
 

+ 3 - 0
pkg/prom/contextnames.go

@@ -18,4 +18,7 @@ const (
 
 
 	// ClusterMapContextName is the name we assign the cluster map query context [metadata]
 	// ClusterMapContextName is the name we assign the cluster map query context [metadata]
 	ClusterMapContextName = "cluster-map"
 	ClusterMapContextName = "cluster-map"
+
+	// FrontendContextName is the name we assign queries proxied from the frontend [metadata]
+	FrontendContextName = "frontend"
 )
 )

+ 33 - 12
pkg/prom/query.go

@@ -166,7 +166,8 @@ func runQuery(query string, ctx *Context, resCh QueryResultsChan, profileLabel s
 	resCh <- results
 	resCh <- results
 }
 }
 
 
-func (ctx *Context) query(query string) (interface{}, prometheus.Warnings, error) {
+// RawQuery is a direct query to the prometheus client and returns the body of the response
+func (ctx *Context) RawQuery(query string) ([]byte, error) {
 	u := ctx.Client.URL(epQuery, nil)
 	u := ctx.Client.URL(epQuery, nil)
 	q := u.Query()
 	q := u.Query()
 	q.Set("query", query)
 	q.Set("query", query)
@@ -174,7 +175,7 @@ func (ctx *Context) query(query string) (interface{}, prometheus.Warnings, error
 
 
 	req, err := http.NewRequest(http.MethodPost, u.String(), nil)
 	req, err := http.NewRequest(http.MethodPost, u.String(), nil)
 	if err != nil {
 	if err != nil {
-		return nil, nil, err
+		return nil, err
 	}
 	}
 
 
 	// Set QueryContext name if non empty
 	// Set QueryContext name if non empty
@@ -189,22 +190,32 @@ func (ctx *Context) query(query string) (interface{}, prometheus.Warnings, error
 	resp, body, _, err := ctx.Client.Do(context.Background(), req)
 	resp, body, _, err := ctx.Client.Do(context.Background(), req)
 	if err != nil {
 	if err != nil {
 		if resp == nil {
 		if resp == nil {
-			return nil, nil, fmt.Errorf("query error: '%s' fetching query '%s'", err.Error(), query)
+			return nil, fmt.Errorf("query error: '%s' fetching query '%s'", err.Error(), query)
 		}
 		}
 
 
-		return nil, nil, fmt.Errorf("query error %d: '%s' fetching query '%s'", resp.StatusCode, err.Error(), query)
+		return nil, fmt.Errorf("query error %d: '%s' fetching query '%s'", resp.StatusCode, err.Error(), query)
 	}
 	}
+
 	// Unsuccessful Status Code, log body and status
 	// Unsuccessful Status Code, log body and status
 	statusCode := resp.StatusCode
 	statusCode := resp.StatusCode
 	statusText := http.StatusText(statusCode)
 	statusText := http.StatusText(statusCode)
 	if resp.StatusCode < 200 || resp.StatusCode >= 300 {
 	if resp.StatusCode < 200 || resp.StatusCode >= 300 {
-		return nil, nil, CommErrorf("%d (%s) URL: '%s', Request Headers: '%s', Headers: '%s', Body: '%s' Query: '%s'", statusCode, statusText, req.URL, req.Header, httputil.HeaderString(resp.Header), body, query)
+		return nil, CommErrorf("%d (%s) URL: '%s', Request Headers: '%s', Headers: '%s', Body: '%s' Query: '%s'", statusCode, statusText, req.URL, req.Header, httputil.HeaderString(resp.Header), body, query)
+	}
+
+	return body, err
+}
+
+func (ctx *Context) query(query string) (interface{}, prometheus.Warnings, error) {
+	body, err := ctx.RawQuery(query)
+	if err != nil {
+		return nil, nil, err
 	}
 	}
 
 
 	var toReturn interface{}
 	var toReturn interface{}
 	err = json.Unmarshal(body, &toReturn)
 	err = json.Unmarshal(body, &toReturn)
 	if err != nil {
 	if err != nil {
-		return nil, nil, fmt.Errorf("query error: '%s' fetching query '%s'", err.Error(), query)
+		return nil, nil, fmt.Errorf("Unmarshal Error: %s\nQuery: %s", err, query)
 	}
 	}
 
 
 	warnings := warningsFrom(toReturn)
 	warnings := warningsFrom(toReturn)
@@ -276,7 +287,8 @@ func runQueryRange(query string, start, end time.Time, step time.Duration, ctx *
 	resCh <- results
 	resCh <- results
 }
 }
 
 
-func (ctx *Context) queryRange(query string, start, end time.Time, step time.Duration) (interface{}, prometheus.Warnings, error) {
+// RawQuery is a direct query to the prometheus client and returns the body of the response
+func (ctx *Context) RawQueryRange(query string, start, end time.Time, step time.Duration) ([]byte, error) {
 	u := ctx.Client.URL(epQueryRange, nil)
 	u := ctx.Client.URL(epQueryRange, nil)
 	q := u.Query()
 	q := u.Query()
 	q.Set("query", query)
 	q.Set("query", query)
@@ -287,7 +299,7 @@ func (ctx *Context) queryRange(query string, start, end time.Time, step time.Dur
 
 
 	req, err := http.NewRequest(http.MethodPost, u.String(), nil)
 	req, err := http.NewRequest(http.MethodPost, u.String(), nil)
 	if err != nil {
 	if err != nil {
-		return nil, nil, err
+		return nil, err
 	}
 	}
 
 
 	// Set QueryContext name if non empty
 	// Set QueryContext name if non empty
@@ -302,23 +314,32 @@ func (ctx *Context) queryRange(query string, start, end time.Time, step time.Dur
 	resp, body, _, err := ctx.Client.Do(context.Background(), req)
 	resp, body, _, err := ctx.Client.Do(context.Background(), req)
 	if err != nil {
 	if err != nil {
 		if resp == nil {
 		if resp == nil {
-			return nil, nil, fmt.Errorf("Error: %s, Body: %s Query: %s", err.Error(), body, query)
+			return nil, fmt.Errorf("Error: %s, Body: %s Query: %s", err.Error(), body, query)
 		}
 		}
 
 
-		return nil, nil, fmt.Errorf("%d (%s) Headers: %s Error: %s Body: %s Query: %s", resp.StatusCode, http.StatusText(resp.StatusCode), httputil.HeaderString(resp.Header), body, err.Error(), query)
+		return nil, fmt.Errorf("%d (%s) Headers: %s Error: %s Body: %s Query: %s", resp.StatusCode, http.StatusText(resp.StatusCode), httputil.HeaderString(resp.Header), body, err.Error(), query)
 	}
 	}
 
 
 	// Unsuccessful Status Code, log body and status
 	// Unsuccessful Status Code, log body and status
 	statusCode := resp.StatusCode
 	statusCode := resp.StatusCode
 	statusText := http.StatusText(statusCode)
 	statusText := http.StatusText(statusCode)
 	if resp.StatusCode < 200 || resp.StatusCode >= 300 {
 	if resp.StatusCode < 200 || resp.StatusCode >= 300 {
-		return nil, nil, CommErrorf("%d (%s) Headers: %s, Body: %s Query: %s", statusCode, statusText, httputil.HeaderString(resp.Header), body, query)
+		return nil, CommErrorf("%d (%s) Headers: %s, Body: %s Query: %s", statusCode, statusText, httputil.HeaderString(resp.Header), body, query)
+	}
+
+	return body, err
+}
+
+func (ctx *Context) queryRange(query string, start, end time.Time, step time.Duration) (interface{}, prometheus.Warnings, error) {
+	body, err := ctx.RawQueryRange(query, start, end, step)
+	if err != nil {
+		return nil, nil, err
 	}
 	}
 
 
 	var toReturn interface{}
 	var toReturn interface{}
 	err = json.Unmarshal(body, &toReturn)
 	err = json.Unmarshal(body, &toReturn)
 	if err != nil {
 	if err != nil {
-		return nil, nil, fmt.Errorf("%d (%s) Headers: %s Error: %s Body: %s Query: %s", statusCode, statusText, httputil.HeaderString(resp.Header), err.Error(), body, query)
+		return nil, nil, fmt.Errorf("Unmarshal Error: %s\nQuery: %s", err, query)
 	}
 	}
 
 
 	warnings := warningsFrom(toReturn)
 	warnings := warningsFrom(toReturn)