query.go 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463
  1. package prom
  2. import (
  3. "context"
  4. "fmt"
  5. "net/http"
  6. "net/url"
  7. "strconv"
  8. "time"
  9. "github.com/opencost/opencost/core/pkg/log"
  10. "github.com/opencost/opencost/core/pkg/source"
  11. "github.com/opencost/opencost/core/pkg/util/httputil"
  12. "github.com/opencost/opencost/core/pkg/util/json"
  13. "github.com/opencost/opencost/core/pkg/errors"
  14. prometheus "github.com/prometheus/client_golang/api"
  15. v1 "github.com/prometheus/client_golang/api/prometheus/v1"
  16. )
const (
	// epQuery is the path of the query endpoint: apiPrefix followed by "/query".
	// It is the endpoint RawQuery and QueryURL resolve against the client.
	epQuery = apiPrefix + "/query"
	// epQueryRange is the path of the range-query endpoint: apiPrefix followed
	// by "/query_range". It is the endpoint RawQueryRange and QueryRangeURL
	// resolve against the client.
	epQueryRange = apiPrefix + "/query_range"
)
// ContextFactory is a factory for creating new Contexts for prometheus queries.
// Every Context it creates receives the same client and configuration.
type ContextFactory struct {
	// client is the prometheus client handed to each created Context.
	client prometheus.Client
	// config is the prometheus configuration handed to each created Context.
	config *OpenCostPrometheusConfig
}
  26. // NewContextFactory creates a new ContextFactory with the provided prometheus client.
  27. func NewContextFactory(client prometheus.Client, promConfig *OpenCostPrometheusConfig) *ContextFactory {
  28. return &ContextFactory{
  29. client: client,
  30. config: promConfig,
  31. }
  32. }
  33. // NewContext creates a new prometheus query context.
  34. func (cf *ContextFactory) NewContext() *Context {
  35. return NewContext(cf.client, cf.config)
  36. }
  37. // NewContext creates a new named prometheus query context.
  38. func (cf *ContextFactory) NewNamedContext(name string) *Context {
  39. return NewNamedContext(cf.client, cf.config, name)
  40. }
