|
|
@@ -9,7 +9,6 @@ import (
|
|
|
|
|
|
"github.com/Masterminds/semver/v3"
|
|
|
"github.com/julienschmidt/httprouter"
|
|
|
- "github.com/opencost/opencost/modules/prometheus-source/pkg/env"
|
|
|
|
|
|
"github.com/opencost/opencost/core/pkg/clusters"
|
|
|
"github.com/opencost/opencost/core/pkg/diagnostics"
|
|
|
@@ -82,10 +81,6 @@ type PrometheusDataSource struct {
|
|
|
promClient prometheus.Client
|
|
|
promContexts *ContextFactory
|
|
|
|
|
|
- thanosConfig *OpenCostThanosConfig
|
|
|
- thanosClient prometheus.Client
|
|
|
- thanosContexts *ContextFactory
|
|
|
-
|
|
|
metricsQuerier *PrometheusMetricsQuerier
|
|
|
clusterMap clusters.ClusterMap
|
|
|
clusterInfo clusters.ClusterInfoProvider
|
|
|
@@ -100,20 +95,11 @@ func NewDefaultPrometheusDataSource(clusterInfoProvider clusters.ClusterInfoProv
|
|
|
return nil, fmt.Errorf("failed to create prometheus config from env: %w", err)
|
|
|
}
|
|
|
|
|
|
- var thanosConfig *OpenCostThanosConfig
|
|
|
- if env.IsThanosEnabled() {
|
|
|
- // thanos initialization is not fatal, so we log the error and continue
|
|
|
- thanosConfig, err = NewOpenCostThanosConfigFromEnv()
|
|
|
- if err != nil {
|
|
|
- log.Warnf("Thanos was enabled, but failed to create thanos config from env: %s. Continuing...", err.Error())
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- return NewPrometheusDataSource(clusterInfoProvider, config, thanosConfig)
|
|
|
+ return NewPrometheusDataSource(clusterInfoProvider, config)
|
|
|
}
|
|
|
|
|
|
// NewPrometheusDataSource initializes the Prometheus client and returns a new PrometheusDataSource.
|
|
|
-func NewPrometheusDataSource(infoProvider clusters.ClusterInfoProvider, promConfig *OpenCostPrometheusConfig, thanosConfig *OpenCostThanosConfig) (*PrometheusDataSource, error) {
|
|
|
+func NewPrometheusDataSource(infoProvider clusters.ClusterInfoProvider, promConfig *OpenCostPrometheusConfig) (*PrometheusDataSource, error) {
|
|
|
promClient, err := NewPrometheusClient(promConfig.ServerEndpoint, promConfig.ClientConfig)
|
|
|
if err != nil {
|
|
|
return nil, fmt.Errorf("failed to build prometheus client: %w", err)
|
|
|
@@ -167,68 +153,26 @@ func NewPrometheusDataSource(infoProvider clusters.ClusterInfoProvider, promConf
|
|
|
|
|
|
promContexts := NewContextFactory(promClient, promConfig)
|
|
|
|
|
|
- var thanosClient prometheus.Client
|
|
|
- var thanosContexts *ContextFactory
|
|
|
-
|
|
|
- // if the thanos configuration is non-nil, we assume intent to use thanos. However, failure to
|
|
|
- // initialize the thanos client is not fatal, and we will log the error and continue.
|
|
|
- if thanosConfig != nil {
|
|
|
- thanosHost := thanosConfig.ServerEndpoint
|
|
|
- if thanosHost != "" {
|
|
|
- thanosCli, _ := NewThanosClient(thanosHost, thanosConfig)
|
|
|
-
|
|
|
- _, err = Validate(thanosCli, thanosConfig.OpenCostPrometheusConfig)
|
|
|
- if err != nil {
|
|
|
- log.Warnf("Failed to query Thanos at %s. Error: %s.", thanosHost, err.Error())
|
|
|
- thanosClient = thanosCli
|
|
|
- } else {
|
|
|
- log.Infof("Success: retrieved the 'up' query against Thanos at: %s", thanosHost)
|
|
|
-
|
|
|
- thanosClient = thanosCli
|
|
|
- }
|
|
|
-
|
|
|
- thanosContexts = NewContextFactory(thanosClient, thanosConfig.OpenCostPrometheusConfig)
|
|
|
- } else {
|
|
|
- log.Infof("Error resolving environment variable: $%s", env.ThanosQueryUrlEnvVar)
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
// metadata creation for cluster info
|
|
|
- thanosEnabled := thanosClient != nil
|
|
|
metadata := map[string]string{
|
|
|
- clusters.ClusterInfoThanosEnabledKey: fmt.Sprintf("%t", thanosEnabled),
|
|
|
- }
|
|
|
- if thanosEnabled {
|
|
|
- metadata[clusters.ClusterInfoThanosOffsetKey] = thanosConfig.Offset
|
|
|
+ clusters.ClusterInfoThanosEnabledKey: "false",
|
|
|
}
|
|
|
|
|
|
// cluster info provider
|
|
|
clusterInfoProvider := clusters.NewClusterInfoDecorator(infoProvider, metadata)
|
|
|
-
|
|
|
- var clusterMap clusters.ClusterMap
|
|
|
- if thanosEnabled {
|
|
|
- clusterMap = newPrometheusClusterMap(thanosContexts, clusterInfoProvider, 10*time.Minute)
|
|
|
- } else {
|
|
|
- clusterMap = newPrometheusClusterMap(promContexts, clusterInfoProvider, 5*time.Minute)
|
|
|
- }
|
|
|
+ clusterMap := newPrometheusClusterMap(promContexts, clusterInfoProvider, 5*time.Minute)
|
|
|
|
|
|
// create metrics querier implementation for prometheus
|
|
|
metricsQuerier := newPrometheusMetricsQuerier(
|
|
|
promConfig,
|
|
|
promClient,
|
|
|
promContexts,
|
|
|
- thanosConfig,
|
|
|
- thanosClient,
|
|
|
- thanosContexts,
|
|
|
)
|
|
|
|
|
|
return &PrometheusDataSource{
|
|
|
promConfig: promConfig,
|
|
|
promClient: promClient,
|
|
|
promContexts: promContexts,
|
|
|
- thanosConfig: thanosConfig,
|
|
|
- thanosClient: thanosClient,
|
|
|
- thanosContexts: thanosContexts,
|
|
|
metricsQuerier: metricsQuerier,
|
|
|
clusterMap: clusterMap,
|
|
|
clusterInfo: clusterInfoProvider,
|
|
|
@@ -388,82 +332,6 @@ func (pds *PrometheusDataSource) prometheusQueryRange(w http.ResponseWriter, r *
|
|
|
w.Write(body)
|
|
|
}
|
|
|
|
|
|
-// thanosQuery is a proxy for /query against thanos
|
|
|
-func (pds *PrometheusDataSource) thanosQuery(w http.ResponseWriter, r *http.Request, _ httprouter.Params) {
|
|
|
- w.Header().Set("Content-Type", "application/json")
|
|
|
- w.Header().Set("Access-Control-Allow-Origin", "*")
|
|
|
-
|
|
|
- if pds.thanosClient == nil {
|
|
|
- proto.WriteResponse(w, proto.ToResponse(nil, fmt.Errorf("ThanosDisabled")))
|
|
|
- return
|
|
|
- }
|
|
|
-
|
|
|
- qp := httputil.NewQueryParams(r.URL.Query())
|
|
|
- query := qp.Get("query", "")
|
|
|
- if query == "" {
|
|
|
- proto.WriteResponse(w, proto.ToResponse(nil, fmt.Errorf("Query Parameter 'query' is unset'")))
|
|
|
- return
|
|
|
- }
|
|
|
-
|
|
|
- // Attempt to parse time as either a unix timestamp or as an RFC3339 value
|
|
|
- var timeVal time.Time
|
|
|
- timeStr := qp.Get("time", "")
|
|
|
- if len(timeStr) > 0 {
|
|
|
- if t, err := strconv.ParseInt(timeStr, 10, 64); err == nil {
|
|
|
- timeVal = time.Unix(t, 0)
|
|
|
- } else if t, err := time.Parse(time.RFC3339, timeStr); err == nil {
|
|
|
- timeVal = t
|
|
|
- }
|
|
|
-
|
|
|
- // If time is given, but not parse-able, return an error
|
|
|
- if timeVal.IsZero() {
|
|
|
- http.Error(w, fmt.Sprintf("time must be a unix timestamp or RFC3339 value; illegal value given: %s", timeStr), http.StatusBadRequest)
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- ctx := pds.thanosContexts.NewNamedContext(FrontendContextName)
|
|
|
- body, err := ctx.RawQuery(query, timeVal)
|
|
|
- if err != nil {
|
|
|
- proto.WriteResponse(w, proto.ToResponse(nil, fmt.Errorf("Error running query %s. Error: %s", query, err)))
|
|
|
- return
|
|
|
- }
|
|
|
-
|
|
|
- w.Write(body)
|
|
|
-}
|
|
|
-
|
|
|
-// thanosQueryRange is a proxy for /query_range against thanos
|
|
|
-func (pds *PrometheusDataSource) thanosQueryRange(w http.ResponseWriter, r *http.Request, _ httprouter.Params) {
|
|
|
- w.Header().Set("Content-Type", "application/json")
|
|
|
- w.Header().Set("Access-Control-Allow-Origin", "*")
|
|
|
-
|
|
|
- if pds.thanosClient == nil {
|
|
|
- proto.WriteResponse(w, proto.ToResponse(nil, fmt.Errorf("ThanosDisabled")))
|
|
|
- return
|
|
|
- }
|
|
|
-
|
|
|
- qp := httputil.NewQueryParams(r.URL.Query())
|
|
|
- query := qp.Get("query", "")
|
|
|
- if query == "" {
|
|
|
- fmt.Fprintf(w, "Error parsing query from request parameters.")
|
|
|
- return
|
|
|
- }
|
|
|
-
|
|
|
- start, end, duration, err := toStartEndStep(qp)
|
|
|
- if err != nil {
|
|
|
- fmt.Fprintf(w, "error: %s", err)
|
|
|
- return
|
|
|
- }
|
|
|
-
|
|
|
- ctx := pds.thanosContexts.NewNamedContext(FrontendContextName)
|
|
|
- body, err := ctx.RawQueryRange(query, start, end, duration)
|
|
|
- if err != nil {
|
|
|
- fmt.Fprintf(w, "Error running query %s. Error: %s", query, err)
|
|
|
- return
|
|
|
- }
|
|
|
-
|
|
|
- w.Write(body)
|
|
|
-}
|
|
|
-
|
|
|
// prometheusQueueState returns the current state of the prometheus request queue
|
|
|
func (pds *PrometheusDataSource) prometheusQueueState(w http.ResponseWriter, _ *http.Request, _ httprouter.Params) {
|
|
|
w.Header().Set("Content-Type", "application/json")
|
|
|
@@ -479,15 +347,6 @@ func (pds *PrometheusDataSource) prometheusQueueState(w http.ResponseWriter, _ *
|
|
|
"prometheus": promQueueState,
|
|
|
}
|
|
|
|
|
|
- if pds.thanosClient != nil {
|
|
|
- thanosQueueState, err := GetPrometheusQueueState(pds.thanosClient, pds.thanosConfig.OpenCostPrometheusConfig)
|
|
|
- if err != nil {
|
|
|
- log.Warnf("Error getting Thanos queue state: %s", err)
|
|
|
- } else {
|
|
|
- result["thanos"] = thanosQueueState
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
proto.WriteResponse(w, proto.ToResponse(result, nil))
|
|
|
}
|
|
|
|
|
|
@@ -502,11 +361,6 @@ func (pds *PrometheusDataSource) prometheusMetrics(w http.ResponseWriter, _ *htt
|
|
|
"prometheus": promMetrics,
|
|
|
}
|
|
|
|
|
|
- if pds.thanosClient != nil {
|
|
|
- thanosMetrics := GetPrometheusMetrics(pds.thanosClient, pds.thanosConfig.OpenCostPrometheusConfig, pds.thanosConfig.Offset)
|
|
|
- result["thanos"] = thanosMetrics
|
|
|
- }
|
|
|
-
|
|
|
proto.WriteResponse(w, proto.ToResponse(result, nil))
|
|
|
}
|
|
|
|
|
|
@@ -522,18 +376,6 @@ func (pds *PrometheusDataSource) PrometheusContexts() *ContextFactory {
|
|
|
return pds.promContexts
|
|
|
}
|
|
|
|
|
|
-func (pds *PrometheusDataSource) ThanosClient() prometheus.Client {
|
|
|
- return pds.thanosClient
|
|
|
-}
|
|
|
-
|
|
|
-func (pds *PrometheusDataSource) ThanosConfig() *OpenCostThanosConfig {
|
|
|
- return pds.thanosConfig
|
|
|
-}
|
|
|
-
|
|
|
-func (pds *PrometheusDataSource) ThanosContexts() *ContextFactory {
|
|
|
- return pds.thanosContexts
|
|
|
-}
|
|
|
-
|
|
|
func (pds *PrometheusDataSource) RegisterEndPoints(router *httprouter.Router) {
|
|
|
// endpoints migrated from server
|
|
|
router.GET("/validatePrometheus", pds.prometheusMetadata)
|
|
|
@@ -545,8 +387,6 @@ func (pds *PrometheusDataSource) RegisterEndPoints(router *httprouter.Router) {
|
|
|
// prom query proxies
|
|
|
router.GET("/prometheusQuery", pds.prometheusQuery)
|
|
|
router.GET("/prometheusQueryRange", pds.prometheusQueryRange)
|
|
|
- router.GET("/thanosQuery", pds.thanosQuery)
|
|
|
- router.GET("/thanosQueryRange", pds.thanosQueryRange)
|
|
|
|
|
|
// diagnostics
|
|
|
router.GET("/diagnostics/requestQueue", pds.prometheusQueueState)
|