prom.go 7.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280
  1. package prom
  2. import (
  3. "context"
  4. "crypto/tls"
  5. "net"
  6. "net/http"
  7. "net/url"
  8. "os"
  9. "time"
  10. "github.com/kubecost/cost-model/pkg/env"
  11. "github.com/kubecost/cost-model/pkg/log"
  12. "github.com/kubecost/cost-model/pkg/util"
  13. golog "log"
  14. prometheus "github.com/prometheus/client_golang/api"
  15. )
  //--------------------------------------------------------------------------
// QueryParamsDecorator
//--------------------------------------------------------------------------

// QueryParamsDecorator rewrites the query parameters of an outgoing request
// before it is sent. It is given the request's URL path and its parsed query
// values, and returns the values to encode back into the request URL.
type QueryParamsDecorator = func(urlPath string, query url.Values) url.Values
  //--------------------------------------------------------------------------
// ClientAuth
//--------------------------------------------------------------------------

// ClientAuth holds the credentials used to authenticate outgoing client
// requests: HTTP basic auth (Username/Password), a bearer token, or neither.
type ClientAuth struct {
	// Username and Password are sent as HTTP basic auth when Username is non-empty.
	Username string
	Password string
	// BearerToken is sent as "Authorization: Bearer <token>" when non-empty.
	BearerToken string
}

// Apply applies the authentication data to the request headers. Calling it on
// a nil *ClientAuth is a no-op.
//
// The request ends up with at most one Authorization header. If both basic
// auth credentials and a bearer token are configured, the bearer token is
// applied last and replaces the basic auth header.
func (auth *ClientAuth) Apply(req *http.Request) {
	if auth == nil {
		return
	}

	if auth.Username != "" {
		req.SetBasicAuth(auth.Username, auth.Password)
	}

	if auth.BearerToken != "" {
		// Set (not Add): Authorization is a single-valued header, and sending
		// two of them is malformed and resolved inconsistently by servers.
		req.Header.Set("Authorization", "Bearer "+auth.BearerToken)
	}
}
  44. //--------------------------------------------------------------------------
  45. // RateLimitedPrometheusClient
  46. //--------------------------------------------------------------------------
  47. // RateLimitedPrometheusClient is a prometheus client which limits the total number of
  48. // concurrent outbound requests allowed at a given moment.
  49. type RateLimitedPrometheusClient struct {
  50. id string
  51. client prometheus.Client
  52. auth *ClientAuth
  53. queue util.BlockingQueue
  54. decorator QueryParamsDecorator
  55. outbound *util.AtomicInt32
  56. fileLogger *golog.Logger
  57. }
  // requestCounter is implemented by prometheus clients that can report the
