| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434 |
- package prom
- import (
- "context"
- "crypto/tls"
- "crypto/x509"
- "fmt"
- "net"
- "net/http"
- "net/url"
- "os"
- "strings"
- "sync/atomic"
- "time"
- "github.com/opencost/opencost/core/pkg/collections"
- "github.com/opencost/opencost/core/pkg/log"
- "github.com/opencost/opencost/core/pkg/util/fileutil"
- "github.com/opencost/opencost/core/pkg/util/httputil"
- "github.com/opencost/opencost/core/pkg/version"
- golog "log"
- prometheus "github.com/prometheus/client_golang/api"
- )
// UserAgent is the User-Agent header value sent on all outgoing requests,
// identifying the Opencost build version to the upstream server.
var UserAgent = fmt.Sprintf("Opencost/%s", version.Version)
//--------------------------------------------------------------------------
// QueryParamsDecorator
//--------------------------------------------------------------------------

// QueryParamsDecorator is used to decorate and return query parameters for
// outgoing requests. It receives the request path and the current query
// values and returns the (possibly modified) values to encode onto the URL.
type QueryParamsDecorator = func(path string, values url.Values) url.Values
//--------------------------------------------------------------------------
// ClientAuth
//--------------------------------------------------------------------------

// ClientAuth is used to authenticate outgoing client requests.
type ClientAuth struct {
	Username    string // basic-auth username; empty disables basic auth
	Password    string // basic-auth password; only used when Username is set
	BearerToken string // bearer token for the Authorization header; empty disables
}
- // DefaultClientAuth returns a non-nil default ClientAuth instance.
- func DefaultClientAuth() *ClientAuth {
- return &ClientAuth{
- Username: "",
- Password: "",
- BearerToken: "",
- }
- }
- // Apply Applies the authentication data to the request headers
- func (auth *ClientAuth) Apply(req *http.Request) {
- if auth == nil {
- return
- }
- if auth.Username != "" {
- req.SetBasicAuth(auth.Username, auth.Password)
- }
- if auth.BearerToken != "" {
- token := "Bearer " + auth.BearerToken
- req.Header.Add("Authorization", token)
- }
- }
//--------------------------------------------------------------------------
// Rate Limit Options
//--------------------------------------------------------------------------

// MaxRetryAfterDuration is the maximum amount of time we should ever wait
// during a retry. This is to prevent starvation on the request threads.
const MaxRetryAfterDuration = 10 * time.Second

// HeaderXScopeOrgId is the default header key for Mimir/Cortex-Tenant API
// requests, used to route queries to a specific tenant.
const HeaderXScopeOrgId = "X-Scope-OrgID"
// RateLimitRetryOpts contains retry options applied when a response is
// rate limited.
type RateLimitRetryOpts struct {
	MaxRetries       int           // maximum number of retries before giving up
	DefaultRetryWait time.Duration // wait used when the response supplies no Retry-After
}
// RateLimitResponseStatus contains the status of a single rate-limited
// retry attempt.
type RateLimitResponseStatus struct {
	RetriesRemaining int           // retries left after this attempt
	WaitTime         time.Duration // how long this attempt waited before retrying
}
- // String creates a string representation of the rate limit status
- func (rtrs *RateLimitResponseStatus) String() string {
- return fmt.Sprintf("Wait Time: %.2f seconds, Retries Remaining: %d", rtrs.WaitTime.Seconds(), rtrs.RetriesRemaining)
- }
// RateLimitedResponseError contains a list of retry statuses that occurred
// during retries on a rate limited response. It is returned as the request
// error once all retries are exhausted.
type RateLimitedResponseError struct {
	RateLimitStatus []*RateLimitResponseStatus // one entry per retry attempt, in order
}
- // Error returns a string representation of the error, including the rate limit
- // status reports
- func (rlre *RateLimitedResponseError) Error() string {
- var sb strings.Builder
- sb.WriteString("Request was Rate Limited and Retries Exhausted:\n")
- for _, rls := range rlre.RateLimitStatus {
- sb.WriteString(" * ")
- sb.WriteString(rls.String())
- sb.WriteString("\n")
- }
- return sb.String()
- }
//--------------------------------------------------------------------------
// RateLimitedPrometheusClient
//--------------------------------------------------------------------------

