query.go 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472
  1. package prom
  2. import (
  3. "context"
  4. "fmt"
  5. "net/http"
  6. "net/url"
  7. "strconv"
  8. "time"
  9. "github.com/opencost/opencost/core/pkg/log"
  10. "github.com/opencost/opencost/core/pkg/source"
  11. "github.com/opencost/opencost/core/pkg/util/httputil"
  12. "github.com/opencost/opencost/core/pkg/util/json"
  13. "github.com/opencost/opencost/core/pkg/errors"
  14. prometheus "github.com/prometheus/client_golang/api"
  15. v1 "github.com/prometheus/client_golang/api/prometheus/v1"
  16. )
// Prometheus HTTP API endpoint paths, built on the package's apiPrefix.
const (
	// epQuery is the path used for instant queries.
	epQuery = apiPrefix + "/query"
	// epQueryRange is the path used for range queries.
	epQueryRange = apiPrefix + "/query_range"
)
// ContextFactory is a factory for creating new Contexts for prometheus queries.
// Every Context it creates receives the factory's client and configuration.
type ContextFactory struct {
	// client is the prometheus client handed to each created Context.
	client prometheus.Client
	// config is the OpenCost prometheus configuration handed to each created Context.
	config *OpenCostPrometheusConfig
}
  26. // NewContextFactory creates a new ContextFactory with the provided prometheus client.
  27. func NewContextFactory(client prometheus.Client, promConfig *OpenCostPrometheusConfig) *ContextFactory {
  28. return &ContextFactory{
  29. client: client,
  30. config: promConfig,
  31. }
  32. }
  33. // NewContext creates a new prometheus query context.
  34. func (cf *ContextFactory) NewContext() *Context {
  35. return NewContext(cf.client, cf.config)
  36. }
  37. // NewContext creates a new named prometheus query context.
  38. func (cf *ContextFactory) NewNamedContext(name string) *Context {
  39. return NewNamedContext(cf.client, cf.config, name)
  40. }
