Explorar el Código

Sth/kcm 4144 (#3175)

Signed-off-by: Sean Holcomb <seanholcomb@gmail.com>
Sean Holcomb hace 11 meses
padre
commit
6af6acce72

+ 1 - 1
modules/collector-source/pkg/collector/collector.go

@@ -170,7 +170,7 @@ func NewPVCInfoMetricCollector() *metric.MetricCollector {
 			source.PVCLabel,
 			source.StorageClassLabel,
 		},
-		aggregator.Info,
+		aggregator.ActiveMinutes,
 		func(labels map[string]string) bool {
 			return labels[source.VolumeNameLabel] != ""
 		},

+ 15 - 1
modules/collector-source/pkg/metric/repository.go

@@ -134,15 +134,29 @@ func (r *resolutionStores) update(
 		)
 		return
 	}
-	key := r.resolution.Get(updateSet.Timestamp).UnixMilli()
+
+	resolutionStart := r.resolution.Get(updateSet.Timestamp)
+	key := resolutionStart.UnixMilli()
+
 	collector, ok := r.collectors[key]
 	if !ok {
 		collector = r.factory()
 		r.collectors[key] = collector
 	}
+
 	for _, update := range updateSet.Updates {
 		collector.Update(update.Name, update.Labels, update.Value, updateSet.Timestamp, update.AdditionalInfo)
 	}
