query.go 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421
  1. package prom
  2. import (
  3. "context"
  4. "fmt"
  5. "net/http"
  6. "net/url"
  7. "strconv"
  8. "time"
  9. "github.com/opencost/opencost/pkg/env"
  10. "github.com/opencost/opencost/pkg/errors"
  11. "github.com/opencost/opencost/pkg/log"
  12. "github.com/opencost/opencost/pkg/util/httputil"
  13. "github.com/opencost/opencost/pkg/util/json"
  14. prometheus "github.com/prometheus/client_golang/api"
  15. v1 "github.com/prometheus/client_golang/api/prometheus/v1"
  16. )
const (
	// apiPrefix is the path prefix shared by the endpoint paths below.
	apiPrefix = "/api/v1"
	// epQuery is the endpoint path used by RawQuery and QueryURL.
	epQuery = apiPrefix + "/query"
	// epQueryRange is the endpoint path used by RawQueryRange and QueryRangeURL.
	epQueryRange = apiPrefix + "/query_range"
)

// prometheus query offset to apply to each non-range query
// package scope to prevent calling duration parse each use
//
// The value is read once, at package initialization, from
// env.GetPrometheusQueryOffset().
// NOTE(review): nothing in the visible part of this file references this
// variable; confirm where the offset is actually applied.
var promQueryOffset time.Duration = env.GetPrometheusQueryOffset()
// Context wraps a Prometheus client and provides methods for querying and
// parsing query responses and errors.
type Context struct {
	// Client is the Prometheus API client used to build endpoint URLs and to
	// execute requests.
	Client prometheus.Client
	// name, when non-empty, is attached to outgoing requests through
	// httputil.SetName.
	name string
	// errorCollector receives the warnings and errors reported by the
	// asynchronous query runners (runQuery and runQueryRange).
	errorCollector *QueryErrorCollector
}
  32. // NewContext creates a new Prometheus querying context from the given client
  33. func NewContext(client prometheus.Client) *Context {
  34. var ec QueryErrorCollector
  35. return &Context{
  36. Client: client,
  37. name: "",
  38. errorCollector: &ec,
  39. }
  40. }
  41. // NewNamedContext creates a new named Prometheus querying context from the given client
  42. func NewNamedContext(client prometheus.Client, name string) *Context {
  43. ctx := NewContext(client)
  44. ctx.name = name
  45. return ctx
  46. }
