|
|
@@ -10,6 +10,7 @@ import (
|
|
|
"time"
|
|
|
|
|
|
"github.com/kubecost/cost-model/pkg/errors"
|
|
|
+ "github.com/kubecost/cost-model/pkg/log"
|
|
|
"github.com/kubecost/cost-model/pkg/util"
|
|
|
prometheus "github.com/prometheus/client_golang/api"
|
|
|
"k8s.io/klog"
|
|
|
@@ -54,19 +55,18 @@ func (ctx *Context) HasErrors() bool {
|
|
|
func (ctx *Context) Query(query string) QueryResultsChan {
|
|
|
resCh := make(QueryResultsChan)
|
|
|
|
|
|
- go func(ctx *Context, resCh QueryResultsChan) {
|
|
|
- defer errors.HandlePanic()
|
|
|
+ go runQuery(query, ctx, resCh, "")
|
|
|
|
|
|
- raw, promErr := ctx.query(query)
|
|
|
- ctx.ErrorCollector.Report(promErr)
|
|
|
+ return resCh
|
|
|
+}
|
|
|
|
|
|
- results := NewQueryResults(query, raw)
|
|
|
- if results.Error != nil {
|
|
|
- ctx.ErrorCollector.Report(results.Error)
|
|
|
- }
|
|
|
+// ProfileQuery returns a QueryResultsChan, then runs the given query with a profile
|
|
|
+// label and sends the results on the provided channel. Receiver is responsible for closing the
|
|
|
+// channel, preferably using the Read method.
|
|
|
+func (ctx *Context) ProfileQuery(query string, profileLabel string) QueryResultsChan {
|
|
|
+ resCh := make(QueryResultsChan)
|
|
|
|
|
|
- resCh <- results
|
|
|
- }(ctx, resCh)
|
|
|
+ go runQuery(query, ctx, resCh, profileLabel)
|
|
|
|
|
|
return resCh
|
|
|
}
|
|
|
@@ -85,6 +85,20 @@ func (ctx *Context) QueryAll(queries ...string) []QueryResultsChan {
|
|
|
return resChs
|
|
|
}
|
|
|
|
|
|
+// ProfileQueryAll returns one QueryResultsChan for each query provided, then runs
|
|
|
+// each ProfileQuery concurrently and returns results on each channel, respectively,
|
|
|
+// in the order they were provided; i.e. the response to queries[1] will be
|
|
|
+// sent on channel resChs[1].
|
|
|
+func (ctx *Context) ProfileQueryAll(queries ...string) []QueryResultsChan {
|
|
|
+ resChs := []QueryResultsChan{}
|
|
|
+
|
|
|
+ for _, q := range queries {
|
|
|
+ resChs = append(resChs, ctx.ProfileQuery(q, fmt.Sprintf("Query #%d", len(resChs)+1)))
|
|
|
+ }
|
|
|
+
|
|
|
+ return resChs
|
|
|
+}
|
|
|
+
|
|
|
func (ctx *Context) QuerySync(query string) ([]*QueryResult, error) {
|
|
|
raw, err := ctx.query(query)
|
|
|
if err != nil {
|
|
|
@@ -104,6 +118,27 @@ func (ctx *Context) QueryURL() *url.URL {
|
|
|
return ctx.Client.URL(epQuery, nil)
|
|
|
}
|
|
|
|
|
|
+// runQuery executes the prometheus query asynchronously, collects results and
|
|
|
+// errors, and passes them through the results channel.
|
|
|
+func runQuery(query string, ctx *Context, resCh QueryResultsChan, profileLabel string) {
|
|
|
+ defer errors.HandlePanic()
|
|
|
+ startQuery := time.Now()
|
|
|
+
|
|
|
+ raw, promErr := ctx.query(query)
|
|
|
+ ctx.ErrorCollector.Report(promErr)
|
|
|
+
|
|
|
+ results := NewQueryResults(query, raw)
|
|
|
+ if results.Error != nil {
|
|
|
+ ctx.ErrorCollector.Report(results.Error)
|
|
|
+ }
|
|
|
+
|
|
|
+ if profileLabel != "" {
|
|
|
+ log.Profile(startQuery, profileLabel)
|
|
|
+ }
|
|
|
+
|
|
|
+ resCh <- results
|
|
|
+}
|
|
|
+
|
|
|
func (ctx *Context) query(query string) (interface{}, error) {
|
|
|
u := ctx.Client.URL(epQuery, nil)
|
|
|
q := u.Query()
|
|
|
@@ -139,19 +174,15 @@ func (ctx *Context) query(query string) (interface{}, error) {
|
|
|
func (ctx *Context) QueryRange(query string, start, end time.Time, step time.Duration) QueryResultsChan {
|
|
|
resCh := make(QueryResultsChan)
|
|
|
|
|
|
- go func(ctx *Context, resCh QueryResultsChan) {
|
|
|
- defer errors.HandlePanic()
|
|
|
+ go runQueryRange(query, start, end, step, ctx, resCh, "")
|
|
|
|
|
|
- raw, promErr := ctx.queryRange(query, start, end, step)
|
|
|
- ctx.ErrorCollector.Report(promErr)
|
|
|
+ return resCh
|
|
|
+}
|
|
|
|
|
|
- results := NewQueryResults(query, raw)
|
|
|
- if results.Error != nil {
|
|
|
- ctx.ErrorCollector.Report(results.Error)
|
|
|
- }
|
|
|
+func (ctx *Context) ProfileQueryRange(query string, start, end time.Time, step time.Duration, profileLabel string) QueryResultsChan {
|
|
|
+ resCh := make(QueryResultsChan)
|
|
|
|
|
|
- resCh <- results
|
|
|
- }(ctx, resCh)
|
|
|
+ go runQueryRange(query, start, end, step, ctx, resCh, profileLabel)
|
|
|
|
|
|
return resCh
|
|
|
}
|
|
|
@@ -175,6 +206,27 @@ func (ctx *Context) QueryRangeURL() *url.URL {
|
|
|
return ctx.Client.URL(epQueryRange, nil)
|
|
|
}
|
|
|
|
|
|
+// runQueryRange executes the prometheus queryRange asynchronously, collects results and
|
|
|
+// errors, and passes them through the results channel.
|
|
|
+func runQueryRange(query string, start, end time.Time, step time.Duration, ctx *Context, resCh QueryResultsChan, profileLabel string) {
|
|
|
+ defer errors.HandlePanic()
|
|
|
+ startQuery := time.Now()
|
|
|
+
|
|
|
+ raw, promErr := ctx.queryRange(query, start, end, step)
|
|
|
+ ctx.ErrorCollector.Report(promErr)
|
|
|
+
|
|
|
+ results := NewQueryResults(query, raw)
|
|
|
+ if results.Error != nil {
|
|
|
+ ctx.ErrorCollector.Report(results.Error)
|
|
|
+ }
|
|
|
+
|
|
|
+ if profileLabel != "" {
|
|
|
+ log.Profile(startQuery, profileLabel)
|
|
|
+ }
|
|
|
+
|
|
|
+ resCh <- results
|
|
|
+}
|
|
|
+
|
|
|
func (ctx *Context) queryRange(query string, start, end time.Time, step time.Duration) (interface{}, error) {
|
|
|
u := ctx.Client.URL(epQueryRange, nil)
|
|
|
q := u.Query()
|