+
+	// When the timestamp falls exactly on a resolution boundary, also apply the update to the previous window's collector, because some aggregators treat the window end as inclusive.
+	if resolutionStart.Equal(updateSet.Timestamp) {
+		prevKey := r.resolution.Get(updateSet.Timestamp.Add(-1)).UnixMilli()
+		if prevCollector, ok := r.collectors[prevKey]; ok {
+			for _, update := range updateSet.Updates {
+				prevCollector.Update(update.Name, update.Labels, update.Value, updateSet.Timestamp, update.AdditionalInfo)
+			}
+		}
+	}
 }
 
 func (r *resolutionStores) getCollector(t time.Time) (MetricStore, error) {

+ 35 - 21
modules/collector-source/pkg/scrape/statsummary.go

@@ -71,29 +71,21 @@ func (s *StatSummaryScraper) Scrape() []metric.Update {
 			podUID := pod.PodRef.UID
 
 			if pod.Network != nil {
-				if pod.Network.RxBytes != nil {
-					scrapeResults = append(scrapeResults, metric.Update{
-						Name: ContainerNetworkReceiveBytesTotal,
-						Labels: map[string]string{
-							source.UIDLabel:       podUID,
-							source.PodLabel:       podName,
-							source.NamespaceLabel: namespace,
-						},
-						Value: float64(*pod.Network.RxBytes),
-					})
+				networkLabels := map[string]string{
+					source.UIDLabel:       podUID,
+					source.PodLabel:       podName,
+					source.NamespaceLabel: namespace,
 				}
-
-				if pod.Network.TxBytes != nil {
-					scrapeResults = append(scrapeResults, metric.Update{
-						Name: ContainerNetworkTransmitBytesTotal,
-						Labels: map[string]string{
-							source.UIDLabel:       podUID,
-							source.PodLabel:       podName,
-							source.NamespaceLabel: namespace,
-						},
-						Value: float64(*pod.Network.TxBytes),
-					})
+				// The network may contain a list of interface stats or itself be a single stat; if the list is
+				// not present, scrape the object itself.
+				if pod.Network.Interfaces != nil {
+					for _, networkStat := range pod.Network.Interfaces {
+						scrapeNetworkStats(&scrapeResults, networkLabels, networkStat)
+					}
+				} else {
+					scrapeNetworkStats(&scrapeResults, networkLabels, pod.Network.InterfaceStats)
 				}
+
 			}
 
 			for _, volumeStats := range pod.VolumeStats {
@@ -157,3 +149,25 @@ func (s *StatSummaryScraper) Scrape() []metric.Update {
 	}
 	return scrapeResults
 }
+
+func scrapeNetworkStats(scrapeResults *[]metric.Update, labels map[string]string, networkStats stats.InterfaceStats) {
+	// Skip stats for cni0 which tracks internal cluster traffic
+	if networkStats.Name == "cni0" {
+		return
+	}
+	if networkStats.RxBytes != nil {
+		*scrapeResults = append(*scrapeResults, metric.Update{
+			Name:   ContainerNetworkReceiveBytesTotal,
+			Labels: labels,
+			Value:  float64(*networkStats.RxBytes),
+		})
+	}
+
+	if networkStats.TxBytes != nil {
+		*scrapeResults = append(*scrapeResults, metric.Update{
+			Name:   ContainerNetworkTransmitBytesTotal,
+			Labels: labels,
+			Value:  float64(*networkStats.TxBytes),
+		})
+	}
+}

+ 40 - 30
modules/prometheus-source/pkg/prom/metricsquerier.go

@@ -905,17 +905,18 @@ func (pds *PrometheusMetricsQuerier) QueryPVInfo(start, end time.Time) *source.F
 }
 
 func (pds *PrometheusMetricsQuerier) QueryNetZoneGiB(start, end time.Time) *source.Future[source.NetZoneGiBResult] {
-	const queryFmtNetZoneGiB = `sum(increase(kubecost_pod_network_egress_bytes_total{internet="false", same_zone="false", same_region="true", %s}[%s])) by (pod_name, namespace, %s) / 1024 / 1024 / 1024`
+	const queryFmtNetZoneGiB = `sum(increase(kubecost_pod_network_egress_bytes_total{internet="false", same_zone="false", same_region="true", %s}[%s:%dm])) by (pod_name, namespace, %s) / 1024 / 1024 / 1024`
 	// env.GetPromClusterFilter(), durStr, env.GetPromClusterLabel()
 
 	cfg := pds.promConfig
+	minsPerResolution := cfg.DataResolutionMinutes
 
-	durStr := timeutil.DurationString(end.Sub(start))
+	durStr := pds.durationStringFor(start, end, minsPerResolution)
 	if durStr == "" {
 		panic("failed to parse duration string passed to QueryNetZoneGiB")
 	}
 
-	queryNetZoneGiB := fmt.Sprintf(queryFmtNetZoneGiB, cfg.ClusterFilter, durStr, cfg.ClusterLabel)
+	queryNetZoneGiB := fmt.Sprintf(queryFmtNetZoneGiB, cfg.ClusterFilter, durStr, minsPerResolution, cfg.ClusterLabel)
 	ctx := pds.promContexts.NewNamedContext(AllocationContextName)
 	return source.NewFuture(source.DecodeNetZoneGiBResult, ctx.QueryAtTime(queryNetZoneGiB, end))
 }
@@ -937,17 +938,18 @@ func (pds *PrometheusMetricsQuerier) QueryNetZonePricePerGiB(start, end time.Tim
 }
 
 func (pds *PrometheusMetricsQuerier) QueryNetRegionGiB(start, end time.Time) *source.Future[source.NetRegionGiBResult] {
-	const queryFmtNetRegionGiB = `sum(increase(kubecost_pod_network_egress_bytes_total{internet="false", same_zone="false", same_region="false", %s}[%s])) by (pod_name, namespace, %s) / 1024 / 1024 / 1024`
+	const queryFmtNetRegionGiB = `sum(increase(kubecost_pod_network_egress_bytes_total{internet="false", same_zone="false", same_region="false", %s}[%s:%dm])) by (pod_name, namespace, %s) / 1024 / 1024 / 1024`
 	// env.GetPromClusterFilter(), durStr, env.GetPromClusterLabel()
 
 	cfg := pds.promConfig
+	minsPerResolution := cfg.DataResolutionMinutes
 
-	durStr := timeutil.DurationString(end.Sub(start))
+	durStr := pds.durationStringFor(start, end, minsPerResolution)
 	if durStr == "" {
 		panic("failed to parse duration string passed to QueryNetRegionGiB")
 	}
 
-	queryNetRegionGiB := fmt.Sprintf(queryFmtNetRegionGiB, cfg.ClusterFilter, durStr, cfg.ClusterLabel)
+	queryNetRegionGiB := fmt.Sprintf(queryFmtNetRegionGiB, cfg.ClusterFilter, durStr, minsPerResolution, cfg.ClusterLabel)
 	ctx := pds.promContexts.NewNamedContext(AllocationContextName)
 	return source.NewFuture(source.DecodeNetRegionGiBResult, ctx.QueryAtTime(queryNetRegionGiB, end))
 }
@@ -969,17 +971,18 @@ func (pds *PrometheusMetricsQuerier) QueryNetRegionPricePerGiB(start, end time.T
 }
 
 func (pds *PrometheusMetricsQuerier) QueryNetInternetGiB(start, end time.Time) *source.Future[source.NetInternetGiBResult] {
-	const queryFmtNetInternetGiB = `sum(increase(kubecost_pod_network_egress_bytes_total{internet="true", %s}[%s])) by (pod_name, namespace, %s) / 1024 / 1024 / 1024`
+	const queryFmtNetInternetGiB = `sum(increase(kubecost_pod_network_egress_bytes_total{internet="true", %s}[%s:%dm])) by (pod_name, namespace, %s) / 1024 / 1024 / 1024`
 	// env.GetPromClusterFilter(), durStr, env.GetPromClusterLabel()
 
 	cfg := pds.promConfig
+	minsPerResolution := cfg.DataResolutionMinutes
 
-	durStr := timeutil.DurationString(end.Sub(start))
+	durStr := pds.durationStringFor(start, end, minsPerResolution)
 	if durStr == "" {
 		panic("failed to parse duration string passed to QueryNetInternetGiB")
 	}
 
-	queryNetInternetGiB := fmt.Sprintf(queryFmtNetInternetGiB, cfg.ClusterFilter, durStr, cfg.ClusterLabel)
+	queryNetInternetGiB := fmt.Sprintf(queryFmtNetInternetGiB, cfg.ClusterFilter, durStr, minsPerResolution, cfg.ClusterLabel)
 	ctx := pds.promContexts.NewNamedContext(AllocationContextName)
 	return source.NewFuture(source.DecodeNetInternetGiBResult, ctx.QueryAtTime(queryNetInternetGiB, end))
 }
@@ -1001,111 +1004,118 @@ func (pds *PrometheusMetricsQuerier) QueryNetInternetPricePerGiB(start, end time
 }
 
 func (pds *PrometheusMetricsQuerier) QueryNetInternetServiceGiB(start, end time.Time) *source.Future[source.NetInternetServiceGiBResult] {
-	const queryFmtNetInternetGiB = `sum(increase(kubecost_pod_network_egress_bytes_total{internet="true", %s}[%s])) by (pod_name, namespace, service, %s) / 1024 / 1024 / 1024`
+	const queryFmtNetInternetGiB = `sum(increase(kubecost_pod_network_egress_bytes_total{internet="true", %s}[%s:%dm])) by (pod_name, namespace, service, %s) / 1024 / 1024 / 1024`
 	// env.GetPromClusterFilter(), durStr, env.GetPromClusterLabel()
 
 	cfg := pds.promConfig
+	minsPerResolution := cfg.DataResolutionMinutes
 
-	durStr := timeutil.DurationString(end.Sub(start))
+	durStr := pds.durationStringFor(start, end, minsPerResolution)
 	if durStr == "" {
 		panic("failed to parse duration string passed to QueryNetInternetGiB")
 	}
 
-	queryNetInternetGiB := fmt.Sprintf(queryFmtNetInternetGiB, cfg.ClusterFilter, durStr, cfg.ClusterLabel)
+	queryNetInternetGiB := fmt.Sprintf(queryFmtNetInternetGiB, cfg.ClusterFilter, durStr, minsPerResolution, cfg.ClusterLabel)
 	ctx := pds.promContexts.NewNamedContext(NetworkInsightsContextName)
 	return source.NewFuture(source.DecodeNetInternetServiceGiBResult, ctx.QueryAtTime(queryNetInternetGiB, end))
 }
 
 func (pds *PrometheusMetricsQuerier) QueryNetTransferBytes(start, end time.Time) *source.Future[source.NetTransferBytesResult] {
-	const queryFmtNetTransferBytes = `sum(increase(container_network_transmit_bytes_total{pod!="", %s}[%s])) by (pod_name, pod, namespace, %s)`
+	const queryFmtNetTransferBytes = `sum(increase(container_network_transmit_bytes_total{pod!="", %s}[%s:%dm])) by (pod_name, pod, namespace, %s)`
 	// env.GetPromClusterFilter(), durStr, env.GetPromClusterLabel())
 
 	cfg := pds.promConfig
+	minsPerResolution := cfg.DataResolutionMinutes
 
-	durStr := timeutil.DurationString(end.Sub(start))
+	durStr := pds.durationStringFor(start, end, minsPerResolution)
 	if durStr == "" {
 		panic("failed to parse duration string passed to QueryNetTransferBytes")
 	}
 
-	queryNetTransferBytes := fmt.Sprintf(queryFmtNetTransferBytes, cfg.ClusterFilter, durStr, cfg.ClusterLabel)
+	queryNetTransferBytes := fmt.Sprintf(queryFmtNetTransferBytes, cfg.ClusterFilter, durStr, minsPerResolution, cfg.ClusterLabel)
 	ctx := pds.promContexts.NewNamedContext(AllocationContextName)
 	return source.NewFuture(source.DecodeNetTransferBytesResult, ctx.QueryAtTime(queryNetTransferBytes, end))
 }
 
 func (pds *PrometheusMetricsQuerier) QueryNetZoneIngressGiB(start, end time.Time) *source.Future[source.NetZoneIngressGiBResult] {
-	const queryFmtIngNetZoneGiB = `sum(increase(kubecost_pod_network_ingress_bytes_total{internet="false", same_zone="false", same_region="true", %s}[%s])) by (pod_name, namespace, %s) / 1024 / 1024 / 1024`
+	const queryFmtIngNetZoneGiB = `sum(increase(kubecost_pod_network_ingress_bytes_total{internet="false", same_zone="false", same_region="true", %s}[%s:%dm])) by (pod_name, namespace, %s) / 1024 / 1024 / 1024`
 
 	cfg := pds.promConfig
+	minsPerResolution := cfg.DataResolutionMinutes
 
-	durStr := timeutil.DurationString(end.Sub(start))
+	durStr := pds.durationStringFor(start, end, minsPerResolution)
 	if durStr == "" {
 		panic("failed to parse duration string passed to QueryNetZoneIngressGiB")
 	}
 
-	queryNetZoneCostPerGiB := fmt.Sprintf(queryFmtIngNetZoneGiB, cfg.ClusterFilter, durStr, cfg.ClusterLabel)
+	queryNetZoneCostPerGiB := fmt.Sprintf(queryFmtIngNetZoneGiB, cfg.ClusterFilter, durStr, minsPerResolution, cfg.ClusterLabel)
 	ctx := pds.promContexts.NewNamedContext(NetworkInsightsContextName)
 	return source.NewFuture(source.DecodeNetZoneIngressGiBResult, ctx.QueryAtTime(queryNetZoneCostPerGiB, end))
 }
 
 func (pds *PrometheusMetricsQuerier) QueryNetRegionIngressGiB(start, end time.Time) *source.Future[source.NetRegionIngressGiBResult] {
-	const queryFmtIngNetRegionGiB = `sum(increase(kubecost_pod_network_ingress_bytes_total{internet="false", same_zone="false", same_region="false", %s}[%s])) by (pod_name, namespace, %s) / 1024 / 1024 / 1024`
+	const queryFmtIngNetRegionGiB = `sum(increase(kubecost_pod_network_ingress_bytes_total{internet="false", same_zone="false", same_region="false", %s}[%s:%dm])) by (pod_name, namespace, %s) / 1024 / 1024 / 1024`
 
 	cfg := pds.promConfig
+	minsPerResolution := cfg.DataResolutionMinutes
 
-	durStr := timeutil.DurationString(end.Sub(start))
+	durStr := pds.durationStringFor(start, end, minsPerResolution)
 	if durStr == "" {
 		panic("failed to parse duration string passed to QueryNetRegionIngressGiB")
 	}
 
-	queryNetRegionIngGiB := fmt.Sprintf(queryFmtIngNetRegionGiB, cfg.ClusterFilter, durStr, cfg.ClusterLabel)
+	queryNetRegionIngGiB := fmt.Sprintf(queryFmtIngNetRegionGiB, cfg.ClusterFilter, durStr, minsPerResolution, cfg.ClusterLabel)
 	ctx := pds.promContexts.NewNamedContext(NetworkInsightsContextName)
 	return source.NewFuture(source.DecodeNetRegionIngressGiBResult, ctx.QueryAtTime(queryNetRegionIngGiB, end))
 }
 
 func (pds *PrometheusMetricsQuerier) QueryNetInternetIngressGiB(start, end time.Time) *source.Future[source.NetInternetIngressGiBResult] {
-	const queryFmtNetIngInternetGiB = `sum(increase(kubecost_pod_network_ingress_bytes_total{internet="true", %s}[%s])) by (pod_name, namespace, %s) / 1024 / 1024 / 1024`
+	const queryFmtNetIngInternetGiB = `sum(increase(kubecost_pod_network_ingress_bytes_total{internet="true", %s}[%s:%dm])) by (pod_name, namespace, %s) / 1024 / 1024 / 1024`
 	// env.GetPromClusterFilter(), durStr, env.GetPromClusterLabel()
 
 	cfg := pds.promConfig
+	minsPerResolution := cfg.DataResolutionMinutes
 
-	durStr := timeutil.DurationString(end.Sub(start))
+	durStr := pds.durationStringFor(start, end, minsPerResolution)
 	if durStr == "" {
 		panic("failed to parse duration string passed to QueryNetInternetIngressGiB")
 	}
 
-	queryNetIngInternetGiB := fmt.Sprintf(queryFmtNetIngInternetGiB, cfg.ClusterFilter, durStr, cfg.ClusterLabel)
+	queryNetIngInternetGiB := fmt.Sprintf(queryFmtNetIngInternetGiB, cfg.ClusterFilter, durStr, minsPerResolution, cfg.ClusterLabel)
 	ctx := pds.promContexts.NewNamedContext(NetworkInsightsContextName)
 	return source.NewFuture(source.DecodeNetInternetIngressGiBResult, ctx.QueryAtTime(queryNetIngInternetGiB, end))
 }
 
 func (pds *PrometheusMetricsQuerier) QueryNetInternetServiceIngressGiB(start, end time.Time) *source.Future[source.NetInternetServiceIngressGiBResult] {
-	const queryFmtIngNetInternetGiB = `sum(increase(kubecost_pod_network_ingress_bytes_total{internet="true", %s}[%s])) by (pod_name, namespace, service, %s) / 1024 / 1024 / 1024`
+	const queryFmtIngNetInternetGiB = `sum(increase(kubecost_pod_network_ingress_bytes_total{internet="true", %s}[%s:%dm])) by (pod_name, namespace, service, %s) / 1024 / 1024 / 1024`
 	// env.GetPromClusterFilter(), durStr, env.GetPromClusterLabel()
 
 	cfg := pds.promConfig
+	minsPerResolution := cfg.DataResolutionMinutes
 
-	durStr := timeutil.DurationString(end.Sub(start))
+	durStr := pds.durationStringFor(start, end, minsPerResolution)
 	if durStr == "" {
 		panic("failed to parse duration string passed to QueryNetInternetServiceIngressGiB")
 	}
 
-	queryNetIngInternetGiB := fmt.Sprintf(queryFmtIngNetInternetGiB, cfg.ClusterFilter, durStr, cfg.ClusterLabel)
+	queryNetIngInternetGiB := fmt.Sprintf(queryFmtIngNetInternetGiB, cfg.ClusterFilter, durStr, minsPerResolution, cfg.ClusterLabel)
 	ctx := pds.promContexts.NewNamedContext(NetworkInsightsContextName)
 	return source.NewFuture(source.DecodeNetInternetServiceIngressGiBResult, ctx.QueryAtTime(queryNetIngInternetGiB, end))
 }
 
 func (pds *PrometheusMetricsQuerier) QueryNetReceiveBytes(start, end time.Time) *source.Future[source.NetReceiveBytesResult] {
-	const queryFmtNetReceiveBytes = `sum(increase(container_network_receive_bytes_total{pod!="", %s}[%s])) by (pod_name, pod, namespace, %s)`
+	const queryFmtNetReceiveBytes = `sum(increase(container_network_receive_bytes_total{pod!="", %s}[%s:%dm])) by (pod_name, pod, namespace, %s)`
 	// env.GetPromClusterFilter(), durStr, env.GetPromClusterLabel())
 
 	cfg := pds.promConfig
+	minsPerResolution := cfg.DataResolutionMinutes
 
-	durStr := timeutil.DurationString(end.Sub(start))
+	durStr := pds.durationStringFor(start, end, minsPerResolution)
 	if durStr == "" {
 		panic("failed to parse duration string passed to QueryNetReceiveBytes")
 	}
 
-	queryNetReceiveBytes := fmt.Sprintf(queryFmtNetReceiveBytes, cfg.ClusterFilter, durStr, cfg.ClusterLabel)
+	queryNetReceiveBytes := fmt.Sprintf(queryFmtNetReceiveBytes, cfg.ClusterFilter, durStr, minsPerResolution, cfg.ClusterLabel)
 	ctx := pds.promContexts.NewNamedContext(AllocationContextName)
 	return source.NewFuture(source.DecodeNetReceiveBytesResult, ctx.QueryAtTime(queryNetReceiveBytes, end))
 }

+ 3 - 3
pkg/costmodel/allocation_helpers.go

@@ -1897,11 +1897,11 @@ func (cm *CostModel) getNodePricing(nodeMap map[nodeKey]*nodePricing, nodeKey no
 	// them as strings like this?
 
 	if node.CostPerCPUHr == 0 || math.IsNaN(node.CostPerCPUHr) {
-		log.Warnf("CostModel: node pricing has illegal CostPerCPUHr; replacing with custom pricing: %s", nodeKey)
 		cpuCostStr := customPricingConfig.CPU
 		if node.Preemptible {
 			cpuCostStr = customPricingConfig.SpotCPU
 		}
+		log.Warnf("CostModel: node pricing has illegal CostPerCPUHr; replacing with custom pricing: %s %s", nodeKey, cpuCostStr)
 		costPerCPUHr, err := strconv.ParseFloat(cpuCostStr, 64)
 		if err != nil {
 			log.Warnf("CostModel: custom pricing has illegal CPU cost: %s", cpuCostStr)
@@ -1911,11 +1911,11 @@ func (cm *CostModel) getNodePricing(nodeMap map[nodeKey]*nodePricing, nodeKey no
 	}
 
 	if math.IsNaN(node.CostPerGPUHr) {
-		log.Warnf("CostModel: node pricing has illegal CostPerGPUHr; replacing with custom pricing: %s", nodeKey)
 		gpuCostStr := customPricingConfig.GPU
 		if node.Preemptible {
 			gpuCostStr = customPricingConfig.SpotGPU
 		}
+		log.Warnf("CostModel: node pricing has illegal CostPerGPUHr; replacing with custom pricing: %s %s", nodeKey, gpuCostStr)
 		costPerGPUHr, err := strconv.ParseFloat(gpuCostStr, 64)
 		if err != nil {
 			log.Warnf("CostModel: custom pricing has illegal GPU cost: %s", gpuCostStr)
@@ -1925,11 +1925,11 @@ func (cm *CostModel) getNodePricing(nodeMap map[nodeKey]*nodePricing, nodeKey no
 	}
 
 	if node.CostPerRAMGiBHr == 0 || math.IsNaN(node.CostPerRAMGiBHr) {
-		log.Warnf("CostModel: node pricing has illegal CostPerRAMHr; replacing with custom pricing: %s", nodeKey)
 		ramCostStr := customPricingConfig.RAM
 		if node.Preemptible {
 			ramCostStr = customPricingConfig.SpotRAM
 		}
+		log.Warnf("CostModel: node pricing has illegal CostPerRAMHr; replacing with custom pricing: %s %s", nodeKey, ramCostStr)
 		costPerRAMHr, err := strconv.ParseFloat(ramCostStr, 64)
 		if err != nil {
 			log.Warnf("CostModel: custom pricing has illegal RAM cost: %s", ramCostStr)