// Warnings returns the warnings collected from the Context's ErrorCollector.
// It delegates directly to errorCollector.Warnings().
func (ctx *Context) Warnings() []*QueryWarning {
	return ctx.errorCollector.Warnings()
}
// HasWarnings returns true if the ErrorCollector has warnings.
// It delegates directly to errorCollector.IsWarning().
func (ctx *Context) HasWarnings() bool {
	return ctx.errorCollector.IsWarning()
}
// Errors returns the errors collected from the Context's ErrorCollector.
// It delegates directly to errorCollector.Errors().
func (ctx *Context) Errors() []*QueryError {
	return ctx.errorCollector.Errors()
}
// HasErrors returns true if the ErrorCollector has errors.
// It delegates directly to errorCollector.IsError().
func (ctx *Context) HasErrors() bool {
	return ctx.errorCollector.IsError()
}
  63. // ErrorCollection returns the aggregation of errors if there exists errors. Otherwise,
  64. // nil is returned
  65. func (ctx *Context) ErrorCollection() error {
  66. if ctx.errorCollector.IsError() {
  67. // errorCollector implements the error interface
  68. return ctx.errorCollector
  69. }
  70. return nil
  71. }
  72. // Query returns a QueryResultsChan, then runs the given query and sends the
  73. // results on the provided channel. Receiver is responsible for closing the
  74. // channel, preferably using the Read method.
  75. func (ctx *Context) Query(query string) QueryResultsChan {
  76. resCh := make(QueryResultsChan)
  77. go runQuery(query, ctx, resCh, time.Now(), "")
  78. return resCh
  79. }
  80. // QueryAtTime returns a QueryResultsChan, then runs the given query at the
  81. // given time (see time parameter here: https://prometheus.io/docs/prometheus/latest/querying/api/#instant-queries)
  82. // and sends the results on the provided channel. Receiver is responsible for
  83. // closing the channel, preferably using the Read method.
  84. func (ctx *Context) QueryAtTime(query string, t time.Time) QueryResultsChan {
  85. resCh := make(QueryResultsChan)
  86. go runQuery(query, ctx, resCh, t, "")
  87. return resCh
  88. }
  89. // ProfileQuery returns a QueryResultsChan, then runs the given query with a profile
  90. // label and sends the results on the provided channel. Receiver is responsible for closing the
  91. // channel, preferably using the Read method.
  92. func (ctx *Context) ProfileQuery(query string, profileLabel string) QueryResultsChan {
  93. resCh := make(QueryResultsChan)
  94. go runQuery(query, ctx, resCh, time.Now(), profileLabel)
  95. return resCh
  96. }
  97. // QueryAll returns one QueryResultsChan for each query provided, then runs
  98. // each query concurrently and returns results on each channel, respectively,
  99. // in the order they were provided; i.e. the response to queries[1] will be
  100. // sent on channel resChs[1].
  101. func (ctx *Context) QueryAll(queries ...string) []QueryResultsChan {
  102. resChs := []QueryResultsChan{}
  103. for _, q := range queries {
  104. resChs = append(resChs, ctx.Query(q))
  105. }
  106. return resChs
  107. }
  108. // ProfileQueryAll returns one QueryResultsChan for each query provided, then runs
  109. // each ProfileQuery concurrently and returns results on each channel, respectively,
  110. // in the order they were provided; i.e. the response to queries[1] will be
  111. // sent on channel resChs[1].
  112. func (ctx *Context) ProfileQueryAll(queries ...string) []QueryResultsChan {
  113. resChs := []QueryResultsChan{}
  114. for _, q := range queries {
  115. resChs = append(resChs, ctx.ProfileQuery(q, fmt.Sprintf("Query #%d", len(resChs)+1)))
  116. }
  117. return resChs
  118. }
  119. func (ctx *Context) QuerySync(query string) ([]*QueryResult, v1.Warnings, error) {
  120. raw, warnings, err := ctx.query(query, time.Now())
  121. if err != nil {
  122. return nil, warnings, err
  123. }
  124. results := NewQueryResults(query, raw)
  125. if results.Error != nil {
  126. return nil, warnings, results.Error
  127. }
  128. return results.Results, warnings, nil
  129. }
