prom.go 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441
  1. package prom
  2. import (
  3. "context"
  4. "crypto/tls"
  5. "crypto/x509"
  6. "fmt"
  7. "net"
  8. "net/http"
  9. "net/url"
  10. "os"
  11. "strings"
  12. "sync/atomic"
  13. "time"
  14. "github.com/opencost/opencost/core/pkg/collections"
  15. "github.com/opencost/opencost/core/pkg/log"
  16. "github.com/opencost/opencost/core/pkg/util/fileutil"
  17. "github.com/opencost/opencost/core/pkg/util/httputil"
  18. "github.com/opencost/opencost/core/pkg/version"
  19. "github.com/opencost/opencost/pkg/env"
  20. golog "log"
  21. prometheus "github.com/prometheus/client_golang/api"
  22. restclient "k8s.io/client-go/rest"
  23. certutil "k8s.io/client-go/util/cert"
  24. )
// UserAgent is the string "Opencost/<version.Version>". NewPrometheusClient
// passes it to httputil.NewUserAgentTransport when building the HTTP transport.
var UserAgent = fmt.Sprintf("Opencost/%s", version.Version)
//--------------------------------------------------------------------------
// QueryParamsDecorator
//--------------------------------------------------------------------------

// QueryParamsDecorator is used to decorate and return query parameters for
// outgoing requests. It receives the request's URL path and its parsed query
// values and returns the values to send; the worker encodes the result and
// writes it back to the request's RawQuery before executing the request.
type QueryParamsDecorator = func(path string, values url.Values) url.Values
  32. //--------------------------------------------------------------------------
  33. // ClientAuth
  34. //--------------------------------------------------------------------------
  35. // ClientAuth is used to authenticate outgoing client requests.
  36. type ClientAuth struct {
  37. Username string
  38. Password string
  39. BearerToken string
  40. }
  41. // Apply Applies the authentication data to the request headers
  42. func (auth *ClientAuth) Apply(req *http.Request) {
  43. if auth == nil {
  44. return
  45. }
  46. if auth.Username != "" {
  47. req.SetBasicAuth(auth.Username, auth.Password)
  48. }
  49. if auth.BearerToken != "" {
  50. token := "Bearer " + auth.BearerToken
  51. req.Header.Add("Authorization", token)
  52. }
  53. }
  54. //--------------------------------------------------------------------------
  55. // Rate Limit Options
  56. //--------------------------------------------------------------------------
  57. // MaxRetryAfterDuration is the maximum amount of time we should ever wait
  58. // during a retry. This is to prevent starvation on the request threads
  59. const MaxRetryAfterDuration = 10 * time.Second
  60. // Default header key for Mimir/Cortex-Tenant API requests
  61. const HeaderXScopeOrgId = "X-Scope-OrgID"
// RateLimitRetryOpts contains retry options used by the worker when a response
// is rate limited. A client whose retry options are nil does not retry.
type RateLimitRetryOpts struct {
	// MaxRetries is the number of retries the worker attempts while the
	// response is still rate limited.
	MaxRetries int
	// DefaultRetryWait is handed to httputil.RateLimitedRetryFor as the
	// default wait when computing how long to pause before a retry.
	DefaultRetryWait time.Duration
}
  67. // RateLimitResponseStatus contains the status of the rate limited retries
  68. type RateLimitResponseStatus struct {
  69. RetriesRemaining int
  70. WaitTime time.Duration
  71. }
  72. // String creates a string representation of the rate limit status
  73. func (rtrs *RateLimitResponseStatus) String() string {
  74. return fmt.Sprintf("Wait Time: %.2f seconds, Retries Remaining: %d", rtrs.WaitTime.Seconds(), rtrs.RetriesRemaining)
  75. }
  76. // RateLimitedError contains a list of retry statuses that occurred during
  77. // retries on a rate limited response
  78. type RateLimitedResponseError struct {
  79. RateLimitStatus []*RateLimitResponseStatus
  80. }
  81. // Error returns a string representation of the error, including the rate limit
  82. // status reports
  83. func (rlre *RateLimitedResponseError) Error() string {
  84. var sb strings.Builder
  85. sb.WriteString("Request was Rate Limited and Retries Exhausted:\n")
  86. for _, rls := range rlre.RateLimitStatus {
  87. sb.WriteString(" * ")
  88. sb.WriteString(rls.String())
  89. sb.WriteString("\n")
  90. }
  91. return sb.String()
  92. }
//--------------------------------------------------------------------------
// RateLimitedPrometheusClient
//--------------------------------------------------------------------------