// RateLimitedPrometheusClient is a prometheus client which limits the total
// number of concurrent outbound requests allowed at a given moment by
// funnelling all requests through a queue serviced by a fixed worker pool.
type RateLimitedPrometheusClient struct {
	id                string                                  // identifier returned by ID()
	client            prometheus.Client                       // underlying client that executes the requests
	auth              *ClientAuth                             // authentication applied to each outgoing request
	queue             collections.BlockingQueue[*workRequest] // pending requests consumed by worker goroutines
	decorator         QueryParamsDecorator                    // optional query-parameter decorator; nil disables
	rateLimitRetry    *RateLimitRetryOpts                     // optional rate-limit retry behavior; nil disables
	outbound          atomic.Int32                            // number of requests currently in flight
	fileLogger        *golog.Logger                           // optional query log destination; nil disables
	headerXScopeOrgId string                                  // X-Scope-OrgID header value; empty disables
}
- // NewRateLimitedClient creates a prometheus client which limits the number of concurrent outbound
- // prometheus requests.
- func NewRateLimitedClient(
- id string,
- client prometheus.Client,
- maxConcurrency int,
- auth *ClientAuth,
- decorator QueryParamsDecorator,
- rateLimitRetryOpts *RateLimitRetryOpts,
- queryLogFile string,
- headerXScopeOrgId string,
- ) (prometheus.Client, error) {
- queue := collections.NewBlockingQueue[*workRequest]()
- var logger *golog.Logger
- if queryLogFile != "" {
- exists, err := fileutil.FileExists(queryLogFile)
- if err != nil {
- log.Infof("Failed to check for existence of queryLogFile: %s: %s", queryLogFile, err)
- }
- if exists {
- err = os.Remove(queryLogFile)
- if err != nil {
- log.Infof("Failed to remove queryLogFile: %s: %s", queryLogFile, err)
- }
- }
- f, err := os.OpenFile(queryLogFile, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
- if err != nil {
- log.Infof("Failed to open queryLogFile: %s for query logging: %s", queryLogFile, err)
- } else {
- logger = golog.New(f, "query-log", golog.LstdFlags)
- }
- }
- // default authentication
- if auth == nil {
- auth = DefaultClientAuth()
- }
- rlpc := &RateLimitedPrometheusClient{
- id: id,
- client: client,
- queue: queue,
- decorator: decorator,
- rateLimitRetry: rateLimitRetryOpts,
- auth: auth,
- fileLogger: logger,
- headerXScopeOrgId: headerXScopeOrgId,
- }
- // Start concurrent request processing
- for i := 0; i < maxConcurrency; i++ {
- go rlpc.worker()
- }
- return rlpc, nil
- }
// ID is used to identify the type of client; it returns the identifier this
// client was constructed with.
func (rlpc *RateLimitedPrometheusClient) ID() string {
	return rlpc.id
}
// TotalQueuedRequests returns the total number of requests that are waiting
// in the queue to be sent.
func (rlpc *RateLimitedPrometheusClient) TotalQueuedRequests() int {
	return rlpc.queue.Length()
}
// TotalOutboundRequests returns the total number of concurrent outbound
// requests, which have been sent to the server and are awaiting response.
func (rlpc *RateLimitedPrometheusClient) TotalOutboundRequests() int {
	return int(rlpc.outbound.Load())
}
// URL is a passthrough to the underlying prometheus client API; it is not
// rate limited.
func (rlpc *RateLimitedPrometheusClient) URL(ep string, args map[string]string) *url.URL {
	return rlpc.client.URL(ep, args)
}
// EachQueuedRequest provides a mechanism to safely iterate through all queued
// requests, invoking f with each request's context name, query string, and
// the time (in milliseconds) it has spent in the queue so far.
func (rlpc *RateLimitedPrometheusClient) EachQueuedRequest(f func(ctx string, query string, queueTimeMs int64)) {
	rlpc.queue.Each(func(_ int, req *workRequest) {
		f(req.contextName, req.query, time.Since(req.start).Milliseconds())
	})
}
// workRequest is used to queue requests for execution by a worker goroutine.
type workRequest struct {
	ctx      context.Context    // request context forwarded to the underlying client
	req      *http.Request      // the request to execute
	start    time.Time          // enqueue time, used to measure time spent in queue
	respChan chan *workResponse // channel the worker sends the result back on
	// used as a sentinel value to close the worker goroutine
	closer bool
	// request metadata for diagnostics
	contextName string
	query       string
}
// workResponse is the response payload returned to the Do method by a worker.
type workResponse struct {
	res  *http.Response // raw response from the underlying client (may be nil on error)
	body []byte         // response body as read by the underlying client
	err  error          // transport error or RateLimitedResponseError on exhausted retries
}
// worker is used as a consumer goroutine to pull workRequest items from the
// blocking queue and execute them against the underlying client, optionally
// retrying rate-limited responses, then deliver the result on the request's
// response channel. It runs until it dequeues a sentinel (closer) request.
func (rlpc *RateLimitedPrometheusClient) worker() {
	retryOpts := rlpc.rateLimitRetry
	retryRateLimit := retryOpts != nil
	for {
		// blocks until there is an item available
		we := rlpc.queue.Dequeue()

		// if we need to shut down all workers, we'll need to submit sentinel values
		// that will force the worker to return
		if we.closer {
			return
		}

		ctx := we.ctx
		req := we.req

		// decorate the raw query parameters
		if rlpc.decorator != nil {
			req.URL.RawQuery = rlpc.decorator(req.URL.Path, req.URL.Query()).Encode()
		}

		// measure time in queue
		timeInQueue := time.Since(we.start)

		// Increment outbound counter
		rlpc.outbound.Add(1)

		// Execute Request
		roundTripStart := time.Now()
		res, body, err := rlpc.client.Do(ctx, req)

		// If retries on rate limited response is enabled:
		// * Check for a 429 StatusCode OR 400 StatusCode and message containing "ThrottlingException"
		// * Attempt to parse a Retry-After from response headers (common on 429)
		// * If we couldn't determine how long to wait for a retry, use 1 second by default
		if res != nil && retryRateLimit {
			var status []*RateLimitResponseStatus
			var retries int = retryOpts.MaxRetries
			var defaultWait time.Duration = retryOpts.DefaultRetryWait

			for httputil.IsRateLimited(res, body) && retries > 0 {
				// calculate amount of time to wait before retry, in the event the default wait is used,
				// an exponential backoff is applied based on the number of times we've retried.
				retryAfter := httputil.RateLimitedRetryFor(res, defaultWait, retryOpts.MaxRetries-retries)
				retries--

				// NOTE: status and the log line record the pre-clamp retryAfter
				// and the post-decrement retries count.
				status = append(status, &RateLimitResponseStatus{RetriesRemaining: retries, WaitTime: retryAfter})

				log.DedupedInfof(50, "Rate Limited Prometheus Request. Waiting for: %d ms. Retries Remaining: %d", retryAfter.Milliseconds(), retries)

				// To prevent total starvation of request threads, hard limit wait time to 10s. We also want quota limits/throttles
				// to eventually pass through as an error. For example, if some quota is reached with 10 days left, we clearly
				// don't want to block for 10 days.
				if retryAfter > MaxRetryAfterDuration {
					retryAfter = MaxRetryAfterDuration
				}

				// execute wait and retry
				time.Sleep(retryAfter)
				res, body, err = rlpc.client.Do(ctx, req)
			}

			// if we've broken out of our retry loop and the resp is still rate limited,
			// then let's generate a meaningful error to pass back
			if retries == 0 && httputil.IsRateLimited(res, body) {
				err = &RateLimitedResponseError{RateLimitStatus: status}
			}
		}

		// Decrement outbound counter
		rlpc.outbound.Add(-1)

		LogQueryRequest(rlpc.fileLogger, req, timeInQueue, time.Since(roundTripStart))

		// Pass back response data over channel to caller
		we.respChan <- &workResponse{
			res:  res,
			body: body,
			err:  err,
		}
	}
}
// Do rate limits and passes the request through to the underlying prometheus
// client API: it applies tenant and auth headers, enqueues the request for a
// worker goroutine, and blocks until that worker delivers the response on the
// per-request channel.
func (rlpc *RateLimitedPrometheusClient) Do(ctx context.Context, req *http.Request) (*http.Response, []byte, error) {
	// tag the request with the Mimir/Cortex tenant header when configured
	if rlpc.headerXScopeOrgId != "" {
		req.Header.Set(HeaderXScopeOrgId, rlpc.headerXScopeOrgId)
	}
	rlpc.auth.Apply(req)

	// unbuffered: the worker's send completes only once we receive below
	respChan := make(chan *workResponse)
	defer close(respChan)

	// request names are used as a debug utility to identify requests in queue
	contextName := "<none>"
	if n, ok := httputil.GetName(req); ok {
		contextName = n
	}
	query, _ := httputil.GetQuery(req)

	rlpc.queue.Enqueue(&workRequest{
		ctx:         ctx,
		req:         req,
		start:       time.Now(),
		respChan:    respChan,
		closer:      false,
		contextName: contextName,
		query:       query,
	})

	// block until a worker has executed the request and sent the result
	workRes := <-respChan
	return workRes.res, workRes.body, workRes.err
}
//--------------------------------------------------------------------------
// Client Helpers
//--------------------------------------------------------------------------

// PrometheusClientConfig contains all configurable options for creating a new
// prometheus client.
type PrometheusClientConfig struct {
	Timeout               time.Duration       // dial timeout for outgoing connections
	KeepAlive             time.Duration       // TCP keep-alive interval for the dialer
	TLSHandshakeTimeout   time.Duration       // maximum time to wait for a TLS handshake
	TLSInsecureSkipVerify bool                // if true, skip server certificate verification
	RateLimitRetryOpts    *RateLimitRetryOpts // optional retry behavior for rate-limited responses
	Auth                  *ClientAuth         // optional request authentication; nil means unauthenticated
	QueryConcurrency      int                 // maximum concurrent outbound queries
	QueryLogFile          string              // optional file path for query logging; empty disables
	HeaderXScopeOrgId     string              // optional X-Scope-OrgID header value for Mimir/Cortex
	RootCAs               *x509.CertPool      // optional CA pool for server certificate verification
	ClientCertificates    []tls.Certificate   // optional client certificates for mutual TLS
}
- // NewPrometheusClient creates a new rate limited client which limits by outbound concurrent requests.
- func NewPrometheusClient(address string, config *PrometheusClientConfig) (prometheus.Client, error) {
- // may be necessary for long prometheus queries
- rt := httputil.NewUserAgentTransport(UserAgent, &http.Transport{
- Proxy: http.ProxyFromEnvironment,
- DialContext: (&net.Dialer{
- Timeout: config.Timeout,
- KeepAlive: config.KeepAlive,
- }).DialContext,
- TLSHandshakeTimeout: config.TLSHandshakeTimeout,
- TLSClientConfig: &tls.Config{
- InsecureSkipVerify: config.TLSInsecureSkipVerify,
- RootCAs: config.RootCAs,
- Certificates: config.ClientCertificates,
- MinVersion: tls.VersionTLS12,
- },
- })
- pc := prometheus.Config{
- Address: address,
- RoundTripper: rt,
- }
- client, err := prometheus.NewClient(pc)
- if err != nil {
- return nil, err
- }
- return NewRateLimitedClient(
- PrometheusClientID,
- client,
- config.QueryConcurrency,
- config.Auth,
- nil,
- config.RateLimitRetryOpts,
- config.QueryLogFile,
- config.HeaderXScopeOrgId,
- )
- }
- // LogQueryRequest logs the query that was send to prom/thanos with the time in queue and total time after being sent
- func LogQueryRequest(l *golog.Logger, req *http.Request, queueTime time.Duration, sendTime time.Duration) {
- if l == nil {
- return
- }
- qp := httputil.NewQueryParams(req.URL.Query())
- query := qp.Get("query", "<Unknown>")
- l.Printf("[Queue: %fs, Outbound: %fs][Query: %s]\n", queueTime.Seconds(), sendTime.Seconds(), query)
- }
|