thanos.go 2.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109
  1. package thanos
  2. import (
  3. "crypto/tls"
  4. "fmt"
  5. "net"
  6. "net/http"
  7. "net/url"
  8. "strings"
  9. "sync"
  10. "time"
  11. "github.com/opencost/opencost/pkg/env"
  12. "github.com/opencost/opencost/pkg/prom"
  13. prometheus "github.com/prometheus/client_golang/api"
  14. )
  // MaxSourceResolution is the query parameter key used to designate the
  // resolution to use when executing a query.
  const MaxSourceResolution = "max_source_resolution"

  // MaxSourceResulution is the original, misspelled name of MaxSourceResolution.
  // It carries the same value and is retained so existing callers keep compiling.
  //
  // Deprecated: use MaxSourceResolution instead.
  const MaxSourceResulution = MaxSourceResolution
  18. var (
  19. lock = new(sync.Mutex)
  20. enabled = env.IsThanosEnabled()
  21. queryUrl = env.GetThanosQueryUrl()
  22. offset = env.GetThanosOffset()
  23. maxSourceRes = env.GetThanosMaxSourceResolution()
  24. offsetDuration *time.Duration
  25. queryOffset = fmt.Sprintf(" offset %s", offset)
  26. )
  27. // IsEnabled returns true if Thanos is enabled.
  28. func IsEnabled() bool {
  29. return enabled
  30. }
  31. // QueryURL returns true if Thanos is enabled.
  32. func QueryURL() string {
  33. return queryUrl
  34. }
  35. // Offset returns the duration string for the query offset that should be applied to thanos
  36. func Offset() string {
  37. return offset
  38. }
  39. // OffsetDuration returns the Offset as a parsed duration
  40. func OffsetDuration() time.Duration {
  41. lock.Lock()
  42. defer lock.Unlock()
  43. if offsetDuration == nil {
  44. d, err := time.ParseDuration(offset)
  45. if err != nil {
  46. d = 0
  47. }
  48. offsetDuration = &d
  49. }
  50. return *offsetDuration
  51. }
  52. // QueryOffset returns a string in the format: " offset %s" substituting in the Offset() string.
  53. func QueryOffset() string {
  54. return queryOffset
  55. }
  56. func NewThanosClient(address string, config *prom.PrometheusClientConfig) (prometheus.Client, error) {
  57. tc := prometheus.Config{
  58. Address: address,
  59. RoundTripper: &http.Transport{
  60. Proxy: http.ProxyFromEnvironment,
  61. DialContext: (&net.Dialer{
  62. Timeout: config.Timeout,
  63. KeepAlive: config.KeepAlive,
  64. }).DialContext,
  65. TLSHandshakeTimeout: config.TLSHandshakeTimeout,
  66. TLSClientConfig: &tls.Config{
  67. InsecureSkipVerify: config.TLSInsecureSkipVerify,
  68. },
  69. },
  70. }
  71. client, err := prometheus.NewClient(tc)
  72. if err != nil {
  73. return nil, err
  74. }
  75. // max source resolution decorator
  76. maxSourceDecorator := func(path string, queryParams url.Values) url.Values {
  77. if strings.Contains(path, "query") {
  78. queryParams.Set(MaxSourceResulution, maxSourceRes)
  79. }
  80. return queryParams
  81. }
  82. return prom.NewRateLimitedClient(
  83. prom.ThanosClientID,
  84. client,
  85. config.QueryConcurrency,
  86. config.Auth,
  87. maxSourceDecorator,
  88. config.RateLimitRetryOpts,
  89. config.QueryLogFile,
  90. "",
  91. )
  92. }