소스 검색

pull scrape interval from prometheus

Ajay Tripathy 5 년 전
부모
커밋
d2722406e0
  4개의 변경된 파일, 69줄 추가, 14줄 삭제
  1. 1 0
      go.mod
  2. 17 13
      pkg/costmodel/costmodel.go
  3. 22 0
      pkg/costmodel/promparsers.go
  4. 29 1
      pkg/costmodel/router.go

+ 1 - 0
go.mod

@@ -27,6 +27,7 @@ require (
 	golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45
 	golang.org/x/sync v0.0.0-20190423024810-112230192c58
 	google.golang.org/api v0.4.0
+	gopkg.in/yaml.v2 v2.2.4
 	k8s.io/api v0.0.0-20190913080256-21721929cffa
 	k8s.io/apimachinery v0.0.0-20190913075812-e119e5e154b6
 	k8s.io/client-go v0.0.0-20190620085101-78d2af792bab

+ 17 - 13
pkg/costmodel/costmodel.go

@@ -47,19 +47,21 @@ const (
 var isCron = regexp.MustCompile(`^(.+)-\d{10}$`)
 
 type CostModel struct {
-	Cache        clustercache.ClusterCache
-	ClusterMap   clusters.ClusterMap
-	RequestGroup *singleflight.Group
+	Cache          clustercache.ClusterCache
+	ClusterMap     clusters.ClusterMap
+	ScrapeInterval time.Duration
+	RequestGroup   *singleflight.Group
 }
 
-func NewCostModel(cache clustercache.ClusterCache, clusterMap clusters.ClusterMap) *CostModel {
+func NewCostModel(cache clustercache.ClusterCache, clusterMap clusters.ClusterMap, scrapeInterval time.Duration) *CostModel {
 	// request grouping to prevent over-requesting the same data prior to caching
 	requestGroup := new(singleflight.Group)
 
 	return &CostModel{
-		Cache:        cache,
-		ClusterMap:   clusterMap,
-		RequestGroup: requestGroup,
+		Cache:          cache,
+		ClusterMap:     clusterMap,
+		RequestGroup:   requestGroup,
+		ScrapeInterval: scrapeInterval,
 	}
 }
 
@@ -189,7 +191,7 @@ const (
 		label_replace(label_replace(
 			sum(
 				sum_over_time(container_memory_allocation_bytes{container!="",container!="POD", node!=""}[%s])
-			) by (namespace,container,pod,node,cluster_id) * (scalar(avg(prometheus_target_interval_length_seconds)) / 60 / 60)
+			) by (namespace,container,pod,node,cluster_id) * %f / 60 / 60
 		, "container_name","$1","container","(.+)"), "pod_name","$1","pod","(.+)")`
 	// queryCPUAllocationVCPUHours yields the total VCPU-hour CPU allocation over the given
 	// window, aggregated by container.
@@ -201,11 +203,11 @@ const (
 		label_replace(label_replace(
 			sum(
 				sum_over_time(container_cpu_allocation{container!="",container!="POD", node!=""}[%s])
-			) by (namespace,container,pod,node,cluster_id) * (scalar(avg(prometheus_target_interval_length_seconds)) / 60 / 60)
+			) by (namespace,container,pod,node,cluster_id) * %f / 60 / 60
 		, "container_name","$1","container","(.+)"), "pod_name","$1","pod","(.+)")`
 	// queryPVCAllocationFmt yields the total byte-hour PVC allocation over the given window.
 	// sum_over_time(each byte) = [byte*scrape] by metric * (scrape interval seconds) / 60 / 60 = [byte-hours] by pod
-	queryPVCAllocationFmt     = `sum(sum_over_time(pod_pvc_allocation[%s])) by (cluster_id, namespace, pod, persistentvolume, persistentvolumeclaim) * scalar(avg(prometheus_target_interval_length_seconds)/60/60)`
+	queryPVCAllocationFmt     = `sum(sum_over_time(pod_pvc_allocation[%s])) by (cluster_id, namespace, pod, persistentvolume, persistentvolumeclaim) * %f/60/60`
 	queryPVHourlyCostFmt      = `avg_over_time(pv_hourly_cost[%s])`
 	queryNSLabels             = `avg_over_time(kube_namespace_labels[%s])`
 	queryPodLabels            = `avg_over_time(kube_pod_labels[%s])`
@@ -1497,17 +1499,19 @@ func (cm *CostModel) costDataRange(cli prometheusClient.Client, clientset kubern
 		return CostDataRangeFromSQL("", "", windowString, remoteStartStr, remoteEndStr)
 	}
 
+	scrapeIntervalSeconds := cm.ScrapeInterval.Seconds()
+
 	ctx := prom.NewContext(cli)
 
-	queryRAMAlloc := fmt.Sprintf(queryRAMAllocationByteHours, windowString)
-	queryCPUAlloc := fmt.Sprintf(queryCPUAllocationVCPUHours, windowString)
+	queryRAMAlloc := fmt.Sprintf(queryRAMAllocationByteHours, windowString, scrapeIntervalSeconds)
+	queryCPUAlloc := fmt.Sprintf(queryCPUAllocationVCPUHours, windowString, scrapeIntervalSeconds)
 	queryRAMRequests := fmt.Sprintf(queryRAMRequestsStr, windowString, "", windowString, "")
 	queryRAMUsage := fmt.Sprintf(queryRAMUsageStr, windowString, "", windowString, "")
 	queryCPURequests := fmt.Sprintf(queryCPURequestsStr, windowString, "", windowString, "")
 	queryCPUUsage := fmt.Sprintf(queryCPUUsageStr, windowString, "")
 	queryGPURequests := fmt.Sprintf(queryGPURequestsStr, windowString, "", windowString, "", resolutionHours, windowString, "")
 	queryPVRequests := fmt.Sprintf(queryPVRequestsStr)
-	queryPVCAllocation := fmt.Sprintf(queryPVCAllocationFmt, windowString)
+	queryPVCAllocation := fmt.Sprintf(queryPVCAllocationFmt, windowString, scrapeIntervalSeconds)
 	queryPVHourlyCost := fmt.Sprintf(queryPVHourlyCostFmt, windowString)
 	queryNetZoneRequests := fmt.Sprintf(queryZoneNetworkUsage, windowString, "")
 	queryNetRegionRequests := fmt.Sprintf(queryRegionNetworkUsage, windowString, "")

+ 22 - 0
pkg/costmodel/promparsers.go

@@ -8,8 +8,30 @@ import (
 	"github.com/kubecost/cost-model/pkg/log"
 	"github.com/kubecost/cost-model/pkg/prom"
 	"github.com/kubecost/cost-model/pkg/util"
+	"gopkg.in/yaml.v2"
 )
 
+const DEFAULT_KUBECOST_JOB_NAME = "kubecost"
+
+type ScrapeConfig struct {
+	JobName        string `yaml:"job_name,omitempty"`
+	ScrapeInterval string `yaml:"scrape_interval,omitempty"`
+}
+
+type PromCfg struct {
+	ScrapeConfigs []ScrapeConfig `yaml:"scrape_configs,omitempty"`
+}
+
+func GetPrometheusConfig(pcfg string) (PromCfg, error) {
+	var promCfg PromCfg
+	err := yaml.Unmarshal([]byte(pcfg), &promCfg)
+	return promCfg, err
+}
+
+func GetKubecostJobName() string {
+	return DEFAULT_KUBECOST_JOB_NAME // TODO: look this up from a prometheus variable?
+}
+
 // TODO niko/prom move parsing functions from costmodel.go
 
 func GetPVInfo(qrs []*prom.QueryResult, defaultClusterID string) (map[string]*PersistentVolumeClaimData, error) {

+ 29 - 1
pkg/costmodel/router.go

@@ -744,8 +744,36 @@ func Initialize(additionalConfigWatchers ...ConfigWatchers) {
 
 	timeout := 120 * time.Second
 	keepAlive := 120 * time.Second
+	scrapeInterval, _ := time.ParseDuration("1m")
 
 	promCli, _ := prom.NewPrometheusClient(address, timeout, keepAlive, queryConcurrency, "")
+
+	api := prometheusAPI.NewAPI(promCli)
+	pcfg, err := api.Config(context.Background())
+	if err != nil {
+		klog.Infof("No valid prometheus config file at %s. Error: %s . Troubleshooting help available at: %s. Ignore if using cortex/thanos here.", address, err.Error(), prometheusTroubleshootingEp)
+	} else {
+		klog.V(1).Info("Retrieved a prometheus config file from: " + address)
+		sc, err := GetPrometheusConfig(pcfg.YAML)
+		if err != nil {
+			klog.Infof("Fix YAML error %s", err)
+		}
+		for _, scrapeconfig := range sc.ScrapeConfigs {
+			if scrapeconfig.JobName == GetKubecostJobName() {
+				if scrapeconfig.ScrapeInterval != "" {
+					si := scrapeconfig.ScrapeInterval
+					sid, err := time.ParseDuration(si)
+					if err != nil {
+						klog.Infof("error parseing scrapeConfig for %s", scrapeconfig.JobName)
+					} else {
+						klog.Infof("Found Kubecost job scrape interval of: %s", si)
+						scrapeInterval = sid
+					}
+				}
+			}
+		}
+	}
+
 	m, err := prom.Validate(promCli)
 	if err != nil || m.Running == false {
 		if err != nil {
@@ -1001,7 +1029,7 @@ func Initialize(additionalConfigWatchers ...ConfigWatchers) {
 		PersistentVolumePriceRecorder: pvGv,
 		ClusterManagementCostRecorder: ClusterManagementCostRecorder,
 		LBCostRecorder:                LBCostRecorder,
-		Model:                         NewCostModel(k8sCache, clusterMap),
+		Model:                         NewCostModel(k8sCache, clusterMap, scrapeInterval),
 		OutOfClusterCache:             outOfClusterCache,
 	}