// QueryURL returns the URL used to query Prometheus. The URL is built by the
// client from the epQuery endpoint path, passing nil as the second argument
// to Client.URL.
func (ctx *Context) QueryURL() *url.URL {
	return ctx.Client.URL(epQuery, nil)
}
  134. // runQuery executes the prometheus query asynchronously, collects results and
  135. // errors, and passes them through the results channel.
  136. func runQuery(query string, ctx *Context, resCh QueryResultsChan, t time.Time, profileLabel string) {
  137. defer errors.HandlePanic()
  138. startQuery := time.Now()
  139. raw, warnings, requestError := ctx.query(query, t)
  140. results := NewQueryResults(query, raw)
  141. // report all warnings, request, and parse errors (nils will be ignored)
  142. ctx.errorCollector.Report(query, warnings, requestError, results.Error)
  143. if profileLabel != "" {
  144. log.Profile(startQuery, profileLabel)
  145. }
  146. resCh <- results
  147. }
  148. // RawQuery is a direct query to the prometheus client and returns the body of the response
  149. func (ctx *Context) RawQuery(query string, t time.Time) ([]byte, error) {
  150. u := ctx.Client.URL(epQuery, nil)
  151. q := u.Query()
  152. q.Set("query", query)
  153. if t.IsZero() {
  154. t = time.Now()
  155. }
  156. q.Set("time", strconv.FormatInt(t.Unix(), 10))
  157. u.RawQuery = q.Encode()
  158. req, err := http.NewRequest(http.MethodPost, u.String(), nil)
  159. if err != nil {
  160. return nil, err
  161. }
  162. // Set QueryContext name if non empty
  163. if ctx.name != "" {
  164. req = httputil.SetName(req, ctx.name)
  165. }
  166. req = httputil.SetQuery(req, query)
  167. // Note that the warnings return value from client.Do() is always nil using this
  168. // version of the prometheus client library. We parse the warnings out of the response
  169. // body after json decodidng completes.
  170. resp, body, err := ctx.Client.Do(context.Background(), req)
  171. if err != nil {
  172. if resp == nil {
  173. return nil, fmt.Errorf("query error: '%s' fetching query '%s'", err.Error(), query)
  174. }
  175. return nil, fmt.Errorf("query error %d: '%s' fetching query '%s'", resp.StatusCode, err.Error(), query)
  176. }
  177. // Unsuccessful Status Code, log body and status
  178. statusCode := resp.StatusCode
  179. statusText := http.StatusText(statusCode)
  180. if resp.StatusCode < 200 || resp.StatusCode >= 300 {
  181. return nil, CommErrorf("%d (%s) URL: '%s', Request Headers: '%s', Headers: '%s', Body: '%s' Query: '%s'", statusCode, statusText, req.URL, req.Header, httputil.HeaderString(resp.Header), body, query)
  182. }
  183. return body, err
  184. }
  185. func (ctx *Context) query(query string, t time.Time) (interface{}, v1.Warnings, error) {
  186. body, err := ctx.RawQuery(query, t)
  187. if err != nil {
  188. return nil, nil, err
  189. }
  190. var toReturn interface{}
  191. err = json.Unmarshal(body, &toReturn)
  192. if err != nil {
  193. return nil, nil, fmt.Errorf("query '%s' caused unmarshal error: %s", query, err)
  194. }
  195. warnings := warningsFrom(toReturn)
  196. for _, w := range warnings {
  197. // NoStoreAPIWarning is a warning that we would consider an error. It returns partial data relating only to the
  198. // store apis which were reachable. In order to ensure integrity of data across all clusters, we'll need to identify
  199. // this warning and convert it to an error.
  200. if IsNoStoreAPIWarning(w) {
  201. return nil, warnings, CommErrorf("Error: %s, Body: %s, Query: %s", w, body, query)
  202. }
  203. log.Warnf("fetching query '%s': %s", query, w)
  204. }
  205. return toReturn, warnings, nil
  206. }
  207. // isRequestStepAligned will check if the start and end times are aligned with the step
  208. func (ctx *Context) isRequestStepAligned(start, end time.Time, step time.Duration) bool {
  209. startInUnix := start.Unix()
  210. endInUnix := end.Unix()
  211. stepInSeconds := step.Milliseconds() / 1e3
  212. return startInUnix%stepInSeconds == 0 && endInUnix%stepInSeconds == 0
  213. }
  214. func (ctx *Context) QueryRange(query string, start, end time.Time, step time.Duration) QueryResultsChan {
  215. resCh := make(QueryResultsChan)
  216. if !ctx.isRequestStepAligned(start, end, step) {
  217. start, end = ctx.alignWindow(start, end, step)
  218. }
  219. go runQueryRange(query, start, end, step, ctx, resCh, "")
  220. return resCh
  221. }
  222. func (ctx *Context) ProfileQueryRange(query string, start, end time.Time, step time.Duration, profileLabel string) QueryResultsChan {
  223. resCh := make(QueryResultsChan)
  224. go runQueryRange(query, start, end, step, ctx, resCh, profileLabel)
  225. return resCh
  226. }
  227. func (ctx *Context) QueryRangeSync(query string, start, end time.Time, step time.Duration) ([]*QueryResult, v1.Warnings, error) {
  228. raw, warnings, err := ctx.queryRange(query, start, end, step)
  229. if err != nil {
  230. return nil, warnings, err
  231. }
  232. results := NewQueryResults(query, raw)
  233. if results.Error != nil {
  234. return nil, warnings, results.Error
  235. }
  236. return results.Results, warnings, nil
  237. }
