decode.go 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456
  1. // Copyright 2015 The Prometheus Authors
  2. // Licensed under the Apache License, Version 2.0 (the "License");
  3. // you may not use this file except in compliance with the License.
  4. // You may obtain a copy of the License at
  5. //
  6. // http://www.apache.org/licenses/LICENSE-2.0
  7. //
  8. // Unless required by applicable law or agreed to in writing, software
  9. // distributed under the License is distributed on an "AS IS" BASIS,
  10. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  11. // See the License for the specific language governing permissions and
  12. // limitations under the License.
  13. package expfmt
  14. import (
  15. "bufio"
  16. "fmt"
  17. "io"
  18. "math"
  19. "mime"
  20. "net/http"
  21. dto "github.com/prometheus/client_model/go"
  22. "google.golang.org/protobuf/encoding/protodelim"
  23. "github.com/prometheus/common/model"
  24. )
  25. // Decoder types decode an input stream into metric families.
  26. type Decoder interface {
  27. Decode(*dto.MetricFamily) error
  28. }
  29. // DecodeOptions contains options used by the Decoder and in sample extraction.
  30. type DecodeOptions struct {
  31. // Timestamp is added to each value from the stream that has no explicit timestamp set.
  32. Timestamp model.Time
  33. }
  34. // ResponseFormat extracts the correct format from a HTTP response header.
  35. // If no matching format can be found FormatUnknown is returned.
  36. func ResponseFormat(h http.Header) Format {
  37. ct := h.Get(hdrContentType)
  38. mediatype, params, err := mime.ParseMediaType(ct)
  39. if err != nil {
  40. return FmtUnknown
  41. }
  42. const textType = "text/plain"
  43. switch mediatype {
  44. case ProtoType:
  45. if p, ok := params["proto"]; ok && p != ProtoProtocol {
  46. return FmtUnknown
  47. }
  48. if e, ok := params["encoding"]; ok && e != "delimited" {
  49. return FmtUnknown
  50. }
  51. return FmtProtoDelim
  52. case textType:
  53. if v, ok := params["version"]; ok && v != TextVersion {
  54. return FmtUnknown
  55. }
  56. return FmtText
  57. }
  58. return FmtUnknown
  59. }
  60. // NewDecoder returns a new decoder based on the given input format. Metric
  61. // names are validated based on the provided Format -- if the format requires
  62. // escaping, raditional Prometheues validity checking is used. Otherwise, names
  63. // are checked for UTF-8 validity. Supported formats include delimited protobuf
  64. // and Prometheus text format. For historical reasons, this decoder fallbacks
  65. // to classic text decoding for any other format. This decoder does not fully
  66. // support OpenMetrics although it may often succeed due to the similarities
  67. // between the formats. This decoder may not support the latest features of
  68. // Prometheus text format and is not intended for high-performance applications.
  69. // See: https://github.com/prometheus/common/issues/812
  70. func NewDecoder(r io.Reader, format Format) Decoder {
  71. scheme := model.LegacyValidation
  72. if format.ToEscapingScheme() == model.NoEscaping {
  73. scheme = model.UTF8Validation
  74. }
  75. switch format.FormatType() {
  76. case TypeProtoDelim:
  77. return &protoDecoder{r: bufio.NewReader(r), s: scheme}
  78. case TypeProtoText, TypeProtoCompact:
  79. return &errDecoder{err: fmt.Errorf("format %s not supported for decoding", format)}
  80. }
  81. return &textDecoder{r: r, s: scheme}
  82. }
  83. // protoDecoder implements the Decoder interface for protocol buffers.
  84. type protoDecoder struct {
  85. r protodelim.Reader
  86. s model.ValidationScheme
  87. }
  88. // Decode implements the Decoder interface.
  89. func (d *protoDecoder) Decode(v *dto.MetricFamily) error {
  90. opts := protodelim.UnmarshalOptions{
  91. MaxSize: -1,
  92. }
  93. if err := opts.UnmarshalFrom(d.r, v); err != nil {
  94. return err
  95. }
  96. if !d.s.IsValidMetricName(v.GetName()) {
  97. return fmt.Errorf("invalid metric name %q", v.GetName())
  98. }
  99. for _, m := range v.GetMetric() {
  100. if m == nil {
  101. continue
  102. }
  103. for _, l := range m.GetLabel() {
  104. if l == nil {
  105. continue
  106. }
  107. if !model.LabelValue(l.GetValue()).IsValid() {
  108. return fmt.Errorf("invalid label value %q", l.GetValue())
  109. }
  110. if !d.s.IsValidLabelName(l.GetName()) {
  111. return fmt.Errorf("invalid label name %q", l.GetName())
  112. }
  113. }
  114. }
  115. return nil
  116. }
  117. // errDecoder is an error-state decoder that always returns the same error.
  118. type errDecoder struct {
  119. err error
  120. }
  121. func (d *errDecoder) Decode(*dto.MetricFamily) error {
  122. return d.err
  123. }
  124. // textDecoder implements the Decoder interface for the text protocol.
  125. type textDecoder struct {
  126. r io.Reader
  127. fams map[string]*dto.MetricFamily
  128. s model.ValidationScheme
  129. err error
  130. }
  131. // Decode implements the Decoder interface.
  132. func (d *textDecoder) Decode(v *dto.MetricFamily) error {
  133. if d.err == nil {
  134. // Read all metrics in one shot.
  135. p := NewTextParser(d.s)
  136. d.fams, d.err = p.TextToMetricFamilies(d.r)
  137. // If we don't get an error, store io.EOF for the end.
  138. if d.err == nil {
  139. d.err = io.EOF
  140. }
  141. }
  142. // Pick off one MetricFamily per Decode until there's nothing left.
  143. for key, fam := range d.fams {
  144. v.Name = fam.Name
  145. v.Help = fam.Help
  146. v.Type = fam.Type
  147. v.Metric = fam.Metric
  148. delete(d.fams, key)
  149. return nil
  150. }
  151. return d.err
  152. }
  153. // SampleDecoder wraps a Decoder to extract samples from the metric families
  154. // decoded by the wrapped Decoder.
  155. type SampleDecoder struct {
  156. Dec Decoder
  157. Opts *DecodeOptions
  158. f dto.MetricFamily
  159. }
  160. // Decode calls the Decode method of the wrapped Decoder and then extracts the
  161. // samples from the decoded MetricFamily into the provided model.Vector.
  162. func (sd *SampleDecoder) Decode(s *model.Vector) error {
  163. err := sd.Dec.Decode(&sd.f)
  164. if err != nil {
  165. return err
  166. }
  167. *s, err = extractSamples(&sd.f, sd.Opts)
  168. return err
  169. }
  170. // ExtractSamples builds a slice of samples from the provided metric
  171. // families. If an error occurs during sample extraction, it continues to
  172. // extract from the remaining metric families. The returned error is the last
  173. // error that has occurred.
  174. func ExtractSamples(o *DecodeOptions, fams ...*dto.MetricFamily) (model.Vector, error) {
  175. var (
  176. all model.Vector
  177. lastErr error
  178. )
  179. for _, f := range fams {
  180. some, err := extractSamples(f, o)
  181. if err != nil {
  182. lastErr = err
  183. continue
  184. }
  185. all = append(all, some...)
  186. }
  187. return all, lastErr
  188. }
  189. func extractSamples(f *dto.MetricFamily, o *DecodeOptions) (model.Vector, error) {
  190. switch f.GetType() {
  191. case dto.MetricType_COUNTER:
  192. return extractCounter(o, f), nil
  193. case dto.MetricType_GAUGE:
  194. return extractGauge(o, f), nil
  195. case dto.MetricType_SUMMARY:
  196. return extractSummary(o, f), nil
  197. case dto.MetricType_UNTYPED:
  198. return extractUntyped(o, f), nil
  199. case dto.MetricType_HISTOGRAM:
  200. return extractHistogram(o, f), nil
  201. }
  202. return nil, fmt.Errorf("expfmt.extractSamples: unknown metric family type %v", f.GetType())
  203. }
  204. func extractCounter(o *DecodeOptions, f *dto.MetricFamily) model.Vector {
  205. samples := make(model.Vector, 0, len(f.Metric))
  206. for _, m := range f.Metric {
  207. if m.Counter == nil {
  208. continue
  209. }
  210. lset := make(model.LabelSet, len(m.Label)+1)
  211. for _, p := range m.Label {
  212. lset[model.LabelName(p.GetName())] = model.LabelValue(p.GetValue())
  213. }
  214. lset[model.MetricNameLabel] = model.LabelValue(f.GetName())
  215. smpl := &model.Sample{
  216. Metric: model.Metric(lset),
  217. Value: model.SampleValue(m.Counter.GetValue()),
  218. }
  219. if m.TimestampMs != nil {
  220. smpl.Timestamp = model.TimeFromUnixNano(*m.TimestampMs * 1000000)
  221. } else {
  222. smpl.Timestamp = o.Timestamp
  223. }
  224. samples = append(samples, smpl)
  225. }
  226. return samples
  227. }
  228. func extractGauge(o *DecodeOptions, f *dto.MetricFamily) model.Vector {
  229. samples := make(model.Vector, 0, len(f.Metric))
  230. for _, m := range f.Metric {
  231. if m.Gauge == nil {
  232. continue
  233. }
  234. lset := make(model.LabelSet, len(m.Label)+1)
  235. for _, p := range m.Label {
  236. lset[model.LabelName(p.GetName())] = model.LabelValue(p.GetValue())
  237. }
  238. lset[model.MetricNameLabel] = model.LabelValue(f.GetName())
  239. smpl := &model.Sample{
  240. Metric: model.Metric(lset),
  241. Value: model.SampleValue(m.Gauge.GetValue()),
  242. }
  243. if m.TimestampMs != nil {
  244. smpl.Timestamp = model.TimeFromUnixNano(*m.TimestampMs * 1000000)
  245. } else {
  246. smpl.Timestamp = o.Timestamp
  247. }
  248. samples = append(samples, smpl)
  249. }
  250. return samples
  251. }
  252. func extractUntyped(o *DecodeOptions, f *dto.MetricFamily) model.Vector {
  253. samples := make(model.Vector, 0, len(f.Metric))
  254. for _, m := range f.Metric {
  255. if m.Untyped == nil {
  256. continue
  257. }
  258. lset := make(model.LabelSet, len(m.Label)+1)
  259. for _, p := range m.Label {
  260. lset[model.LabelName(p.GetName())] = model.LabelValue(p.GetValue())
  261. }
  262. lset[model.MetricNameLabel] = model.LabelValue(f.GetName())
  263. smpl := &model.Sample{
  264. Metric: model.Metric(lset),
  265. Value: model.SampleValue(m.Untyped.GetValue()),
  266. }
  267. if m.TimestampMs != nil {
  268. smpl.Timestamp = model.TimeFromUnixNano(*m.TimestampMs * 1000000)
  269. } else {
  270. smpl.Timestamp = o.Timestamp
  271. }
  272. samples = append(samples, smpl)
  273. }
  274. return samples
  275. }
  276. func extractSummary(o *DecodeOptions, f *dto.MetricFamily) model.Vector {
  277. samples := make(model.Vector, 0, len(f.Metric))
  278. for _, m := range f.Metric {
  279. if m.Summary == nil {
  280. continue
  281. }
  282. timestamp := o.Timestamp
  283. if m.TimestampMs != nil {
  284. timestamp = model.TimeFromUnixNano(*m.TimestampMs * 1000000)
  285. }
  286. for _, q := range m.Summary.Quantile {
  287. lset := make(model.LabelSet, len(m.Label)+2)
  288. for _, p := range m.Label {
  289. lset[model.LabelName(p.GetName())] = model.LabelValue(p.GetValue())
  290. }
  291. // BUG(matt): Update other names to "quantile".
  292. lset[model.LabelName(model.QuantileLabel)] = model.LabelValue(fmt.Sprint(q.GetQuantile()))
  293. lset[model.MetricNameLabel] = model.LabelValue(f.GetName())
  294. samples = append(samples, &model.Sample{
  295. Metric: model.Metric(lset),
  296. Value: model.SampleValue(q.GetValue()),
  297. Timestamp: timestamp,
  298. })
  299. }
  300. lset := make(model.LabelSet, len(m.Label)+1)
  301. for _, p := range m.Label {
  302. lset[model.LabelName(p.GetName())] = model.LabelValue(p.GetValue())
  303. }
  304. lset[model.MetricNameLabel] = model.LabelValue(f.GetName() + "_sum")
  305. samples = append(samples, &model.Sample{
  306. Metric: model.Metric(lset),
  307. Value: model.SampleValue(m.Summary.GetSampleSum()),
  308. Timestamp: timestamp,
  309. })
  310. lset = make(model.LabelSet, len(m.Label)+1)
  311. for _, p := range m.Label {
  312. lset[model.LabelName(p.GetName())] = model.LabelValue(p.GetValue())
  313. }
  314. lset[model.MetricNameLabel] = model.LabelValue(f.GetName() + "_count")
  315. samples = append(samples, &model.Sample{
  316. Metric: model.Metric(lset),
  317. Value: model.SampleValue(m.Summary.GetSampleCount()),
  318. Timestamp: timestamp,
  319. })
  320. }
  321. return samples
  322. }
  323. func extractHistogram(o *DecodeOptions, f *dto.MetricFamily) model.Vector {
  324. samples := make(model.Vector, 0, len(f.Metric))
  325. for _, m := range f.Metric {
  326. if m.Histogram == nil {
  327. continue
  328. }
  329. timestamp := o.Timestamp
  330. if m.TimestampMs != nil {
  331. timestamp = model.TimeFromUnixNano(*m.TimestampMs * 1000000)
  332. }
  333. infSeen := false
  334. for _, q := range m.Histogram.Bucket {
  335. lset := make(model.LabelSet, len(m.Label)+2)
  336. for _, p := range m.Label {
  337. lset[model.LabelName(p.GetName())] = model.LabelValue(p.GetValue())
  338. }
  339. lset[model.LabelName(model.BucketLabel)] = model.LabelValue(fmt.Sprint(q.GetUpperBound()))
  340. lset[model.MetricNameLabel] = model.LabelValue(f.GetName() + "_bucket")
  341. if math.IsInf(q.GetUpperBound(), +1) {
  342. infSeen = true
  343. }
  344. samples = append(samples, &model.Sample{
  345. Metric: model.Metric(lset),
  346. Value: model.SampleValue(q.GetCumulativeCount()),
  347. Timestamp: timestamp,
  348. })
  349. }
  350. lset := make(model.LabelSet, len(m.Label)+1)
  351. for _, p := range m.Label {
  352. lset[model.LabelName(p.GetName())] = model.LabelValue(p.GetValue())
  353. }
  354. lset[model.MetricNameLabel] = model.LabelValue(f.GetName() + "_sum")
  355. samples = append(samples, &model.Sample{
  356. Metric: model.Metric(lset),
  357. Value: model.SampleValue(m.Histogram.GetSampleSum()),
  358. Timestamp: timestamp,
  359. })
  360. lset = make(model.LabelSet, len(m.Label)+1)
  361. for _, p := range m.Label {
  362. lset[model.LabelName(p.GetName())] = model.LabelValue(p.GetValue())
  363. }
  364. lset[model.MetricNameLabel] = model.LabelValue(f.GetName() + "_count")
  365. count := &model.Sample{
  366. Metric: model.Metric(lset),
  367. Value: model.SampleValue(m.Histogram.GetSampleCount()),
  368. Timestamp: timestamp,
  369. }
  370. samples = append(samples, count)
  371. if !infSeen {
  372. // Append an infinity bucket sample.
  373. lset := make(model.LabelSet, len(m.Label)+2)
  374. for _, p := range m.Label {
  375. lset[model.LabelName(p.GetName())] = model.LabelValue(p.GetValue())
  376. }
  377. lset[model.LabelName(model.BucketLabel)] = model.LabelValue("+Inf")
  378. lset[model.MetricNameLabel] = model.LabelValue(f.GetName() + "_bucket")
  379. samples = append(samples, &model.Sample{
  380. Metric: model.Metric(lset),
  381. Value: count.Value,
  382. Timestamp: timestamp,
  383. })
  384. }
  385. }
  386. return samples
  387. }