config.go 6.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177
  1. package prom
  2. import (
  3. "crypto/x509"
  4. "fmt"
  5. "time"
  6. coreenv "github.com/opencost/opencost/core/pkg/env"
  7. "github.com/opencost/opencost/core/pkg/log"
  8. "github.com/opencost/opencost/core/pkg/source"
  9. "github.com/opencost/opencost/modules/prometheus-source/pkg/env"
  10. restclient "k8s.io/client-go/rest"
  11. certutil "k8s.io/client-go/util/cert"
  12. )
  // ServiceCA is the path of the service CA certificate file. This file loads it
  // into the TLS root pool when kube-rbac-proxy support is enabled.
  const ServiceCA = "/var/run/secrets/kubernetes.io/serviceaccount/service-ca.crt"
  16. func NewPrometheusLabelMappingFromEnv() source.FieldMapper {
  17. check := func(err error) {
  18. if err != nil {
  19. panic(fmt.Sprintf("Failed to create PrometheusLabelMapping from environment: %s", err))
  20. }
  21. }
  22. rfm := source.NewReverseFieldMapper()
  23. check(rfm.Set(source.ClusterIDLabel, env.GetPromClusterLabel()))
  24. check(rfm.Set(source.NamespaceLabel, env.GetPromNamespaceLabel()...))
  25. check(rfm.Set(source.NodeLabel, env.GetPromNodeLabel()...))
  26. check(rfm.Set(source.InstanceLabel, env.GetPromInstanceLabel()...))
  27. check(rfm.Set(source.InstanceTypeLabel, env.GetPromInstanceTypeLabel()...))
  28. check(rfm.Set(source.ContainerLabel, env.GetPromContainerLabel()...))
  29. check(rfm.Set(source.PodLabel, env.GetPromPodLabel()...))
  30. check(rfm.Set(source.ProviderIDLabel, env.GetPromProviderIDLabel()...))
  31. check(rfm.Set(source.DeviceLabel, env.GetPromDeviceLabel()...))
  32. check(rfm.Set(source.PVCLabel, env.GetPromPVCLabel()...))
  33. check(rfm.Set(source.PVLabel, env.GetPromPVLabel()...))
  34. check(rfm.Set(source.StorageClassLabel, env.GetPromStorageClassLabel()...))
  35. check(rfm.Set(source.VolumeNameLabel, env.GetPromVolumeNameLabel()...))
  36. check(rfm.Set(source.ServiceLabel, env.GetPromServiceLabel()...))
  37. check(rfm.Set(source.IngressIPLabel, env.GetPromIngressIPLabel()...))
  38. check(rfm.Set(source.ProvisionerNameLabel, env.GetPromProvisionerNameLabel()...))
  39. check(rfm.Set(source.UIDLabel, env.GetPromUIDLabel()...))
  40. check(rfm.Set(source.KubernetesNodeLabel, env.GetPromKubernetesNodeLabel()...))
  41. check(rfm.Set(source.ModeLabel, env.GetPromModeLabel()...))
  42. check(rfm.Set(source.ModelNameLabel, env.GetPromModelNameLabel()...))
  43. check(rfm.Set(source.UUIDLabel, env.GetPromUUIDLabel()...))
  44. check(rfm.Set(source.ResourceLabel, env.GetPromResourceLabel()...))
  45. check(rfm.Set(source.DeploymentLabel, env.GetPromDeploymentLabel()...))
  46. check(rfm.Set(source.StatefulSetLabel, env.GetPromStatefulSetLabel()...))
  47. check(rfm.Set(source.ReplicaSetLabel, env.GetPromReplicaSetLabel()...))
  48. check(rfm.Set(source.OwnerNameLabel, env.GetPromOwnerNameLabel()...))
  49. check(rfm.Set(source.OwnerKindLabel, env.GetPromOwnerKindLabel()...))
  50. check(rfm.Set(source.UnitLabel, env.GetPromUnitLabel()...))
  51. check(rfm.Set(source.InternetLabel, env.GetPromInternetLabel()...))
  52. check(rfm.Set(source.SameZoneLabel, env.GetPromSameZoneLabel()...))
  53. check(rfm.Set(source.SameRegionLabel, env.GetPromSameRegionLabel()...))
  54. return rfm
  55. }
  56. type OpenCostPrometheusConfig struct {
  57. ServerEndpoint string
  58. Version string
  59. IsOffsetResolution bool
  60. ClientConfig *PrometheusClientConfig
  61. ScrapeInterval time.Duration
  62. JobName string
  63. Offset string
  64. QueryOffset time.Duration
  65. MaxQueryDuration time.Duration
  66. ClusterLabel string
  67. ClusterID string
  68. ClusterFilter string
  69. DataResolution time.Duration
  70. DataResolutionMinutes int
  71. LabelMapping source.FieldMapper
  72. }
  73. func (ocpc *OpenCostPrometheusConfig) IsRateLimitRetryEnabled() bool {
  74. return ocpc.ClientConfig.RateLimitRetryOpts != nil
  75. }
  76. // NewOpenCostPrometheusConfigFromEnv creates a new OpenCostPrometheusConfig from environment variables.
  77. func NewOpenCostPrometheusConfigFromEnv() (*OpenCostPrometheusConfig, error) {
  78. serverEndpoint := env.GetPrometheusServerEndpoint()
  79. if serverEndpoint == "" {
  80. return nil, fmt.Errorf("no address for prometheus set in $%s", env.PrometheusServerEndpointEnvVar)
  81. }
  82. queryConcurrency := env.GetMaxQueryConcurrency()
  83. log.Debugf("[Prometheus]: Client Max Concurrency set to: %d", queryConcurrency)
  84. timeout := env.GetPrometheusQueryTimeout()
  85. keepAlive := env.GetPrometheusKeepAlive()
  86. tlsHandshakeTimeout := env.GetPrometheusTLSHandshakeTimeout()
  87. jobName := env.GetJobName()
  88. scrapeInterval := env.GetScrapeInterval()
  89. maxQueryDuration := env.GetPrometheusMaxQueryDuration()
  90. clusterId := coreenv.GetClusterID()
  91. clusterLabel := env.GetPromClusterLabel()
  92. clusterFilter := env.GetPromClusterFilter()
  93. var rateLimitRetryOpts *RateLimitRetryOpts = nil
  94. if env.IsPrometheusRetryOnRateLimitResponse() {
  95. rateLimitRetryOpts = &RateLimitRetryOpts{
  96. MaxRetries: env.GetPrometheusRetryOnRateLimitMaxRetries(),
  97. DefaultRetryWait: env.GetPrometheusRetryOnRateLimitDefaultWait(),
  98. }
  99. }
  100. auth := &ClientAuth{
  101. Username: env.GetDBBasicAuthUsername(),
  102. Password: env.GetDBBasicAuthUserPassword(),
  103. BearerToken: env.GetDBBearerToken(),
  104. }
  105. // We will use the service account token and service-ca.crt to authenticate with the Prometheus server via kube-rbac-proxy.
  106. // We need to ensure that the service account has the necessary permissions to access the Prometheus server by binding it to the appropriate role.
  107. var tlsCaCert *x509.CertPool
  108. if env.IsKubeRbacProxyEnabled() {
  109. restConfig, err := restclient.InClusterConfig()
  110. if err != nil {
  111. log.Errorf("%s was set to true but failed to get in-cluster config: %s", env.KubeRbacProxyEnabledEnvVar, err)
  112. }
  113. auth.BearerToken = restConfig.BearerToken
  114. tlsCaCert, err = certutil.NewPool(ServiceCA)
  115. if err != nil {
  116. log.Errorf("%s was set to true but failed to load service-ca.crt: %s", env.KubeRbacProxyEnabledEnvVar, err)
  117. }
  118. }
  119. dataResolution := env.GetPrometheusQueryResolution()
  120. // Ensuring if data resolution is less than 60s default it to 1m
  121. resolutionMinutes := int(dataResolution.Minutes())
  122. if resolutionMinutes == 0 {
  123. resolutionMinutes = 1
  124. }
  125. labelMapping := NewPrometheusLabelMappingFromEnv()
  126. clientConfig := &PrometheusClientConfig{
  127. Timeout: timeout,
  128. KeepAlive: keepAlive,
  129. TLSHandshakeTimeout: tlsHandshakeTimeout,
  130. TLSInsecureSkipVerify: env.IsInsecureSkipVerify(),
  131. RootCAs: tlsCaCert,
  132. RateLimitRetryOpts: rateLimitRetryOpts,
  133. Auth: auth,
  134. QueryConcurrency: queryConcurrency,
  135. QueryLogFile: "",
  136. HeaderXScopeOrgId: env.GetPrometheusHeaderXScopeOrgId(),
  137. }
  138. return &OpenCostPrometheusConfig{
  139. ServerEndpoint: serverEndpoint,
  140. Version: "0.0.0",
  141. IsOffsetResolution: false,
  142. ClientConfig: clientConfig,
  143. ScrapeInterval: scrapeInterval,
  144. JobName: jobName,
  145. Offset: "",
  146. QueryOffset: time.Duration(0),
  147. MaxQueryDuration: maxQueryDuration,
  148. ClusterLabel: clusterLabel,
  149. ClusterID: clusterId,
  150. ClusterFilter: clusterFilter,
  151. DataResolution: dataResolution,
  152. DataResolutionMinutes: resolutionMinutes,
  153. LabelMapping: labelMapping,
  154. }, nil
  155. }