// requests they are tracking. LogPrometheusClientState type-asserts against
// it to decide whether request statistics are available for a client.
type requestCounter interface {
	// TotalRequests reports every tracked request, queued plus outbound.
	TotalRequests() int
	// TotalOutboundRequests reports requests sent and still awaiting a response.
	TotalOutboundRequests() int
}
  64. // NewRateLimitedClient creates a prometheus client which limits the number of concurrent outbound
  65. // prometheus requests.
  66. func NewRateLimitedClient(id string, config prometheus.Config, maxConcurrency int, auth *ClientAuth, decorator QueryParamsDecorator, queryLogFile string) (prometheus.Client, error) {
  67. c, err := prometheus.NewClient(config)
  68. if err != nil {
  69. return nil, err
  70. }
  71. queue := util.NewBlockingQueue()
  72. outbound := util.NewAtomicInt32(0)
  73. var logger *golog.Logger
  74. if queryLogFile != "" {
  75. exists, err := util.FileExists(queryLogFile)
  76. if exists {
  77. os.Remove(queryLogFile)
  78. }
  79. f, err := os.OpenFile(queryLogFile, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
  80. if err != nil {
  81. log.Infof("Failed to open queryLogFile: %s for query logging: %s", queryLogFile, err)
  82. } else {
  83. logger = golog.New(f, "query-log", golog.LstdFlags)
  84. }
  85. }
  86. rlpc := &RateLimitedPrometheusClient{
  87. id: id,
  88. client: c,
  89. queue: queue,
  90. decorator: decorator,
  91. outbound: outbound,
  92. auth: auth,
  93. fileLogger: logger,
  94. }
  95. // Start concurrent request processing
  96. for i := 0; i < maxConcurrency; i++ {
  97. go rlpc.worker()
  98. }
  99. return rlpc, nil
  100. }
  101. // ID is used to identify the type of client
  102. func (rlpc *RateLimitedPrometheusClient) ID() string {
  103. return rlpc.id
  104. }
  105. // TotalRequests returns the total number of requests that are either waiting to be sent and/or
  106. // are currently outbound.
  107. func (rlpc *RateLimitedPrometheusClient) TotalRequests() int {
  108. return rlpc.queue.Length()
  109. }
  110. // TotalOutboundRequests returns the total number of concurrent outbound requests, which have been
  111. // sent to the server and are awaiting response.
  112. func (rlpc *RateLimitedPrometheusClient) TotalOutboundRequests() int {
  113. return int(rlpc.outbound.Get())
  114. }
  115. // Passthrough to the prometheus client API
  116. func (rlpc *RateLimitedPrometheusClient) URL(ep string, args map[string]string) *url.URL {
  117. return rlpc.client.URL(ep, args)
  118. }
  119. // workRequest is used to queue requests
  120. type workRequest struct {
  121. ctx context.Context
  122. req *http.Request
  123. start time.Time
  124. respChan chan *workResponse
  125. // used as a sentinel value to close the worker goroutine
  126. closer bool
  127. }
  128. // workResponse is the response payload returned to the Do method
  129. type workResponse struct {
  130. res *http.Response
  131. body []byte
  132. warnings prometheus.Warnings
  133. err error
  134. }
  135. // worker is used as a consumer goroutine to pull workRequest from the blocking queue and execute them
  136. func (rlpc *RateLimitedPrometheusClient) worker() {
  137. for {
  138. // blocks until there is an item available
  139. item := rlpc.queue.Dequeue()
  140. // Ensure the dequeued item was a workRequest
  141. if we, ok := item.(*workRequest); ok {
  142. // if we need to shut down all workers, we'll need to submit sentinel values
  143. // that will force the worker to return
  144. if we.closer {
  145. return
  146. }
  147. ctx := we.ctx
  148. req := we.req
  149. // decorate the raw query parameters
  150. if rlpc.decorator != nil {
  151. req.URL.RawQuery = rlpc.decorator(req.URL.Path, req.URL.Query()).Encode()
  152. }
  153. // measure time in queue
  154. timeInQueue := time.Since(we.start)
  155. // Increment outbound counter
  156. rlpc.outbound.Increment()
  157. // Execute Request
  158. roundTripStart := time.Now()
  159. res, body, warnings, err := rlpc.client.Do(ctx, req)
  160. // Decrement outbound counter
  161. rlpc.outbound.Decrement()
  162. LogQueryRequest(rlpc.fileLogger, req, timeInQueue, time.Since(roundTripStart))
  163. // Pass back response data over channel to caller
  164. we.respChan <- &workResponse{
  165. res: res,
  166. body: body,
  167. warnings: warnings,
  168. err: err,
  169. }
  170. }
  171. }
  172. }
  173. // Rate limit and passthrough to prometheus client API
  174. func (rlpc *RateLimitedPrometheusClient) Do(ctx context.Context, req *http.Request) (*http.Response, []byte, prometheus.Warnings, error) {
  175. rlpc.auth.Apply(req)
  176. respChan := make(chan *workResponse)
  177. defer close(respChan)
  178. rlpc.queue.Enqueue(&workRequest{
  179. ctx: ctx,
  180. req: req,
  181. start: time.Now(),
  182. respChan: respChan,
  183. closer: false,
  184. })
  185. workRes := <-respChan
  186. return workRes.res, workRes.body, workRes.warnings, workRes.err
  187. }
  188. //--------------------------------------------------------------------------
  189. // Client Helpers
  190. //--------------------------------------------------------------------------
  191. func NewPrometheusClient(address string, timeout, keepAlive time.Duration, queryConcurrency int, queryLogFile string) (prometheus.Client, error) {
  192. tlsConfig := &tls.Config{InsecureSkipVerify: env.GetInsecureSkipVerify()}
  193. // may be necessary for long prometheus queries. TODO: make this configurable
  194. pc := prometheus.Config{
  195. Address: address,
  196. RoundTripper: &http.Transport{
  197. Proxy: http.ProxyFromEnvironment,
  198. DialContext: (&net.Dialer{
  199. Timeout: timeout,
  200. KeepAlive: keepAlive,
  201. }).DialContext,
  202. TLSHandshakeTimeout: 10 * time.Second,
  203. TLSClientConfig: tlsConfig,
  204. },
  205. }
  206. auth := &ClientAuth{
  207. Username: env.GetDBBasicAuthUsername(),
  208. Password: env.GetDBBasicAuthUserPassword(),
  209. BearerToken: env.GetDBBearerToken(),
  210. }
  211. return NewRateLimitedClient(PrometheusClientID, pc, queryConcurrency, auth, nil, queryLogFile)
  212. }
  213. // LogPrometheusClientState logs the current state, with respect to outbound requests, if that
  214. // information is available.
  215. func LogPrometheusClientState(client prometheus.Client) {
  216. if rc, ok := client.(requestCounter); ok {
  217. total := rc.TotalRequests()
  218. outbound := rc.TotalOutboundRequests()
  219. queued := total - outbound
  220. log.Infof("Outbound Requests: %d, Queued Requests: %d, Total Requests: %d", outbound, queued, total)
  221. }
  222. }
  223. // LogQueryRequest logs the query that was send to prom/thanos with the time in queue and total time after being sent
  224. func LogQueryRequest(l *golog.Logger, req *http.Request, queueTime time.Duration, sendTime time.Duration) {
  225. if l == nil {
  226. return
  227. }
  228. qp := util.NewQueryParams(req.URL.Query())
  229. query := qp.Get("query", "<Unknown>")
  230. l.Printf("[Queue: %fs, Outbound: %fs][Query: %s]\n", queueTime.Seconds(), sendTime.Seconds(), query)
  231. }