// RateLimitedPrometheusClient is a prometheus client which limits the total number of
// concurrent outbound requests allowed at a given moment. Requests handed to Do are
// placed on a queue and executed by the worker goroutines started in
// NewRateLimitedClient.
type RateLimitedPrometheusClient struct {
	// id is the identifier returned by ID.
	id string
	// client is the wrapped prometheus client that actually executes requests.
	client prometheus.Client
	// auth is applied to every request in Do.
	auth *ClientAuth
	// queue holds the requests waiting to be picked up by a worker.
	queue collections.BlockingQueue[*workRequest]
	// decorator, when non-nil, rewrites each request's query parameters.
	decorator QueryParamsDecorator
	// rateLimitRetry enables retries on rate limited responses when non-nil.
	rateLimitRetry *RateLimitRetryOpts
	// outbound counts the requests currently being executed by workers.
	outbound atomic.Int32
	// fileLogger receives one line per executed request; nil disables query logging.
	fileLogger *golog.Logger
	// headerXScopeOrgId, when non-empty, is set as the X-Scope-OrgID header in Do.
	headerXScopeOrgId string
}
// requestCounter is used to determine if the prometheus client keeps track of
// the concurrent outbound requests. RateLimitedPrometheusClient provides both
// methods.
type requestCounter interface {
	// TotalQueuedRequests reports how many requests are waiting in the queue.
	TotalQueuedRequests() int
	// TotalOutboundRequests reports how many requests are currently outbound.
	TotalOutboundRequests() int
}
  115. // NewRateLimitedClient creates a prometheus client which limits the number of concurrent outbound
  116. // prometheus requests.
  117. func NewRateLimitedClient(
  118. id string,
  119. client prometheus.Client,
  120. maxConcurrency int,
  121. auth *ClientAuth,
  122. decorator QueryParamsDecorator,
  123. rateLimitRetryOpts *RateLimitRetryOpts,
  124. queryLogFile string,
  125. headerXScopeOrgId string) (prometheus.Client, error) {
  126. queue := collections.NewBlockingQueue[*workRequest]()
  127. var logger *golog.Logger
  128. if queryLogFile != "" {
  129. exists, err := fileutil.FileExists(queryLogFile)
  130. if err != nil {
  131. log.Infof("Failed to check for existence of queryLogFile: %s: %s", queryLogFile, err)
  132. }
  133. if exists {
  134. err = os.Remove(queryLogFile)
  135. if err != nil {
  136. log.Infof("Failed to remove queryLogFile: %s: %s", queryLogFile, err)
  137. }
  138. }
  139. f, err := os.OpenFile(queryLogFile, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
  140. if err != nil {
  141. log.Infof("Failed to open queryLogFile: %s for query logging: %s", queryLogFile, err)
  142. } else {
  143. logger = golog.New(f, "query-log", golog.LstdFlags)
  144. }
  145. }
  146. // default authentication
  147. if auth == nil {
  148. auth = &ClientAuth{
  149. Username: "",
  150. Password: "",
  151. BearerToken: "",
  152. }
  153. }
  154. rlpc := &RateLimitedPrometheusClient{
  155. id: id,
  156. client: client,
  157. queue: queue,
  158. decorator: decorator,
  159. rateLimitRetry: rateLimitRetryOpts,
  160. auth: auth,
  161. fileLogger: logger,
  162. headerXScopeOrgId: headerXScopeOrgId,
  163. }
  164. // Start concurrent request processing
  165. for i := 0; i < maxConcurrency; i++ {
  166. go rlpc.worker()
  167. }
  168. return rlpc, nil
  169. }
// ID is used to identify the type of client. It returns the id the client was
// created with.
func (rlpc *RateLimitedPrometheusClient) ID() string {
	return rlpc.id
}
// TotalQueuedRequests returns the current length of the work queue, i.e. the
// requests enqueued by Do that have not yet been dequeued by a worker. Requests
// that are already outbound are reported by TotalOutboundRequests instead.
func (rlpc *RateLimitedPrometheusClient) TotalQueuedRequests() int {
	return rlpc.queue.Length()
}
// TotalOutboundRequests returns the total number of concurrent outbound requests, which have been
// sent to the server and are awaiting response. The value is read atomically from the
// counter the workers increment before executing a request and decrement afterwards.
func (rlpc *RateLimitedPrometheusClient) TotalOutboundRequests() int {
	return int(rlpc.outbound.Load())
}
// URL is a passthrough to the wrapped prometheus client's URL method; ep and
// args are forwarded unchanged and its result is returned as is.
func (rlpc *RateLimitedPrometheusClient) URL(ep string, args map[string]string) *url.URL {
	return rlpc.client.URL(ep, args)
}
  188. // workRequest is used to queue requests
  189. type workRequest struct {
  190. ctx context.Context
  191. req *http.Request
  192. start time.Time
  193. respChan chan *workResponse
  194. // used as a sentinel value to close the worker goroutine
  195. closer bool
  196. // request metadata for diagnostics
  197. contextName string
  198. query string
  199. }
// workResponse is the response payload returned to the Do method. Its fields
// mirror the three values returned by the wrapped client's Do call.
type workResponse struct {
	// res is the HTTP response, as returned by the wrapped client.
	res *http.Response
	// body is the response body bytes, as returned by the wrapped client.
	body []byte
	// err is the error from the wrapped client, or a rate limit error
	// generated by the worker when retries were exhausted.
	err error
}
  206. // worker is used as a consumer goroutine to pull workRequest from the blocking queue and execute them
  207. func (rlpc *RateLimitedPrometheusClient) worker() {
  208. retryOpts := rlpc.rateLimitRetry
  209. retryRateLimit := retryOpts != nil
  210. for {
  211. // blocks until there is an item available
  212. we := rlpc.queue.Dequeue()
  213. // if we need to shut down all workers, we'll need to submit sentinel values
  214. // that will force the worker to return
  215. if we.closer {
  216. return
  217. }
  218. ctx := we.ctx
  219. req := we.req
  220. // decorate the raw query parameters
  221. if rlpc.decorator != nil {
  222. req.URL.RawQuery = rlpc.decorator(req.URL.Path, req.URL.Query()).Encode()
  223. }
  224. // measure time in queue
  225. timeInQueue := time.Since(we.start)
  226. // Increment outbound counter
  227. rlpc.outbound.Add(1)
  228. // Execute Request
  229. roundTripStart := time.Now()
  230. res, body, err := rlpc.client.Do(ctx, req)
  231. // If retries on rate limited response is enabled:
  232. // * Check for a 429 StatusCode OR 400 StatusCode and message containing "ThrottlingException"
  233. // * Attempt to parse a Retry-After from response headers (common on 429)
  234. // * If we couldn't determine how long to wait for a retry, use 1 second by default
  235. if res != nil && retryRateLimit {
  236. var status []*RateLimitResponseStatus
  237. var retries int = retryOpts.MaxRetries
  238. var defaultWait time.Duration = retryOpts.DefaultRetryWait
  239. for httputil.IsRateLimited(res, body) && retries > 0 {
  240. // calculate amount of time to wait before retry, in the event the default wait is used,
  241. // an exponential backoff is applied based on the number of times we've retried.
  242. retryAfter := httputil.RateLimitedRetryFor(res, defaultWait, retryOpts.MaxRetries-retries)
  243. retries--
  244. status = append(status, &RateLimitResponseStatus{RetriesRemaining: retries, WaitTime: retryAfter})
  245. log.DedupedInfof(50, "Rate Limited Prometheus Request. Waiting for: %d ms. Retries Remaining: %d", retryAfter.Milliseconds(), retries)
  246. // To prevent total starvation of request threads, hard limit wait time to 10s. We also want quota limits/throttles
  247. // to eventually pass through as an error. For example, if some quota is reached with 10 days left, we clearly
  248. // don't want to block for 10 days.
  249. if retryAfter > MaxRetryAfterDuration {
  250. retryAfter = MaxRetryAfterDuration
  251. }
  252. // execute wait and retry
  253. time.Sleep(retryAfter)
  254. res, body, err = rlpc.client.Do(ctx, req)
  255. }
  256. // if we've broken out of our retry loop and the resp is still rate limited,
  257. // then let's generate a meaningful error to pass back
  258. if retries == 0 && httputil.IsRateLimited(res, body) {
  259. err = &RateLimitedResponseError{RateLimitStatus: status}
  260. }
  261. }
  262. // Decrement outbound counter
  263. rlpc.outbound.Add(-1)
  264. LogQueryRequest(rlpc.fileLogger, req, timeInQueue, time.Since(roundTripStart))
  265. // Pass back response data over channel to caller
  266. we.respChan <- &workResponse{
  267. res: res,
  268. body: body,
  269. err: err,
  270. }
  271. }
  272. }
  273. // Rate limit and passthrough to prometheus client API
  274. func (rlpc *RateLimitedPrometheusClient) Do(ctx context.Context, req *http.Request) (*http.Response, []byte, error) {
  275. if rlpc.headerXScopeOrgId != "" {
  276. req.Header.Set(HeaderXScopeOrgId, rlpc.headerXScopeOrgId)
  277. }
  278. rlpc.auth.Apply(req)
  279. respChan := make(chan *workResponse)
  280. defer close(respChan)
  281. // request names are used as a debug utility to identify requests in queue
  282. contextName := "<none>"
  283. if n, ok := httputil.GetName(req); ok {
  284. contextName = n
  285. }
  286. query, _ := httputil.GetQuery(req)
  287. rlpc.queue.Enqueue(&workRequest{
  288. ctx: ctx,
  289. req: req,
  290. start: time.Now(),
  291. respChan: respChan,
  292. closer: false,
  293. contextName: contextName,
  294. query: query,
  295. })
  296. workRes := <-respChan
  297. return workRes.res, workRes.body, workRes.err
  298. }
  299. //--------------------------------------------------------------------------
  300. // Client Helpers
  301. //--------------------------------------------------------------------------
  302. // PrometheusClientConfig contains all configurable options for creating a new prometheus client
  303. type PrometheusClientConfig struct {
  304. Timeout time.Duration
  305. KeepAlive time.Duration
  306. TLSHandshakeTimeout time.Duration
  307. TLSInsecureSkipVerify bool
  308. RateLimitRetryOpts *RateLimitRetryOpts
  309. Auth *ClientAuth
  310. QueryConcurrency int
  311. QueryLogFile string
  312. HeaderXScopeOrgId string
  313. }
  314. // NewPrometheusClient creates a new rate limited client which limits by outbound concurrent requests.
  315. func NewPrometheusClient(address string, config *PrometheusClientConfig) (prometheus.Client, error) {
  316. var tlsCaCert *x509.CertPool
  317. // We will use the service account token and service-ca.crt to authenticate with the Prometheus server via kube-rbac-proxy.
  318. // We need to ensure that the service account has the necessary permissions to access the Prometheus server by binding it to the appropriate role.
  319. if env.IsKubeRbacProxyEnabled() {
  320. restConfig, err := restclient.InClusterConfig()
  321. if err != nil {
  322. log.Errorf("KUBE_RBAC_PROXY_ENABLED was set to true but failed to get in-cluster config: %s", err)
  323. }
  324. config.Auth.BearerToken = restConfig.BearerToken
  325. tlsCaCert, err = certutil.NewPool(`/var/run/secrets/kubernetes.io/serviceaccount/service-ca.crt`)
  326. if err != nil {
  327. log.Errorf("KUBE_RBAC_PROXY_ENABLED was set to true but failed to load service-ca.crt: %s", err)
  328. }
  329. }
  330. // may be necessary for long prometheus queries
  331. rt := httputil.NewUserAgentTransport(UserAgent, &http.Transport{
  332. Proxy: http.ProxyFromEnvironment,
  333. DialContext: (&net.Dialer{
  334. Timeout: config.Timeout,
  335. KeepAlive: config.KeepAlive,
  336. }).DialContext,
  337. TLSHandshakeTimeout: config.TLSHandshakeTimeout,
  338. TLSClientConfig: &tls.Config{
  339. InsecureSkipVerify: config.TLSInsecureSkipVerify,
  340. RootCAs: tlsCaCert,
  341. },
  342. })
  343. pc := prometheus.Config{
  344. Address: address,
  345. RoundTripper: rt,
  346. }
  347. client, err := prometheus.NewClient(pc)
  348. if err != nil {
  349. return nil, err
  350. }
  351. return NewRateLimitedClient(
  352. PrometheusClientID,
  353. client,
  354. config.QueryConcurrency,
  355. config.Auth,
  356. nil,
  357. config.RateLimitRetryOpts,
  358. config.QueryLogFile,
  359. config.HeaderXScopeOrgId,
  360. )
  361. }
  362. // LogQueryRequest logs the query that was send to prom/thanos with the time in queue and total time after being sent
  363. func LogQueryRequest(l *golog.Logger, req *http.Request, queueTime time.Duration, sendTime time.Duration) {
  364. if l == nil {
  365. return
  366. }
  367. qp := httputil.NewQueryParams(req.URL.Query())
  368. query := qp.Get("query", "<Unknown>")
  369. l.Printf("[Queue: %fs, Outbound: %fs][Query: %s]\n", queueTime.Seconds(), sendTime.Seconds(), query)
  370. }