// QueryRangeURL returns the URL used to query_range Prometheus. The URL is
// built by the client from the epQueryRange endpoint path, passing nil as the
// second argument to Client.URL.
func (ctx *Context) QueryRangeURL() *url.URL {
	return ctx.Client.URL(epQueryRange, nil)
}
  242. // runQueryRange executes the prometheus queryRange asynchronously, collects results and
  243. // errors, and passes them through the results channel.
  244. func runQueryRange(query string, start, end time.Time, step time.Duration, ctx *Context, resCh QueryResultsChan, profileLabel string) {
  245. defer errors.HandlePanic()
  246. startQuery := time.Now()
  247. raw, warnings, requestError := ctx.queryRange(query, start, end, step)
  248. results := NewQueryResults(query, raw)
  249. // report all warnings, request, and parse errors (nils will be ignored)
  250. ctx.errorCollector.Report(query, warnings, requestError, results.Error)
  251. if profileLabel != "" {
  252. log.Profile(startQuery, profileLabel)
  253. }
  254. resCh <- results
  255. }
  256. // RawQuery is a direct query to the prometheus client and returns the body of the response
  257. func (ctx *Context) RawQueryRange(query string, start, end time.Time, step time.Duration) ([]byte, error) {
  258. u := ctx.Client.URL(epQueryRange, nil)
  259. q := u.Query()
  260. q.Set("query", query)
  261. q.Set("start", start.Format(time.RFC3339Nano))
  262. q.Set("end", end.Format(time.RFC3339Nano))
  263. q.Set("step", strconv.FormatFloat(step.Seconds(), 'f', 3, 64))
  264. u.RawQuery = q.Encode()
  265. req, err := http.NewRequest(http.MethodPost, u.String(), nil)
  266. if err != nil {
  267. return nil, err
  268. }
  269. // Set QueryContext name if non empty
  270. if ctx.name != "" {
  271. req = httputil.SetName(req, ctx.name)
  272. }
  273. req = httputil.SetQuery(req, query)
  274. // Note that the warnings return value from client.Do() is always nil using this
  275. // version of the prometheus client library. We parse the warnings out of the response
  276. // body after json decodidng completes.
  277. resp, body, err := ctx.Client.Do(context.Background(), req)
  278. if err != nil {
  279. if resp == nil {
  280. return nil, fmt.Errorf("Error: %s, Body: %s Query: %s", err.Error(), body, query)
  281. }
  282. return nil, fmt.Errorf("%d (%s) Headers: %s Error: %s Body: %s Query: %s", resp.StatusCode, http.StatusText(resp.StatusCode), httputil.HeaderString(resp.Header), body, err.Error(), query)
  283. }
  284. // Unsuccessful Status Code, log body and status
  285. statusCode := resp.StatusCode
  286. statusText := http.StatusText(statusCode)
  287. if resp.StatusCode < 200 || resp.StatusCode >= 300 {
  288. return nil, CommErrorf("%d (%s) Headers: %s, Body: %s Query: %s", statusCode, statusText, httputil.HeaderString(resp.Header), body, query)
  289. }
  290. return body, err
  291. }
  292. func (ctx *Context) queryRange(query string, start, end time.Time, step time.Duration) (interface{}, v1.Warnings, error) {
  293. body, err := ctx.RawQueryRange(query, start, end, step)
  294. if err != nil {
  295. return nil, nil, err
  296. }
  297. var toReturn interface{}
  298. err = json.Unmarshal(body, &toReturn)
  299. if err != nil {
  300. return nil, nil, fmt.Errorf("query '%s' caused unmarshal error: %s", query, err)
  301. }
  302. warnings := warningsFrom(toReturn)
  303. for _, w := range warnings {
  304. // NoStoreAPIWarning is a warning that we would consider an error. It returns partial data relating only to the
  305. // store apis which were reachable. In order to ensure integrity of data across all clusters, we'll need to identify
  306. // this warning and convert it to an error.
  307. if IsNoStoreAPIWarning(w) {
  308. return nil, warnings, CommErrorf("Error: %s, Body: %s, Query: %s", w, body, query)
  309. }
  310. log.Warnf("fetching query '%s': %s", query, w)
  311. }
  312. return toReturn, warnings, nil
  313. }
  314. // alignWindow will update the start and end times to be aligned with the step duration.
  315. // Current implementation will always floor the start/end times
  316. func (ctx *Context) alignWindow(start time.Time, end time.Time, step time.Duration) (time.Time, time.Time) {
  317. // Convert the step duration from Milliseconds to Seconds to match the Unix timestamp, which is in seconds
  318. stepInSeconds := step.Milliseconds() / 1e3
  319. alignedStart := (start.Unix() / stepInSeconds) * stepInSeconds
  320. alignedEnd := (end.Unix() / stepInSeconds) * stepInSeconds
  321. return time.Unix(alignedStart, 0).UTC(), time.Unix(alignedEnd, 0).UTC()
  322. }
  323. // Extracts the warnings from the resulting json if they exist (part of the prometheus response api).
  324. func warningsFrom(result interface{}) v1.Warnings {
  325. var warnings v1.Warnings
  326. if resultMap, ok := result.(map[string]interface{}); ok {
  327. if warningProp, ok := resultMap["warnings"]; ok {
  328. if w, ok := warningProp.([]string); ok {
  329. warnings = w
  330. }
  331. }
  332. }
  333. return warnings
  334. }