| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115 |
- package prom
- import (
- "context"
- "encoding/json"
- "fmt"
- "net/http"
- "github.com/kubecost/cost-model/pkg/errors"
- "github.com/kubecost/cost-model/pkg/util"
- prometheus "github.com/prometheus/client_golang/api"
- "k8s.io/klog"
- )
- const (
- apiPrefix = "/api/v1"
- epQuery = apiPrefix + "/query"
- )
- // Context wraps a Prometheus client and provides methods for querying and
- // parsing query responses and errors.
- type Context struct {
- Client prometheus.Client
- ErrorCollector *errors.ErrorCollector
- semaphore *util.Semaphore
- }
- // NewContext creates a new Promethues querying context from the given client
- func NewContext(client prometheus.Client) *Context {
- var ec errors.ErrorCollector
- // By deafult, allow 20 concurrent queries, which is the Prometheus default
- sem := util.NewSemaphore(20)
- return &Context{
- Client: client,
- ErrorCollector: &ec,
- semaphore: sem,
- }
- }
- // Errors returns the errors collected from the Context's ErrorCollector
- func (ctx *Context) Errors() []error {
- return ctx.ErrorCollector.Errors()
- }
- // TODO SetMaxConcurrency
- // QueryAll returns one QueryResultsChan for each query provided, then runs
- // each query concurrently and returns results on each channel, respectively,
- // in the order they were provided; i.e. the response to queries[1] will be
- // sent on channel resChs[1].
- func (ctx *Context) QueryAll(queries ...string) []QueryResultsChan {
- resChs := []QueryResultsChan{}
- for _, q := range queries {
- resChs = append(resChs, ctx.Query(q))
- }
- return resChs
- }
- // Query returns a QueryResultsChan, then runs the given query and sends the
- // results on the provided channel. Receiver is responsible for closing the
- // channel, preferably using the Read method.
- func (ctx *Context) Query(query string) QueryResultsChan {
- resCh := make(QueryResultsChan)
- go func(ctx *Context, resCh QueryResultsChan) {
- defer errors.HandlePanic()
- raw, promErr := ctx.query(query)
- ctx.ErrorCollector.Report(promErr)
- results, parseErr := NewQueryResults(raw)
- ctx.ErrorCollector.Report(parseErr)
- resCh <- results
- }(ctx, resCh)
- return resCh
- }
- func (ctx *Context) query(query string) (interface{}, error) {
- ctx.semaphore.Acquire()
- defer ctx.semaphore.Return()
- u := ctx.Client.URL(epQuery, nil)
- q := u.Query()
- q.Set("query", query)
- u.RawQuery = q.Encode()
- req, err := http.NewRequest(http.MethodPost, u.String(), nil)
- if err != nil {
- return nil, err
- }
- resp, body, warnings, err := ctx.Client.Do(context.Background(), req)
- for _, w := range warnings {
- klog.V(3).Infof("Warning '%s' fetching query '%s'", w, query)
- }
- if err != nil {
- if resp == nil {
- return nil, fmt.Errorf("Error %s fetching query %s", err.Error(), query)
- }
- return nil, fmt.Errorf("%d Error %s fetching query %s", resp.StatusCode, err.Error(), query)
- }
- var toReturn interface{}
- err = json.Unmarshal(body, &toReturn)
- if err != nil {
- return nil, fmt.Errorf("Error %s fetching query %s", err.Error(), query)
- }
- return toReturn, nil
- }
|