// Context wraps a Prometheus client and provides methods for querying and
// parsing query responses and errors.
type Context struct {
	// Client is the prometheus client used to build endpoint URLs and execute requests.
	Client prometheus.Client
	// config supplies query settings; its ClusterLabel is used when building result keys.
	config *OpenCostPrometheusConfig
	// name, when non-empty, is attached to outgoing requests via httputil.SetName.
	name string
	// errorCollector receives the warnings and errors reported by the asynchronous
	// query runners (runQuery / runQueryRange).
	errorCollector *source.QueryErrorCollector
}
  49. // NewContext creates a new Prometheus querying context from the given client
  50. func NewContext(client prometheus.Client, config *OpenCostPrometheusConfig) *Context {
  51. var ec source.QueryErrorCollector
  52. return &Context{
  53. Client: client,
  54. config: config,
  55. name: "",
  56. errorCollector: &ec,
  57. }
  58. }
  59. // NewNamedContext creates a new named Prometheus querying context from the given client
  60. func NewNamedContext(client prometheus.Client, config *OpenCostPrometheusConfig, name string) *Context {
  61. ctx := NewContext(client, config)
  62. ctx.name = name
  63. return ctx
  64. }
  65. // Warnings returns the warnings collected from the Context's ErrorCollector
  66. func (ctx *Context) Warnings() []*source.QueryWarning {
  67. return ctx.errorCollector.Warnings()
  68. }
  69. // HasWarnings returns true if the ErrorCollector has warnings.
  70. func (ctx *Context) HasWarnings() bool {
  71. return ctx.errorCollector.IsWarning()
  72. }
  73. // Errors returns the errors collected from the Context's ErrorCollector.
  74. func (ctx *Context) Errors() []*source.QueryError {
  75. return ctx.errorCollector.Errors()
  76. }
  77. // HasErrors returns true if the ErrorCollector has errors
  78. func (ctx *Context) HasErrors() bool {
  79. return ctx.errorCollector.IsError()
  80. }
  81. // ErrorCollection returns the aggregation of errors if there exists errors. Otherwise,
  82. // nil is returned
  83. func (ctx *Context) ErrorCollection() error {
  84. if ctx.errorCollector.IsError() {
  85. // errorCollector implements the error interface
  86. return ctx.errorCollector
  87. }
  88. return nil
  89. }
  90. // Query returns a QueryResultsChan, then runs the given query and sends the
  91. // results on the provided channel. Receiver is responsible for closing the
  92. // channel, preferably using the Read method.
  93. func (ctx *Context) Query(query string) source.QueryResultsChan {
  94. resCh := make(source.QueryResultsChan)
  95. go runQuery(query, ctx, resCh, time.Now(), "")
  96. return resCh
  97. }
  98. // QueryAtTime returns a QueryResultsChan, then runs the given query at the
  99. // given time (see time parameter here: https://prometheus.io/docs/prometheus/latest/querying/api/#instant-queries)
  100. // and sends the results on the provided channel. Receiver is responsible for
  101. // closing the channel, preferably using the Read method.
  102. func (ctx *Context) QueryAtTime(query string, t time.Time) source.QueryResultsChan {
  103. resCh := make(source.QueryResultsChan)
  104. go runQuery(query, ctx, resCh, t, "")
  105. return resCh
  106. }
  107. // ProfileQuery returns a QueryResultsChan, then runs the given query with a profile
  108. // label and sends the results on the provided channel. Receiver is responsible for closing the
  109. // channel, preferably using the Read method.
  110. func (ctx *Context) ProfileQuery(query string, profileLabel string) source.QueryResultsChan {
  111. resCh := make(source.QueryResultsChan)
  112. go runQuery(query, ctx, resCh, time.Now(), profileLabel)
  113. return resCh
  114. }
  115. // QueryAll returns one QueryResultsChan for each query provided, then runs
  116. // each query concurrently and returns results on each channel, respectively,
  117. // in the order they were provided; i.e. the response to queries[1] will be
  118. // sent on channel resChs[1].
  119. func (ctx *Context) QueryAll(queries ...string) []source.QueryResultsChan {
  120. resChs := []source.QueryResultsChan{}
  121. for _, q := range queries {
  122. resChs = append(resChs, ctx.Query(q))
  123. }
  124. return resChs
  125. }
  126. // ProfileQueryAll returns one QueryResultsChan for each query provided, then runs
  127. // each ProfileQuery concurrently and returns results on each channel, respectively,
  128. // in the order they were provided; i.e. the response to queries[1] will be
  129. // sent on channel resChs[1].
  130. func (ctx *Context) ProfileQueryAll(queries ...string) []source.QueryResultsChan {
  131. resChs := []source.QueryResultsChan{}
  132. for _, q := range queries {
  133. resChs = append(resChs, ctx.ProfileQuery(q, fmt.Sprintf("Query #%d", len(resChs)+1)))
  134. }
  135. return resChs
  136. }
  137. func (ctx *Context) QuerySync(query string) ([]*source.QueryResult, v1.Warnings, error) {
  138. raw, warnings, err := ctx.query(query, time.Now())
  139. if err != nil {
  140. return nil, warnings, err
  141. }
  142. // create result keys from custom cluster label
  143. resultKeys := source.ClusterKeyWithDefaults(ctx.config.ClusterLabel)
  144. results := NewQueryResults(query, raw, resultKeys)
  145. if results.Error != nil {
  146. return nil, warnings, results.Error
  147. }
  148. return results.Results, warnings, nil
  149. }
  150. // QueryURL returns the URL used to query Prometheus
  151. func (ctx *Context) QueryURL() *url.URL {
  152. return ctx.Client.URL(epQuery, nil)
  153. }
  154. // runQuery executes the prometheus query asynchronously, collects results and
  155. // errors, and passes them through the results channel.
  156. func runQuery(query string, ctx *Context, resCh source.QueryResultsChan, t time.Time, profileLabel string) {
  157. defer errors.HandlePanic()
  158. startQuery := time.Now()
  159. raw, warnings, requestError := ctx.query(query, t)
  160. var parseError error
  161. var results *source.QueryResults
  162. if requestError != nil {
  163. results = NewQueryResultError(query, requestError)
  164. } else {
  165. // create result keys from custom cluster label
  166. resultKeys := source.ClusterKeyWithDefaults(ctx.config.ClusterLabel)
  167. results = NewQueryResults(query, raw, resultKeys)
  168. parseError = results.Error
  169. }
  170. // report all warnings, request, and parse errors (nils will be ignored)
  171. ctx.errorCollector.Report(query, warnings, requestError, parseError)
  172. if profileLabel != "" {
  173. log.Profile(startQuery, profileLabel)
  174. }
  175. resCh <- results
  176. }
  177. // RawQuery is a direct query to the prometheus client and returns the body of the response
  178. func (ctx *Context) RawQuery(query string, t time.Time) ([]byte, error) {
  179. u := ctx.Client.URL(epQuery, nil)
  180. q := u.Query()
  181. q.Set("query", query)
  182. if t.IsZero() {
  183. t = time.Now()
  184. }
  185. q.Set("time", strconv.FormatInt(t.Unix(), 10))
  186. u.RawQuery = q.Encode()
  187. req, err := http.NewRequest(http.MethodPost, u.String(), nil)
  188. if err != nil {
  189. return nil, err
  190. }
  191. // Set QueryContext name if non empty
  192. if ctx.name != "" {
  193. req = httputil.SetName(req, ctx.name)
  194. }
  195. req = httputil.SetQuery(req, query)
  196. // Note that the warnings return value from client.Do() is always nil using this
  197. // version of the prometheus client library. We parse the warnings out of the response
  198. // body after json decodidng completes.
  199. resp, body, err := ctx.Client.Do(context.Background(), req)
  200. if err != nil {
  201. if resp == nil {
  202. return nil, fmt.Errorf("query error: '%s' fetching query '%s'", err.Error(), query)
  203. }
  204. return nil, fmt.Errorf("query error %d: '%s' fetching query '%s'", resp.StatusCode, err.Error(), query)
  205. }
  206. // Unsuccessful Status Code, log body and status
  207. statusCode := resp.StatusCode
  208. statusText := http.StatusText(statusCode)
  209. if resp.StatusCode < 200 || resp.StatusCode >= 300 {
  210. return nil, source.CommErrorf("%d (%s) URL: '%s', Body: '%s' Query: '%s'", statusCode, statusText, req.URL, body, query)
  211. }
  212. return body, err
  213. }
  214. func (ctx *Context) query(query string, t time.Time) (interface{}, v1.Warnings, error) {
  215. body, err := ctx.RawQuery(query, t)
  216. if err != nil {
  217. return nil, nil, err
  218. }
  219. var toReturn interface{}
  220. err = json.Unmarshal(body, &toReturn)
  221. if err != nil {
  222. return nil, nil, fmt.Errorf("query '%s' caused unmarshal error: %s", query, err)
  223. }
  224. warnings := warningsFrom(toReturn)
  225. for _, w := range warnings {
  226. // NoStoreAPIWarning is a warning that we would consider an error. It returns partial data relating only to the
  227. // store apis which were reachable. In order to ensure integrity of data across all clusters, we'll need to identify
  228. // this warning and convert it to an error.
  229. if source.IsNoStoreAPIWarning(w) {
  230. return nil, warnings, source.CommErrorf("Error: %s, Body: %s, Query: %s", w, body, query)
  231. }
  232. log.Warnf("fetching query '%s': %s", query, w)
  233. }
  234. return toReturn, warnings, nil
  235. }
  236. // isRequestStepAligned will check if the start and end times are aligned with the step
  237. func (ctx *Context) isRequestStepAligned(start, end time.Time, step time.Duration) bool {
  238. startInUnix := start.Unix()
  239. endInUnix := end.Unix()
  240. stepInSeconds := step.Milliseconds() / 1e3
  241. return startInUnix%stepInSeconds == 0 && endInUnix%stepInSeconds == 0
  242. }
  243. func (ctx *Context) QueryRange(query string, start, end time.Time, step time.Duration) source.QueryResultsChan {
  244. resCh := make(source.QueryResultsChan)
  245. if !ctx.isRequestStepAligned(start, end, step) {
  246. start, end = ctx.alignWindow(start, end, step)
  247. }
  248. go runQueryRange(query, start, end, step, ctx, resCh, "")
  249. return resCh
  250. }
  251. func (ctx *Context) ProfileQueryRange(query string, start, end time.Time, step time.Duration, profileLabel string) source.QueryResultsChan {
  252. resCh := make(source.QueryResultsChan)
  253. go runQueryRange(query, start, end, step, ctx, resCh, profileLabel)
  254. return resCh
  255. }
  256. func (ctx *Context) QueryRangeSync(query string, start, end time.Time, step time.Duration) ([]*source.QueryResult, v1.Warnings, error) {
  257. raw, warnings, err := ctx.queryRange(query, start, end, step)
  258. if err != nil {
  259. return nil, warnings, err
  260. }
  261. // create result keys from custom cluster label
  262. resultKeys := source.ClusterKeyWithDefaults(ctx.config.ClusterLabel)
  263. results := NewQueryResults(query, raw, resultKeys)
  264. if results.Error != nil {
  265. return nil, warnings, results.Error
  266. }
  267. return results.Results, warnings, nil
  268. }
  269. // QueryRangeURL returns the URL used to query_range Prometheus
  270. func (ctx *Context) QueryRangeURL() *url.URL {
  271. return ctx.Client.URL(epQueryRange, nil)
  272. }
  273. // runQueryRange executes the prometheus queryRange asynchronously, collects results and
  274. // errors, and passes them through the results channel.
  275. func runQueryRange(query string, start, end time.Time, step time.Duration, ctx *Context, resCh source.QueryResultsChan, profileLabel string) {
  276. defer errors.HandlePanic()
  277. startQuery := time.Now()
  278. raw, warnings, requestError := ctx.queryRange(query, start, end, step)
  279. var parseError error
  280. var results *source.QueryResults
  281. if requestError != nil {
  282. results = NewQueryResultError(query, requestError)
  283. } else {
  284. // create result keys from custom cluster label
  285. resultKeys := source.ClusterKeyWithDefaults(ctx.config.ClusterLabel)
  286. results = NewQueryResults(query, raw, resultKeys)
  287. parseError = results.Error
  288. }
  289. // report all warnings, request, and parse errors (nils will be ignored)
  290. ctx.errorCollector.Report(query, warnings, requestError, parseError)
  291. if profileLabel != "" {
  292. log.Profile(startQuery, profileLabel)
  293. }
  294. resCh <- results
  295. }
  296. // RawQuery is a direct query to the prometheus client and returns the body of the response
  297. func (ctx *Context) RawQueryRange(query string, start, end time.Time, step time.Duration) ([]byte, error) {
  298. u := ctx.Client.URL(epQueryRange, nil)
  299. q := u.Query()
  300. q.Set("query", query)
  301. q.Set("start", start.Format(time.RFC3339Nano))
  302. q.Set("end", end.Format(time.RFC3339Nano))
  303. q.Set("step", strconv.FormatFloat(step.Seconds(), 'f', 3, 64))
  304. u.RawQuery = q.Encode()
  305. req, err := http.NewRequest(http.MethodPost, u.String(), nil)
  306. if err != nil {
  307. return nil, err
  308. }
  309. // Set QueryContext name if non empty
  310. if ctx.name != "" {
  311. req = httputil.SetName(req, ctx.name)
  312. }
  313. req = httputil.SetQuery(req, query)
  314. // Note that the warnings return value from client.Do() is always nil using this
  315. // version of the prometheus client library. We parse the warnings out of the response
  316. // body after json decodidng completes.
  317. resp, body, err := ctx.Client.Do(context.Background(), req)
  318. if err != nil {
  319. if resp == nil {
  320. return nil, fmt.Errorf("Error: %s, Body: %s Query: %s", err.Error(), body, query)
  321. }
  322. return nil, fmt.Errorf("%d (%s) Error: %s Body: %s Query: %s", resp.StatusCode, http.StatusText(resp.StatusCode), body, err.Error(), query)
  323. }
  324. // Unsuccessful Status Code, log body and status
  325. statusCode := resp.StatusCode
  326. statusText := http.StatusText(statusCode)
  327. if resp.StatusCode < 200 || resp.StatusCode >= 300 {
  328. return nil, source.CommErrorf("%d (%s) Body: %s Query: %s", statusCode, statusText, body, query)
  329. }
  330. return body, err
  331. }
  332. func (ctx *Context) queryRange(query string, start, end time.Time, step time.Duration) (interface{}, v1.Warnings, error) {
  333. body, err := ctx.RawQueryRange(query, start, end, step)
  334. if err != nil {
  335. return nil, nil, err
  336. }
  337. var toReturn interface{}
  338. err = json.Unmarshal(body, &toReturn)
  339. if err != nil {
  340. return nil, nil, fmt.Errorf("query '%s' caused unmarshal error: %s", query, err)
  341. }
  342. warnings := warningsFrom(toReturn)
  343. for _, w := range warnings {
  344. // NoStoreAPIWarning is a warning that we would consider an error. It returns partial data relating only to the
  345. // store apis which were reachable. In order to ensure integrity of data across all clusters, we'll need to identify
  346. // this warning and convert it to an error.
  347. if source.IsNoStoreAPIWarning(w) {
  348. return nil, warnings, source.CommErrorf("Error: %s, Body: %s, Query: %s", w, body, query)
  349. }
  350. log.Warnf("fetching query '%s': %s", query, w)
  351. }
  352. return toReturn, warnings, nil
  353. }
  354. // alignWindow will update the start and end times to be aligned with the step duration.
  355. // Current implementation will always floor the start/end times
  356. func (ctx *Context) alignWindow(start time.Time, end time.Time, step time.Duration) (time.Time, time.Time) {
  357. // Convert the step duration from Milliseconds to Seconds to match the Unix timestamp, which is in seconds
  358. stepInSeconds := step.Milliseconds() / 1e3
  359. alignedStart := (start.Unix() / stepInSeconds) * stepInSeconds
  360. alignedEnd := (end.Unix() / stepInSeconds) * stepInSeconds
  361. return time.Unix(alignedStart, 0).UTC(), time.Unix(alignedEnd, 0).UTC()
  362. }
  363. // Extracts the warnings from the resulting json if they exist (part of the prometheus response api).
  364. func warningsFrom(result interface{}) v1.Warnings {
  365. var warnings v1.Warnings
  366. if resultMap, ok := result.(map[string]interface{}); ok {
  367. if warningProp, ok := resultMap["warnings"]; ok {
  368. if w, ok := warningProp.([]string); ok {
  369. warnings = w
  370. }
  371. }
  372. }
  373. return warnings
  374. }