prom.go 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402
  1. package prom
  2. import (
  3. "context"
  4. "crypto/tls"
  5. "fmt"
  6. "net"
  7. "net/http"
  8. "net/url"
  9. "os"
  10. "strings"
  11. "sync/atomic"
  12. "time"
  13. "github.com/opencost/opencost/pkg/collections"
  14. "github.com/opencost/opencost/pkg/log"
  15. "github.com/opencost/opencost/pkg/util/fileutil"
  16. "github.com/opencost/opencost/pkg/util/httputil"
  17. "github.com/opencost/opencost/pkg/version"
  18. golog "log"
  19. prometheus "github.com/prometheus/client_golang/api"
  20. )
  // UserAgent is the user agent string, "Opencost/<version>", built from
  // version.Version. NewPrometheusClient hands it to the user-agent transport
  // that wraps the HTTP transport of the prometheus client.
  var UserAgent = fmt.Sprintf("Opencost/%s", version.Version)
  22. //--------------------------------------------------------------------------
  23. // QueryParamsDecorator
  24. //--------------------------------------------------------------------------
  // QueryParamsDecorator is used to decorate and return query parameters for
  // outgoing requests. It receives the request URL path and the parsed query
  // values; the values it returns are encoded and written back as the raw query
  // of the request before the request is executed.
  type QueryParamsDecorator = func(path string, values url.Values) url.Values
  28. //--------------------------------------------------------------------------
  29. // ClientAuth
  30. //--------------------------------------------------------------------------
  // ClientAuth is used to authenticate outgoing client requests. Basic auth is
  // applied when Username is set, and a bearer token is applied when BearerToken
  // is set.
  type ClientAuth struct {
  	// Username is the basic auth user name. Basic auth is skipped when empty.
  	Username string
  	// Password is the basic auth password sent together with Username.
  	Password string
  	// BearerToken is sent as "Authorization: Bearer <token>" when non-empty.
  	BearerToken string
  }

  // Apply writes the authentication data to the request headers. It is a no-op
  // when called on a nil *ClientAuth.
  //
  // The Authorization header is always replaced rather than appended, so applying
  // the same ClientAuth to a request more than once (for example when a caller
  // re-submits a request) never accumulates duplicate Authorization headers. If
  // both Username and BearerToken are configured, the bearer token takes
  // precedence, because a request carries a single Authorization header.
  func (auth *ClientAuth) Apply(req *http.Request) {
  	if auth == nil {
  		return
  	}

  	if auth.Username != "" {
  		req.SetBasicAuth(auth.Username, auth.Password)
  	}

  	if auth.BearerToken != "" {
  		// Set (not Add) keeps exactly one Authorization header on the request.
  		req.Header.Set("Authorization", "Bearer "+auth.BearerToken)
  	}
  }
  50. //--------------------------------------------------------------------------
  51. // Rate Limit Options
  52. //--------------------------------------------------------------------------
  // MaxRetryAfterDuration is the maximum amount of time we should ever wait
  // before a single retry of a rate limited request. Longer waits are capped to
  // this value to prevent starvation on the request threads.
  const MaxRetryAfterDuration = 10 * time.Second
  // RateLimitRetryOpts contains the options used when retrying requests that
  // received a rate limited response.
  type RateLimitRetryOpts struct {
  	// MaxRetries is the maximum number of retries attempted for a rate limited
  	// request before an error is reported.
  	MaxRetries int
  	// DefaultRetryWait is the default wait handed to
  	// httputil.RateLimitedRetryFor when computing the delay before a retry.
  	DefaultRetryWait time.Duration
  }
  // RateLimitResponseStatus contains the status of a single rate limited retry:
  // how long the retry was asked to wait and how many retries were left
  // afterwards.
  type RateLimitResponseStatus struct {
  	RetriesRemaining int
  	WaitTime         time.Duration
  }

  // String creates a string representation of the rate limit status, with the
  // wait time printed in seconds using two decimals.
  func (rtrs *RateLimitResponseStatus) String() string {
  	const layout = "Wait Time: %.2f seconds, Retries Remaining: %d"

  	waitSeconds := rtrs.WaitTime.Seconds()
  	return fmt.Sprintf(layout, waitSeconds, rtrs.RetriesRemaining)
  }

  // RateLimitedResponseError contains the list of retry statuses that occurred
  // during retries on a rate limited response.
  type RateLimitedResponseError struct {
  	RateLimitStatus []*RateLimitResponseStatus
  }

  // Error returns a string representation of the error: a header line followed
  // by one bullet line per recorded rate limit status.
  func (rlre *RateLimitedResponseError) Error() string {
  	var msg strings.Builder

  	msg.WriteString("Request was Rate Limited and Retries Exhausted:\n")
  	for i := range rlre.RateLimitStatus {
  		fmt.Fprintf(&msg, " * %s\n", rlre.RateLimitStatus[i].String())
  	}

  	return msg.String()
  }
  87. //--------------------------------------------------------------------------
  88. // RateLimitedPrometheusClient
  89. //--------------------------------------------------------------------------
  // RateLimitedPrometheusClient is a prometheus client which limits the total number of
  // concurrent outbound requests allowed at a given moment. Requests passed to Do
  // are placed on a queue and executed by the worker goroutines started in
  // NewRateLimitedClient.
  type RateLimitedPrometheusClient struct {
  	// id is the identifier returned by ID.
  	id string
  	// client is the wrapped prometheus client that URL and the workers forward to.
  	client prometheus.Client
  	// auth is applied to every request passed to Do.
  	auth *ClientAuth
  	// queue holds the work requests waiting to be picked up by a worker.
  	queue collections.BlockingQueue[*workRequest]
  	// decorator, when non-nil, rewrites the query parameters of each request
  	// before it is executed.
  	decorator QueryParamsDecorator
  	// rateLimitRetry, when non-nil, enables retries on rate limited responses.
  	rateLimitRetry *RateLimitRetryOpts
  	// outbound counts the requests currently being executed by workers.
  	outbound atomic.Int32
  	// fileLogger, when non-nil, receives one log line per executed query.
  	fileLogger *golog.Logger
  }
  // requestCounter is used to determine if the prometheus client keeps track of
  // the concurrent outbound requests. RateLimitedPrometheusClient provides both
  // methods.
  type requestCounter interface {
  	// TotalQueuedRequests reports the number of requests held in the queue.
  	TotalQueuedRequests() int
  	// TotalOutboundRequests reports the number of requests currently outbound.
  	TotalOutboundRequests() int
  }
  108. // NewRateLimitedClient creates a prometheus client which limits the number of concurrent outbound
  109. // prometheus requests.
  110. func NewRateLimitedClient(
  111. id string,
  112. client prometheus.Client,
  113. maxConcurrency int,
  114. auth *ClientAuth,
  115. decorator QueryParamsDecorator,
  116. rateLimitRetryOpts *RateLimitRetryOpts,
  117. queryLogFile string) (prometheus.Client, error) {
  118. queue := collections.NewBlockingQueue[*workRequest]()
  119. var logger *golog.Logger
  120. if queryLogFile != "" {
  121. exists, err := fileutil.FileExists(queryLogFile)
  122. if exists {
  123. os.Remove(queryLogFile)
  124. }
  125. f, err := os.OpenFile(queryLogFile, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
  126. if err != nil {
  127. log.Infof("Failed to open queryLogFile: %s for query logging: %s", queryLogFile, err)
  128. } else {
  129. logger = golog.New(f, "query-log", golog.LstdFlags)
  130. }
  131. }
  132. // default authentication
  133. if auth == nil {
  134. auth = &ClientAuth{
  135. Username: "",
  136. Password: "",
  137. BearerToken: "",
  138. }
  139. }
  140. rlpc := &RateLimitedPrometheusClient{
  141. id: id,
  142. client: client,
  143. queue: queue,
  144. decorator: decorator,
  145. rateLimitRetry: rateLimitRetryOpts,
  146. auth: auth,
  147. fileLogger: logger,
  148. }
  149. // Start concurrent request processing
  150. for i := 0; i < maxConcurrency; i++ {
  151. go rlpc.worker()
  152. }
  153. return rlpc, nil
  154. }
  // ID is used to identify the type of client. It returns the id that was passed
  // to NewRateLimitedClient.
  func (rlpc *RateLimitedPrometheusClient) ID() string {
  	return rlpc.id
  }
  159. // TotalRequests returns the total number of requests that are either waiting to be sent and/or
  160. // are currently outbound.
  161. func (rlpc *RateLimitedPrometheusClient) TotalQueuedRequests() int {
  162. return rlpc.queue.Length()
  163. }
  164. // TotalOutboundRequests returns the total number of concurrent outbound requests, which have been
  165. // sent to the server and are awaiting response.
  166. func (rlpc *RateLimitedPrometheusClient) TotalOutboundRequests() int {
  167. return int(rlpc.outbound.Load())
  168. }
  // URL is a passthrough to the wrapped prometheus client API: it returns
  // whatever the underlying client's URL method returns for the given endpoint
  // and arguments.
  func (rlpc *RateLimitedPrometheusClient) URL(ep string, args map[string]string) *url.URL {
  	return rlpc.client.URL(ep, args)
  }
  // workRequest is used to queue requests. One is created per call to Do and
  // consumed by a worker goroutine.
  type workRequest struct {
  	// ctx is the caller's context, passed along when the request is executed
  	ctx context.Context
  	// req is the HTTP request to execute
  	req *http.Request
  	// start is the time the request was enqueued; used to measure time in queue
  	start time.Time
  	// respChan receives the single workResponse produced for this request
  	respChan chan *workResponse
  	// used as a sentinel value to close the worker goroutine
  	closer bool
  	// request metadata for diagnostics
  	contextName string
  	query       string
  }
  // workResponse is the response payload returned to the Do method. It carries
  // the result of executing the request on the wrapped client, with err replaced
  // by a *RateLimitedResponseError when rate limit retries were exhausted.
  type workResponse struct {
  	// res is the HTTP response, if any
  	res *http.Response
  	// body is the response body returned by the wrapped client
  	body []byte
  	// err is the error to report to the caller, if any
  	err error
  }
  191. // worker is used as a consumer goroutine to pull workRequest from the blocking queue and execute them
  192. func (rlpc *RateLimitedPrometheusClient) worker() {
  193. retryOpts := rlpc.rateLimitRetry
  194. retryRateLimit := retryOpts != nil
  195. for {
  196. // blocks until there is an item available
  197. we := rlpc.queue.Dequeue()
  198. // if we need to shut down all workers, we'll need to submit sentinel values
  199. // that will force the worker to return
  200. if we.closer {
  201. return
  202. }
  203. ctx := we.ctx
  204. req := we.req
  205. // decorate the raw query parameters
  206. if rlpc.decorator != nil {
  207. req.URL.RawQuery = rlpc.decorator(req.URL.Path, req.URL.Query()).Encode()
  208. }
  209. // measure time in queue
  210. timeInQueue := time.Since(we.start)
  211. // Increment outbound counter
  212. rlpc.outbound.Add(1)
  213. // Execute Request
  214. roundTripStart := time.Now()
  215. res, body, err := rlpc.client.Do(ctx, req)
  216. // If retries on rate limited response is enabled:
  217. // * Check for a 429 StatusCode OR 400 StatusCode and message containing "ThrottlingException"
  218. // * Attempt to parse a Retry-After from response headers (common on 429)
  219. // * If we couldn't determine how long to wait for a retry, use 1 second by default
  220. if res != nil && retryRateLimit {
  221. var status []*RateLimitResponseStatus
  222. var retries int = retryOpts.MaxRetries
  223. var defaultWait time.Duration = retryOpts.DefaultRetryWait
  224. for httputil.IsRateLimited(res, body) && retries > 0 {
  225. // calculate amount of time to wait before retry, in the event the default wait is used,
  226. // an exponential backoff is applied based on the number of times we've retried.
  227. retryAfter := httputil.RateLimitedRetryFor(res, defaultWait, retryOpts.MaxRetries-retries)
  228. retries--
  229. status = append(status, &RateLimitResponseStatus{RetriesRemaining: retries, WaitTime: retryAfter})
  230. log.DedupedInfof(50, "Rate Limited Prometheus Request. Waiting for: %d ms. Retries Remaining: %d", retryAfter.Milliseconds(), retries)
  231. // To prevent total starvation of request threads, hard limit wait time to 10s. We also want quota limits/throttles
  232. // to eventually pass through as an error. For example, if some quota is reached with 10 days left, we clearly
  233. // don't want to block for 10 days.
  234. if retryAfter > MaxRetryAfterDuration {
  235. retryAfter = MaxRetryAfterDuration
  236. }
  237. // execute wait and retry
  238. time.Sleep(retryAfter)
  239. res, body, err = rlpc.client.Do(ctx, req)
  240. }
  241. // if we've broken out of our retry loop and the resp is still rate limited,
  242. // then let's generate a meaningful error to pass back
  243. if retries == 0 && httputil.IsRateLimited(res, body) {
  244. err = &RateLimitedResponseError{RateLimitStatus: status}
  245. }
  246. }
  247. // Decrement outbound counter
  248. rlpc.outbound.Add(-1)
  249. LogQueryRequest(rlpc.fileLogger, req, timeInQueue, time.Since(roundTripStart))
  250. // Pass back response data over channel to caller
  251. we.respChan <- &workResponse{
  252. res: res,
  253. body: body,
  254. err: err,
  255. }
  256. }
  257. }
  258. // Rate limit and passthrough to prometheus client API
  259. func (rlpc *RateLimitedPrometheusClient) Do(ctx context.Context, req *http.Request) (*http.Response, []byte, error) {
  260. rlpc.auth.Apply(req)
  261. respChan := make(chan *workResponse)
  262. defer close(respChan)
  263. // request names are used as a debug utility to identify requests in queue
  264. contextName := "<none>"
  265. if n, ok := httputil.GetName(req); ok {
  266. contextName = n
  267. }
  268. query, _ := httputil.GetQuery(req)
  269. rlpc.queue.Enqueue(&workRequest{
  270. ctx: ctx,
  271. req: req,
  272. start: time.Now(),
  273. respChan: respChan,
  274. closer: false,
  275. contextName: contextName,
  276. query: query,
  277. })
  278. workRes := <-respChan
  279. return workRes.res, workRes.body, workRes.err
  280. }
  281. //--------------------------------------------------------------------------
  282. // Client Helpers
  283. //--------------------------------------------------------------------------
  // PrometheusClientConfig contains all configurable options for creating a new prometheus client
  type PrometheusClientConfig struct {
  	// Timeout is the dial timeout of the underlying network dialer.
  	Timeout time.Duration
  	// KeepAlive is the keep-alive setting of the underlying network dialer.
  	KeepAlive time.Duration
  	// TLSHandshakeTimeout is the TLS handshake timeout of the HTTP transport.
  	TLSHandshakeTimeout time.Duration
  	// TLSInsecureSkipVerify is copied to InsecureSkipVerify of the TLS client config.
  	TLSInsecureSkipVerify bool
  	// RateLimitRetryOpts are the retry options for rate limited responses; nil
  	// disables those retries.
  	RateLimitRetryOpts *RateLimitRetryOpts
  	// Auth is the authentication applied to outgoing requests; may be nil.
  	Auth *ClientAuth
  	// QueryConcurrency is the maximum number of concurrent outbound requests.
  	QueryConcurrency int
  	// QueryLogFile is the path of the query log file; empty disables query logging.
  	QueryLogFile string
  }
  295. // NewPrometheusClient creates a new rate limited client which limits by outbound concurrent requests.
  296. func NewPrometheusClient(address string, config *PrometheusClientConfig) (prometheus.Client, error) {
  297. // may be necessary for long prometheus queries
  298. rt := httputil.NewUserAgentTransport(UserAgent, &http.Transport{
  299. Proxy: http.ProxyFromEnvironment,
  300. DialContext: (&net.Dialer{
  301. Timeout: config.Timeout,
  302. KeepAlive: config.KeepAlive,
  303. }).DialContext,
  304. TLSHandshakeTimeout: config.TLSHandshakeTimeout,
  305. TLSClientConfig: &tls.Config{
  306. InsecureSkipVerify: config.TLSInsecureSkipVerify,
  307. },
  308. })
  309. pc := prometheus.Config{
  310. Address: address,
  311. RoundTripper: rt,
  312. }
  313. client, err := prometheus.NewClient(pc)
  314. if err != nil {
  315. return nil, err
  316. }
  317. return NewRateLimitedClient(
  318. PrometheusClientID,
  319. client,
  320. config.QueryConcurrency,
  321. config.Auth,
  322. nil,
  323. config.RateLimitRetryOpts,
  324. config.QueryLogFile,
  325. )
  326. }
  327. // LogQueryRequest logs the query that was send to prom/thanos with the time in queue and total time after being sent
  328. func LogQueryRequest(l *golog.Logger, req *http.Request, queueTime time.Duration, sendTime time.Duration) {
  329. if l == nil {
  330. return
  331. }
  332. qp := httputil.NewQueryParams(req.URL.Query())
  333. query := qp.Get("query", "<Unknown>")
  334. l.Printf("[Queue: %fs, Outbound: %fs][Query: %s]\n", queueTime.Seconds(), sendTime.Seconds(), query)
  335. }