prom.go 8.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283
  1. package prom
  2. import (
  3. "context"
  4. "crypto/tls"
  5. "net"
  6. "net/http"
  7. "net/url"
  8. "os"
  9. "time"
  10. "github.com/kubecost/cost-model/pkg/collections"
  11. "github.com/kubecost/cost-model/pkg/env"
  12. "github.com/kubecost/cost-model/pkg/log"
  13. "github.com/kubecost/cost-model/pkg/util/atomic"
  14. "github.com/kubecost/cost-model/pkg/util/fileutil"
  15. "github.com/kubecost/cost-model/pkg/util/httputil"
  16. golog "log"
  17. prometheus "github.com/prometheus/client_golang/api"
  18. )
  19. //--------------------------------------------------------------------------
  20. // QueryParamsDecorator
  21. //--------------------------------------------------------------------------
  22. // QueryParamsDecorator is used to decorate and return query parameters for
  23. // outgoing requests
  24. type QueryParamsDecorator = func(path string, values url.Values) url.Values
  25. //--------------------------------------------------------------------------
  26. // ClientAuth
  27. //--------------------------------------------------------------------------
  28. // ClientAuth is used to authenticate outgoing client requests.
  29. type ClientAuth struct {
  30. Username string
  31. Password string
  32. BearerToken string
  33. }
  34. // Apply Applies the authentication data to the request headers
  35. func (auth *ClientAuth) Apply(req *http.Request) {
  36. if auth == nil {
  37. return
  38. }
  39. if auth.Username != "" {
  40. req.SetBasicAuth(auth.Username, auth.Password)
  41. }
  42. if auth.BearerToken != "" {
  43. token := "Bearer " + auth.BearerToken
  44. req.Header.Add("Authorization", token)
  45. }
  46. }
  47. //--------------------------------------------------------------------------
  48. // RateLimitedPrometheusClient
  49. //--------------------------------------------------------------------------
  50. // RateLimitedPrometheusClient is a prometheus client which limits the total number of
  51. // concurrent outbound requests allowed at a given moment.
  52. type RateLimitedPrometheusClient struct {
  53. id string
  54. client prometheus.Client
  55. auth *ClientAuth
  56. queue collections.BlockingQueue
  57. decorator QueryParamsDecorator
  58. outbound *atomic.AtomicInt32
  59. fileLogger *golog.Logger
  60. }
  61. // requestCounter is used to determine if the prometheus client keeps track of
  62. // the concurrent outbound requests
  63. type requestCounter interface {
  64. TotalQueuedRequests() int
  65. TotalOutboundRequests() int
  66. }
  67. // NewRateLimitedClient creates a prometheus client which limits the number of concurrent outbound
  68. // prometheus requests.
  69. func NewRateLimitedClient(id string, config prometheus.Config, maxConcurrency int, auth *ClientAuth, decorator QueryParamsDecorator, queryLogFile string) (prometheus.Client, error) {
  70. c, err := prometheus.NewClient(config)
  71. if err != nil {
  72. return nil, err
  73. }
  74. queue := collections.NewBlockingQueue()
  75. outbound := atomic.NewAtomicInt32(0)
  76. var logger *golog.Logger
  77. if queryLogFile != "" {
  78. exists, err := fileutil.FileExists(queryLogFile)
  79. if exists {
  80. os.Remove(queryLogFile)
  81. }
  82. f, err := os.OpenFile(queryLogFile, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
  83. if err != nil {
  84. log.Infof("Failed to open queryLogFile: %s for query logging: %s", queryLogFile, err)
  85. } else {
  86. logger = golog.New(f, "query-log", golog.LstdFlags)
  87. }
  88. }
  89. rlpc := &RateLimitedPrometheusClient{
  90. id: id,
  91. client: c,
  92. queue: queue,
  93. decorator: decorator,
  94. outbound: outbound,
  95. auth: auth,
  96. fileLogger: logger,
  97. }
  98. // Start concurrent request processing
  99. for i := 0; i < maxConcurrency; i++ {
  100. go rlpc.worker()
  101. }
  102. return rlpc, nil
  103. }
  104. // ID is used to identify the type of client
  105. func (rlpc *RateLimitedPrometheusClient) ID() string {
  106. return rlpc.id
  107. }
  108. // TotalRequests returns the total number of requests that are either waiting to be sent and/or
  109. // are currently outbound.
  110. func (rlpc *RateLimitedPrometheusClient) TotalQueuedRequests() int {
  111. return rlpc.queue.Length()
  112. }
  113. // TotalOutboundRequests returns the total number of concurrent outbound requests, which have been
  114. // sent to the server and are awaiting response.
  115. func (rlpc *RateLimitedPrometheusClient) TotalOutboundRequests() int {
  116. return int(rlpc.outbound.Get())
  117. }
  118. // Passthrough to the prometheus client API
  119. func (rlpc *RateLimitedPrometheusClient) URL(ep string, args map[string]string) *url.URL {
  120. return rlpc.client.URL(ep, args)
  121. }
  122. // workRequest is used to queue requests
  123. type workRequest struct {
  124. ctx context.Context
  125. req *http.Request
  126. start time.Time
  127. respChan chan *workResponse
  128. // used as a sentinel value to close the worker goroutine
  129. closer bool
  130. // request metadata for diagnostics
  131. contextName string
  132. query string
  133. }
  134. // workResponse is the response payload returned to the Do method
  135. type workResponse struct {
  136. res *http.Response
  137. body []byte
  138. warnings prometheus.Warnings
  139. err error
  140. }
  141. // worker is used as a consumer goroutine to pull workRequest from the blocking queue and execute them
  142. func (rlpc *RateLimitedPrometheusClient) worker() {
  143. for {
  144. // blocks until there is an item available
  145. item := rlpc.queue.Dequeue()
  146. // Ensure the dequeued item was a workRequest
  147. if we, ok := item.(*workRequest); ok {
  148. // if we need to shut down all workers, we'll need to submit sentinel values
  149. // that will force the worker to return
  150. if we.closer {
  151. return
  152. }
  153. ctx := we.ctx
  154. req := we.req
  155. // decorate the raw query parameters
  156. if rlpc.decorator != nil {
  157. req.URL.RawQuery = rlpc.decorator(req.URL.Path, req.URL.Query()).Encode()
  158. }
  159. // measure time in queue
  160. timeInQueue := time.Since(we.start)
  161. // Increment outbound counter
  162. rlpc.outbound.Increment()
  163. // Execute Request
  164. roundTripStart := time.Now()
  165. res, body, warnings, err := rlpc.client.Do(ctx, req)
  166. // Decrement outbound counter
  167. rlpc.outbound.Decrement()
  168. LogQueryRequest(rlpc.fileLogger, req, timeInQueue, time.Since(roundTripStart))
  169. // Pass back response data over channel to caller
  170. we.respChan <- &workResponse{
  171. res: res,
  172. body: body,
  173. warnings: warnings,
  174. err: err,
  175. }
  176. }
  177. }
  178. }
  179. // Rate limit and passthrough to prometheus client API
  180. func (rlpc *RateLimitedPrometheusClient) Do(ctx context.Context, req *http.Request) (*http.Response, []byte, prometheus.Warnings, error) {
  181. rlpc.auth.Apply(req)
  182. respChan := make(chan *workResponse)
  183. defer close(respChan)
  184. // request names are used as a debug utility to identify requests in queue
  185. contextName := "<none>"
  186. if n, ok := httputil.GetName(req); ok {
  187. contextName = n
  188. }
  189. query, _ := httputil.GetQuery(req)
  190. rlpc.queue.Enqueue(&workRequest{
  191. ctx: ctx,
  192. req: req,
  193. start: time.Now(),
  194. respChan: respChan,
  195. closer: false,
  196. contextName: contextName,
  197. query: query,
  198. })
  199. workRes := <-respChan
  200. return workRes.res, workRes.body, workRes.warnings, workRes.err
  201. }
  202. //--------------------------------------------------------------------------
  203. // Client Helpers
  204. //--------------------------------------------------------------------------
  205. func NewPrometheusClient(address string, timeout, keepAlive time.Duration, queryConcurrency int, queryLogFile string) (prometheus.Client, error) {
  206. tlsConfig := &tls.Config{InsecureSkipVerify: env.GetInsecureSkipVerify()}
  207. // may be necessary for long prometheus queries. TODO: make this configurable
  208. pc := prometheus.Config{
  209. Address: address,
  210. RoundTripper: &http.Transport{
  211. Proxy: http.ProxyFromEnvironment,
  212. DialContext: (&net.Dialer{
  213. Timeout: timeout,
  214. KeepAlive: keepAlive,
  215. }).DialContext,
  216. TLSHandshakeTimeout: 10 * time.Second,
  217. TLSClientConfig: tlsConfig,
  218. },
  219. }
  220. auth := &ClientAuth{
  221. Username: env.GetDBBasicAuthUsername(),
  222. Password: env.GetDBBasicAuthUserPassword(),
  223. BearerToken: env.GetDBBearerToken(),
  224. }
  225. return NewRateLimitedClient(PrometheusClientID, pc, queryConcurrency, auth, nil, queryLogFile)
  226. }
  227. // LogQueryRequest logs the query that was send to prom/thanos with the time in queue and total time after being sent
  228. func LogQueryRequest(l *golog.Logger, req *http.Request, queueTime time.Duration, sendTime time.Duration) {
  229. if l == nil {
  230. return
  231. }
  232. qp := httputil.NewQueryParams(req.URL.Query())
  233. query := qp.Get("query", "<Unknown>")
  234. l.Printf("[Queue: %fs, Outbound: %fs][Query: %s]\n", queueTime.Seconds(), sendTime.Seconds(), query)
  235. }