// Context wraps a Prometheus client and provides methods for querying and
// parsing query responses and errors.
type Context struct {
	// Client is the prometheus client used to build endpoint URLs and to
	// execute requests.
	Client prometheus.Client
	// config supplies the LabelMapping passed to NewQueryResults when
	// responses are parsed.
	config *OpenCostPrometheusConfig
	// name is attached to outgoing requests via httputil.SetName when it is
	// non-empty; unnamed contexts leave it as "".
	name string
	// errorCollector receives the warnings, request errors, and parse errors
	// reported by runQuery and runQueryRange.
	errorCollector *source.QueryErrorCollector
}
  49. // NewContext creates a new Prometheus querying context from the given client
  50. func NewContext(client prometheus.Client, config *OpenCostPrometheusConfig) *Context {
  51. var ec source.QueryErrorCollector
  52. return &Context{
  53. Client: client,
  54. config: config,
  55. name: "",
  56. errorCollector: &ec,
  57. }
  58. }
  59. // NewNamedContext creates a new named Prometheus querying context from the given client
  60. func NewNamedContext(client prometheus.Client, config *OpenCostPrometheusConfig, name string) *Context {
  61. ctx := NewContext(client, config)
  62. ctx.name = name
  63. return ctx
  64. }
  65. // Warnings returns the warnings collected from the Context's ErrorCollector
  66. func (ctx *Context) Warnings() []*source.QueryWarning {
  67. return ctx.errorCollector.Warnings()
  68. }
  69. // HasWarnings returns true if the ErrorCollector has warnings.
  70. func (ctx *Context) HasWarnings() bool {
  71. return ctx.errorCollector.IsWarning()
  72. }
  73. // Errors returns the errors collected from the Context's ErrorCollector.
  74. func (ctx *Context) Errors() []*source.QueryError {
  75. return ctx.errorCollector.Errors()
  76. }
  77. // HasErrors returns true if the ErrorCollector has errors
  78. func (ctx *Context) HasErrors() bool {
  79. return ctx.errorCollector.IsError()
  80. }
  81. // ErrorCollection returns the aggregation of errors if there exists errors. Otherwise,
  82. // nil is returned
  83. func (ctx *Context) ErrorCollection() error {
  84. if ctx.errorCollector.IsError() {
  85. // errorCollector implements the error interface
  86. return ctx.errorCollector
  87. }
  88. return nil
  89. }
  90. // Query returns a QueryResultsChan, then runs the given query and sends the
  91. // results on the provided channel. Receiver is responsible for closing the
  92. // channel, preferably using the Read method.
  93. func (ctx *Context) Query(query string) source.QueryResultsChan {
  94. resCh := make(source.QueryResultsChan)
  95. go runQuery(query, ctx, resCh, time.Now(), "")
  96. return resCh
  97. }
  98. // QueryAtTime returns a QueryResultsChan, then runs the given query at the
  99. // given time (see time parameter here: https://prometheus.io/docs/prometheus/latest/querying/api/#instant-queries)
  100. // and sends the results on the provided channel. Receiver is responsible for
  101. // closing the channel, preferably using the Read method.
  102. func (ctx *Context) QueryAtTime(query string, t time.Time) source.QueryResultsChan {
  103. resCh := make(source.QueryResultsChan)
  104. go runQuery(query, ctx, resCh, t, "")
  105. return resCh
  106. }
  107. // ProfileQuery returns a QueryResultsChan, then runs the given query with a profile
  108. // label and sends the results on the provided channel. Receiver is responsible for closing the
  109. // channel, preferably using the Read method.
  110. func (ctx *Context) ProfileQuery(query string, profileLabel string) source.QueryResultsChan {
  111. resCh := make(source.QueryResultsChan)
  112. go runQuery(query, ctx, resCh, time.Now(), profileLabel)
  113. return resCh
  114. }
  115. // QueryAll returns one QueryResultsChan for each query provided, then runs
  116. // each query concurrently and returns results on each channel, respectively,
  117. // in the order they were provided; i.e. the response to queries[1] will be
  118. // sent on channel resChs[1].
  119. func (ctx *Context) QueryAll(queries ...string) []source.QueryResultsChan {
  120. resChs := []source.QueryResultsChan{}
  121. for _, q := range queries {
  122. resChs = append(resChs, ctx.Query(q))
  123. }
  124. return resChs
  125. }
  126. // ProfileQueryAll returns one QueryResultsChan for each query provided, then runs
  127. // each ProfileQuery concurrently and returns results on each channel, respectively,
  128. // in the order they were provided; i.e. the response to queries[1] will be
  129. // sent on channel resChs[1].
  130. func (ctx *Context) ProfileQueryAll(queries ...string) []source.QueryResultsChan {
  131. resChs := []source.QueryResultsChan{}
  132. for _, q := range queries {
  133. resChs = append(resChs, ctx.ProfileQuery(q, fmt.Sprintf("Query #%d", len(resChs)+1)))
  134. }
  135. return resChs
  136. }
  137. func (ctx *Context) QuerySync(query string) ([]*source.QueryResult, v1.Warnings, error) {
  138. raw, warnings, err := ctx.query(query, time.Now())
  139. if err != nil {
  140. return nil, warnings, err
  141. }
  142. results := NewQueryResults(query, raw, ctx.config.LabelMapping)
  143. if results.Error != nil {
  144. return nil, warnings, results.Error
  145. }
  146. return results.Results, warnings, nil
  147. }
  148. // QueryURL returns the URL used to query Prometheus
  149. func (ctx *Context) QueryURL() *url.URL {
  150. return ctx.Client.URL(epQuery, nil)
  151. }
  152. // runQuery executes the prometheus query asynchronously, collects results and
  153. // errors, and passes them through the results channel.
  154. func runQuery(query string, ctx *Context, resCh source.QueryResultsChan, t time.Time, profileLabel string) {
  155. defer errors.HandlePanic()
  156. startQuery := time.Now()
  157. raw, warnings, requestError := ctx.query(query, t)
  158. var parseError error
  159. var results *source.QueryResults
  160. if requestError != nil {
  161. results = NewQueryResultError(query, requestError)
  162. } else {
  163. results = NewQueryResults(query, raw, ctx.config.LabelMapping)
  164. parseError = results.Error
  165. }
  166. // report all warnings, request, and parse errors (nils will be ignored)
  167. ctx.errorCollector.Report(query, warnings, requestError, parseError)
  168. if profileLabel != "" {
  169. log.Profile(startQuery, profileLabel)
  170. }
  171. resCh <- results
  172. }
  173. // RawQuery is a direct query to the prometheus client and returns the body of the response
  174. func (ctx *Context) RawQuery(query string, t time.Time) ([]byte, error) {
  175. u := ctx.Client.URL(epQuery, nil)
  176. q := u.Query()
  177. q.Set("query", query)
  178. if t.IsZero() {
  179. t = time.Now()
  180. }
  181. q.Set("time", strconv.FormatInt(t.Unix(), 10))
  182. u.RawQuery = q.Encode()
  183. req, err := http.NewRequest(http.MethodPost, u.String(), nil)
  184. if err != nil {
  185. return nil, err
  186. }
  187. // Set QueryContext name if non empty
  188. if ctx.name != "" {
  189. req = httputil.SetName(req, ctx.name)
  190. }
  191. req = httputil.SetQuery(req, query)
  192. // Note that the warnings return value from client.Do() is always nil using this
  193. // version of the prometheus client library. We parse the warnings out of the response
  194. // body after json decodidng completes.
  195. resp, body, err := ctx.Client.Do(context.Background(), req)
  196. if err != nil {
  197. if resp == nil {
  198. return nil, fmt.Errorf("query error: '%s' fetching query '%s'", err.Error(), query)
  199. }
  200. return nil, fmt.Errorf("query error %d: '%s' fetching query '%s'", resp.StatusCode, err.Error(), query)
  201. }
  202. // Unsuccessful Status Code, log body and status
  203. statusCode := resp.StatusCode
  204. statusText := http.StatusText(statusCode)
  205. if resp.StatusCode < 200 || resp.StatusCode >= 300 {
  206. return nil, source.CommErrorf("%d (%s) URL: '%s', Body: '%s' Query: '%s'", statusCode, statusText, req.URL, body, query)
  207. }
  208. return body, err
  209. }
  210. func (ctx *Context) query(query string, t time.Time) (interface{}, v1.Warnings, error) {
  211. body, err := ctx.RawQuery(query, t)
  212. if err != nil {
  213. return nil, nil, err
  214. }
  215. var toReturn interface{}
  216. err = json.Unmarshal(body, &toReturn)
  217. if err != nil {
  218. return nil, nil, fmt.Errorf("query '%s' caused unmarshal error: %s", query, err)
  219. }
  220. warnings := warningsFrom(toReturn)
  221. for _, w := range warnings {
  222. // NoStoreAPIWarning is a warning that we would consider an error. It returns partial data relating only to the
  223. // store apis which were reachable. In order to ensure integrity of data across all clusters, we'll need to identify
  224. // this warning and convert it to an error.
  225. if source.IsNoStoreAPIWarning(w) {
  226. return nil, warnings, source.CommErrorf("Error: %s, Body: %s, Query: %s", w, body, query)
  227. }
  228. log.Warnf("fetching query '%s': %s", query, w)
  229. }
  230. return toReturn, warnings, nil
  231. }
  232. // isRequestStepAligned will check if the start and end times are aligned with the step
  233. func (ctx *Context) isRequestStepAligned(start, end time.Time, step time.Duration) bool {
  234. startInUnix := start.Unix()
  235. endInUnix := end.Unix()
  236. stepInSeconds := step.Milliseconds() / 1e3
  237. return startInUnix%stepInSeconds == 0 && endInUnix%stepInSeconds == 0
  238. }
  239. func (ctx *Context) QueryRange(query string, start, end time.Time, step time.Duration) source.QueryResultsChan {
  240. resCh := make(source.QueryResultsChan)
  241. if !ctx.isRequestStepAligned(start, end, step) {
  242. start, end = ctx.alignWindow(start, end, step)
  243. }
  244. go runQueryRange(query, start, end, step, ctx, resCh, "")
  245. return resCh
  246. }
  247. func (ctx *Context) ProfileQueryRange(query string, start, end time.Time, step time.Duration, profileLabel string) source.QueryResultsChan {
  248. resCh := make(source.QueryResultsChan)
  249. go runQueryRange(query, start, end, step, ctx, resCh, profileLabel)
  250. return resCh
  251. }
  252. func (ctx *Context) QueryRangeSync(query string, start, end time.Time, step time.Duration) ([]*source.QueryResult, v1.Warnings, error) {
  253. raw, warnings, err := ctx.queryRange(query, start, end, step)
  254. if err != nil {
  255. return nil, warnings, err
  256. }
  257. results := NewQueryResults(query, raw, ctx.config.LabelMapping)
  258. if results.Error != nil {
  259. return nil, warnings, results.Error
  260. }
  261. return results.Results, warnings, nil
  262. }
  263. // QueryRangeURL returns the URL used to query_range Prometheus
  264. func (ctx *Context) QueryRangeURL() *url.URL {
  265. return ctx.Client.URL(epQueryRange, nil)
  266. }
  267. // runQueryRange executes the prometheus queryRange asynchronously, collects results and
  268. // errors, and passes them through the results channel.
  269. func runQueryRange(query string, start, end time.Time, step time.Duration, ctx *Context, resCh source.QueryResultsChan, profileLabel string) {
  270. defer errors.HandlePanic()
  271. startQuery := time.Now()
  272. raw, warnings, requestError := ctx.queryRange(query, start, end, step)
  273. var parseError error
  274. var results *source.QueryResults
  275. if requestError != nil {
  276. results = NewQueryResultError(query, requestError)
  277. } else {
  278. results = NewQueryResults(query, raw, ctx.config.LabelMapping)
  279. parseError = results.Error
  280. }
  281. // report all warnings, request, and parse errors (nils will be ignored)
  282. ctx.errorCollector.Report(query, warnings, requestError, parseError)
  283. if profileLabel != "" {
  284. log.Profile(startQuery, profileLabel)
  285. }
  286. resCh <- results
  287. }
  288. // RawQuery is a direct query to the prometheus client and returns the body of the response
  289. func (ctx *Context) RawQueryRange(query string, start, end time.Time, step time.Duration) ([]byte, error) {
  290. u := ctx.Client.URL(epQueryRange, nil)
  291. q := u.Query()
  292. q.Set("query", query)
  293. q.Set("start", start.Format(time.RFC3339Nano))
  294. q.Set("end", end.Format(time.RFC3339Nano))
  295. q.Set("step", strconv.FormatFloat(step.Seconds(), 'f', 3, 64))
  296. u.RawQuery = q.Encode()
  297. req, err := http.NewRequest(http.MethodPost, u.String(), nil)
  298. if err != nil {
  299. return nil, err
  300. }
  301. // Set QueryContext name if non empty
  302. if ctx.name != "" {
  303. req = httputil.SetName(req, ctx.name)
  304. }
  305. req = httputil.SetQuery(req, query)
  306. // Note that the warnings return value from client.Do() is always nil using this
  307. // version of the prometheus client library. We parse the warnings out of the response
  308. // body after json decodidng completes.
  309. resp, body, err := ctx.Client.Do(context.Background(), req)
  310. if err != nil {
  311. if resp == nil {
  312. return nil, fmt.Errorf("Error: %s, Body: %s Query: %s", err.Error(), body, query)
  313. }
  314. return nil, fmt.Errorf("%d (%s) Error: %s Body: %s Query: %s", resp.StatusCode, http.StatusText(resp.StatusCode), body, err.Error(), query)
  315. }
  316. // Unsuccessful Status Code, log body and status
  317. statusCode := resp.StatusCode
  318. statusText := http.StatusText(statusCode)
  319. if resp.StatusCode < 200 || resp.StatusCode >= 300 {
  320. return nil, source.CommErrorf("%d (%s) Body: %s Query: %s", statusCode, statusText, body, query)
  321. }
  322. return body, err
  323. }
  324. func (ctx *Context) queryRange(query string, start, end time.Time, step time.Duration) (interface{}, v1.Warnings, error) {
  325. body, err := ctx.RawQueryRange(query, start, end, step)
  326. if err != nil {
  327. return nil, nil, err
  328. }
  329. var toReturn interface{}
  330. err = json.Unmarshal(body, &toReturn)
  331. if err != nil {
  332. return nil, nil, fmt.Errorf("query '%s' caused unmarshal error: %s", query, err)
  333. }
  334. warnings := warningsFrom(toReturn)
  335. for _, w := range warnings {
  336. // NoStoreAPIWarning is a warning that we would consider an error. It returns partial data relating only to the
  337. // store apis which were reachable. In order to ensure integrity of data across all clusters, we'll need to identify
  338. // this warning and convert it to an error.
  339. if source.IsNoStoreAPIWarning(w) {
  340. return nil, warnings, source.CommErrorf("Error: %s, Body: %s, Query: %s", w, body, query)
  341. }
  342. log.Warnf("fetching query '%s': %s", query, w)
  343. }
  344. return toReturn, warnings, nil
  345. }
  346. // alignWindow will update the start and end times to be aligned with the step duration.
  347. // Current implementation will always floor the start/end times
  348. func (ctx *Context) alignWindow(start time.Time, end time.Time, step time.Duration) (time.Time, time.Time) {
  349. // Convert the step duration from Milliseconds to Seconds to match the Unix timestamp, which is in seconds
  350. stepInSeconds := step.Milliseconds() / 1e3
  351. alignedStart := (start.Unix() / stepInSeconds) * stepInSeconds
  352. alignedEnd := (end.Unix() / stepInSeconds) * stepInSeconds
  353. return time.Unix(alignedStart, 0).UTC(), time.Unix(alignedEnd, 0).UTC()
  354. }
  355. // Extracts the warnings from the resulting json if they exist (part of the prometheus response api).
  356. func warningsFrom(result interface{}) v1.Warnings {
  357. var warnings v1.Warnings
  358. if resultMap, ok := result.(map[string]interface{}); ok {
  359. if warningProp, ok := resultMap["warnings"]; ok {
  360. if w, ok := warningProp.([]string); ok {
  361. warnings = w
  362. }
  363. }
  364. }
  365. return warnings
  366. }