Procházet zdrojové kódy

Added network insights queries to metrics querier, context names, and compute to costmodel

Matt Bolt před 1 rokem
rodič
revize
d2365958ca

+ 9 - 2
core/pkg/source/datasource.go

@@ -78,16 +78,23 @@ type MetricsQuerier interface {
 	QueryPVPricePerGiBHour(start, end time.Time) *Future[PVPricePerGiBHourResult]
 	QueryPVInfo(start, end time.Time) *Future[PVInfoResult]
 
-	// Network
+	// Network Egress
 	QueryNetZoneGiB(start, end time.Time) *Future[NetZoneGiBResult]
 	QueryNetZonePricePerGiB(start, end time.Time) *Future[NetZonePricePerGiBResult]
 	QueryNetRegionGiB(start, end time.Time) *Future[NetRegionGiBResult]
 	QueryNetRegionPricePerGiB(start, end time.Time) *Future[NetRegionPricePerGiBResult]
 	QueryNetInternetGiB(start, end time.Time) *Future[NetInternetGiBResult]
 	QueryNetInternetPricePerGiB(start, end time.Time) *Future[NetInternetPricePerGiBResult]
-	QueryNetReceiveBytes(start, end time.Time) *Future[NetReceiveBytesResult]
+	QueryNetInternetServiceGiB(start, end time.Time) *Future[NetInternetServiceGiBResult]
 	QueryNetTransferBytes(start, end time.Time) *Future[NetTransferBytesResult]
 
+	// Network Ingress
+	QueryNetZoneIngressGiB(start, end time.Time) *Future[NetZoneIngressGiBResult]
+	QueryNetRegionIngressGiB(start, end time.Time) *Future[NetRegionIngressGiBResult]
+	QueryNetInternetIngressGiB(start, end time.Time) *Future[NetInternetIngressGiBResult]
+	QueryNetInternetServiceIngressGiB(start, end time.Time) *Future[NetInternetServiceIngressGiBResult]
+	QueryNetReceiveBytes(start, end time.Time) *Future[NetReceiveBytesResult]
+
 	// Annotations
 	QueryNamespaceAnnotations(start, end time.Time) *Future[NamespaceAnnotationsResult]
 	QueryPodAnnotations(start, end time.Time) *Future[PodAnnotationsResult]

+ 30 - 0
core/pkg/source/decoders.go

@@ -856,6 +856,7 @@ type NetworkGiBResult struct {
 	Cluster   string
 	Namespace string
 	Pod       string
+	Service   string
 
 	Data []*util.Vector
 }
@@ -864,11 +865,13 @@ func DecodeNetworkGiBResult(result *QueryResult) *NetworkGiBResult {
 	cluster, _ := result.GetCluster()
 	namespace, _ := result.GetNamespace()
 	pod, _ := result.GetPod()
+	service, _ := result.GetString("service")
 
 	return &NetworkGiBResult{
 		Cluster:   cluster,
 		Namespace: namespace,
 		Pod:       pod,
+		Service:   service,
 		Data:      result.Values,
 	}
 }
@@ -899,6 +902,13 @@ type NetRegionPricePerGiBResult = NetworkPricePerGiBResult
 type NetInternetGiBResult = NetworkGiBResult
 type NetInternetPricePerGiBResult = NetworkPricePerGiBResult
 
+type NetInternetServiceGiBResult = NetworkGiBResult
+
+type NetZoneIngressGiBResult = NetworkGiBResult
+type NetRegionIngressGiBResult = NetworkGiBResult
+type NetInternetIngressGiBResult = NetworkGiBResult
+type NetInternetServiceIngressGiBResult = NetworkGiBResult
+
 func DecodeNetZoneGiBResult(result *QueryResult) *NetZoneGiBResult {
 	return DecodeNetworkGiBResult(result)
 }
@@ -923,6 +933,26 @@ func DecodeNetInternetPricePerGiBResult(result *QueryResult) *NetInternetPricePe
 	return DecodeNetworkPricePerGiBResult(result)
 }
 
+func DecodeNetInternetServiceGiBResult(result *QueryResult) *NetInternetServiceGiBResult {
+	return DecodeNetworkGiBResult(result)
+}
+
+func DecodeNetZoneIngressGiBResult(result *QueryResult) *NetZoneIngressGiBResult {
+	return DecodeNetworkGiBResult(result)
+}
+
+func DecodeNetRegionIngressGiBResult(result *QueryResult) *NetRegionIngressGiBResult {
+	return DecodeNetworkGiBResult(result)
+}
+
+func DecodeNetInternetIngressGiBResult(result *QueryResult) *NetInternetIngressGiBResult {
+	return DecodeNetworkGiBResult(result)
+}
+
+func DecodeNetInternetServiceIngressGiBResult(result *QueryResult) *NetInternetServiceIngressGiBResult {
+	return DecodeNetworkGiBResult(result)
+}
+
 type NetReceiveBytesResult struct {
 	Cluster   string
 	Namespace string

+ 3 - 0
modules/prometheus-source/pkg/prom/contextnames.go

@@ -28,4 +28,7 @@ const (
 	// ContainerStatsContextName is the name we assign queries that build
 	// container stats aggregations.
 	ContainerStatsContextName = "container-stats"
+
+	// NetworkInsightsContextName is the name we assign the network insights query context [metadata]
+	NetworkInsightsContextName = "networkinsight"
 )

+ 85 - 7
modules/prometheus-source/pkg/prom/metricsquerier.go

@@ -1027,20 +1027,20 @@ func (pds *PrometheusMetricsQuerier) QueryNetInternetPricePerGiB(start, end time
 	return source.NewFuture(source.DecodeNetInternetPricePerGiBResult, ctx.QueryAtTime(queryNetInternetCostPerGiB, end))
 }
 
-func (pds *PrometheusMetricsQuerier) QueryNetReceiveBytes(start, end time.Time) *source.Future[source.NetReceiveBytesResult] {
-	const queryFmtNetReceiveBytes = `sum(increase(container_network_receive_bytes_total{pod!="", %s}[%s])) by (pod_name, pod, namespace, %s)`
-	// env.GetPromClusterFilter(), durStr, env.GetPromClusterLabel())
+func (pds *PrometheusMetricsQuerier) QueryNetInternetServiceGiB(start, end time.Time) *source.Future[source.NetInternetServiceGiBResult] {
+	const queryFmtNetInternetGiB = `sum(increase(kubecost_pod_network_egress_bytes_total{internet="true", %s}[%s])) by (pod_name, namespace, service, %s) / 1024 / 1024 / 1024`
+	// env.GetPromClusterFilter(), durStr, env.GetPromClusterLabel()
 
 	cfg := pds.promConfig
 
 	durStr := timeutil.DurationString(end.Sub(start))
 	if durStr == "" {
-		panic("failed to parse duration string passed to QueryNetReceiveBytes")
+		panic("failed to parse duration string passed to QueryNetInternetGiB")
 	}
 
-	queryNetReceiveBytes := fmt.Sprintf(queryFmtNetReceiveBytes, cfg.ClusterFilter, durStr, cfg.ClusterLabel)
-	ctx := pds.promContexts.NewNamedContext(AllocationContextName)
-	return source.NewFuture(source.DecodeNetReceiveBytesResult, ctx.QueryAtTime(queryNetReceiveBytes, end))
+	queryNetInternetGiB := fmt.Sprintf(queryFmtNetInternetGiB, cfg.ClusterFilter, durStr, cfg.ClusterLabel)
+	ctx := pds.promContexts.NewNamedContext(NetworkInsightsContextName)
+	return source.NewFuture(source.DecodeNetInternetServiceGiBResult, ctx.QueryAtTime(queryNetInternetGiB, end))
 }
 
 func (pds *PrometheusMetricsQuerier) QueryNetTransferBytes(start, end time.Time) *source.Future[source.NetTransferBytesResult] {
@@ -1059,6 +1059,84 @@ func (pds *PrometheusMetricsQuerier) QueryNetTransferBytes(start, end time.Time)
 	return source.NewFuture(source.DecodeNetTransferBytesResult, ctx.QueryAtTime(queryNetTransferBytes, end))
 }
 
+func (pds *PrometheusMetricsQuerier) QueryNetZoneIngressGiB(start, end time.Time) *source.Future[source.NetZoneIngressGiBResult] {
+	const queryFmtIngNetZoneGiB = `sum(increase(kubecost_pod_network_ingress_bytes_total{internet="false", same_zone="false", same_region="true", %s}[%s])) by (pod_name, namespace, %s) / 1024 / 1024 / 1024`
+
+	cfg := pds.promConfig
+
+	durStr := timeutil.DurationString(end.Sub(start))
+	if durStr == "" {
+		panic("failed to parse duration string passed to QueryNetZoneIngressGiB")
+	}
+
+	queryNetZoneCostPerGiB := fmt.Sprintf(queryFmtIngNetZoneGiB, cfg.ClusterFilter, durStr, cfg.ClusterLabel)
+	ctx := pds.promContexts.NewNamedContext(NetworkInsightsContextName)
+	return source.NewFuture(source.DecodeNetZoneIngressGiBResult, ctx.QueryAtTime(queryNetZoneCostPerGiB, end))
+}
+
+func (pds *PrometheusMetricsQuerier) QueryNetRegionIngressGiB(start, end time.Time) *source.Future[source.NetRegionIngressGiBResult] {
+	const queryFmtIngNetRegionGiB = `sum(increase(kubecost_pod_network_ingress_bytes_total{internet="false", same_zone="false", same_region="false", %s}[%s])) by (pod_name, namespace, %s) / 1024 / 1024 / 1024`
+
+	cfg := pds.promConfig
+
+	durStr := timeutil.DurationString(end.Sub(start))
+	if durStr == "" {
+		panic("failed to parse duration string passed to QueryNetRegionIngressGiB")
+	}
+
+	queryNetRegionIngGiB := fmt.Sprintf(queryFmtIngNetRegionGiB, cfg.ClusterFilter, durStr, cfg.ClusterLabel)
+	ctx := pds.promContexts.NewNamedContext(NetworkInsightsContextName)
+	return source.NewFuture(source.DecodeNetRegionIngressGiBResult, ctx.QueryAtTime(queryNetRegionIngGiB, end))
+}
+
+func (pds *PrometheusMetricsQuerier) QueryNetInternetIngressGiB(start, end time.Time) *source.Future[source.NetInternetIngressGiBResult] {
+	const queryFmtNetIngInternetGiB = `sum(increase(kubecost_pod_network_ingress_bytes_total{internet="true", %s}[%s])) by (pod_name, namespace, %s) / 1024 / 1024 / 1024`
+	// env.GetPromClusterFilter(), durStr, env.GetPromClusterLabel()
+
+	cfg := pds.promConfig
+
+	durStr := timeutil.DurationString(end.Sub(start))
+	if durStr == "" {
+		panic("failed to parse duration string passed to QueryNetInternetIngressGiB")
+	}
+
+	queryNetIngInternetGiB := fmt.Sprintf(queryFmtNetIngInternetGiB, cfg.ClusterFilter, durStr, cfg.ClusterLabel)
+	ctx := pds.promContexts.NewNamedContext(NetworkInsightsContextName)
+	return source.NewFuture(source.DecodeNetInternetIngressGiBResult, ctx.QueryAtTime(queryNetIngInternetGiB, end))
+}
+
+func (pds *PrometheusMetricsQuerier) QueryNetInternetServiceIngressGiB(start, end time.Time) *source.Future[source.NetInternetServiceIngressGiBResult] {
+	const queryFmtIngNetInternetGiB = `sum(increase(kubecost_pod_network_ingress_bytes_total{internet="true", %s}[%s])) by (pod_name, namespace, service, %s) / 1024 / 1024 / 1024`
+	// env.GetPromClusterFilter(), durStr, env.GetPromClusterLabel()
+
+	cfg := pds.promConfig
+
+	durStr := timeutil.DurationString(end.Sub(start))
+	if durStr == "" {
+		panic("failed to parse duration string passed to QueryNetInternetServiceIngressGiB")
+	}
+
+	queryNetIngInternetGiB := fmt.Sprintf(queryFmtIngNetInternetGiB, cfg.ClusterFilter, durStr, cfg.ClusterLabel)
+	ctx := pds.promContexts.NewNamedContext(NetworkInsightsContextName)
+	return source.NewFuture(source.DecodeNetInternetServiceIngressGiBResult, ctx.QueryAtTime(queryNetIngInternetGiB, end))
+}
+
+func (pds *PrometheusMetricsQuerier) QueryNetReceiveBytes(start, end time.Time) *source.Future[source.NetReceiveBytesResult] {
+	const queryFmtNetReceiveBytes = `sum(increase(container_network_receive_bytes_total{pod!="", %s}[%s])) by (pod_name, pod, namespace, %s)`
+	// env.GetPromClusterFilter(), durStr, env.GetPromClusterLabel())
+
+	cfg := pds.promConfig
+
+	durStr := timeutil.DurationString(end.Sub(start))
+	if durStr == "" {
+		panic("failed to parse duration string passed to QueryNetReceiveBytes")
+	}
+
+	queryNetReceiveBytes := fmt.Sprintf(queryFmtNetReceiveBytes, cfg.ClusterFilter, durStr, cfg.ClusterLabel)
+	ctx := pds.promContexts.NewNamedContext(AllocationContextName)
+	return source.NewFuture(source.DecodeNetReceiveBytesResult, ctx.QueryAtTime(queryNetReceiveBytes, end))
+}
+
 func (pds *PrometheusMetricsQuerier) QueryNamespaceLabels(start, end time.Time) *source.Future[source.NamespaceLabelsResult] {
 	const queryFmtNamespaceLabels = `avg_over_time(kube_namespace_labels{%s}[%s])`
 	// env.GetPromClusterFilter(), durStr

+ 198 - 0
pkg/costmodel/networkinsight.go

@@ -0,0 +1,198 @@
+package costmodel
+
+import (
+	"fmt"
+	"time"
+
+	"github.com/opencost/opencost/core/pkg/log"
+	"github.com/opencost/opencost/core/pkg/opencost"
+	"github.com/opencost/opencost/core/pkg/source"
+	"github.com/opencost/opencost/pkg/env"
+)
+
+func (cm *CostModel) Compute(start, end time.Time, resolution time.Duration) (*opencost.NetworkInsightSet, error) {
+	log.Debugf("Network Insight compute called on prometheus source for window  %s", opencost.NewClosedWindow(start, end).String())
+
+	// If the duration is short enough, compute the network insight directly
+	if end.Sub(start) <= cm.BatchDuration {
+		return cm.GetNetworkInsightSet(start, end)
+	}
+
+	// Incase prometheus max query duration is less than the resolution
+
+	// s and e track the coverage of the entire given window over multiple
+	// internal queries.
+	s, e := start, start
+
+	window := opencost.NewClosedWindow(start, end)
+	totalNis := opencost.NewNetworkInsightSet(start, end)
+
+	for e.Before(end) {
+		duration := end.Sub(e)
+		if duration > cm.BatchDuration {
+			duration = cm.BatchDuration
+		}
+		e = s.Add(duration)
+		nis, err := cm.GetNetworkInsightSet(start, end)
+		if err != nil {
+			return &opencost.NetworkInsightSet{}, fmt.Errorf("error computing network insight for %s: %v", window.String(), err)
+		}
+		totalNis.Accumulate(nis, []opencost.NetworkInsightProperty{})
+		s = e
+	}
+	return totalNis, fmt.Errorf("unable to query prometheus for large duration")
+}
+
+func (cm *CostModel) GetNetworkInsightSet(start, end time.Time) (*opencost.NetworkInsightSet, error) {
+	resultingSet := &opencost.NetworkInsightSet{}
+	resultingSet.Window = opencost.NewClosedWindow(start, end)
+
+	querier := cm.DataSource.Metrics()
+	grp := source.NewQueryGroup()
+
+	// Egress Cross Zone
+	resChNetZoneGiB := source.WithGroup(grp, querier.QueryNetZoneGiB(start, end))
+	resNetZoneGiB, _ := resChNetZoneGiB.Await()
+
+	resChNetZoneCostPerGiB := source.WithGroup(grp, querier.QueryNetZonePricePerGiB(start, end))
+	resNetZoneCostPerGiB, _ := resChNetZoneCostPerGiB.Await()
+
+	// Egress Cross Region
+	resChNetRegionGiB := source.WithGroup(grp, querier.QueryNetRegionGiB(start, end))
+	resNetRegionGiB, _ := resChNetRegionGiB.Await()
+
+	resChNetRegionCostPerGiB := source.WithGroup(grp, querier.QueryNetRegionPricePerGiB(start, end))
+	resNetRegionCostPerGiB, _ := resChNetRegionCostPerGiB.Await()
+
+	// Egress Internet
+	resChNetInternetGiB := source.WithGroup(grp, querier.QueryNetInternetServiceGiB(start, end))
+	resNetInternetGiB, _ := resChNetInternetGiB.Await()
+
+	resChNetInternetCostPerGiB := source.WithGroup(grp, querier.QueryNetInternetPricePerGiB(start, end))
+	resNetInternetCostPerGiB, _ := resChNetInternetCostPerGiB.Await()
+
+	// Ingress Cross Zone
+	resChIngNetZoneGiB := source.WithGroup(grp, querier.QueryNetZoneIngressGiB(start, end))
+	resIngNetZoneGiB, _ := resChIngNetZoneGiB.Await()
+
+	// There's no prometheus cost at the moment for Ingress
+	resIngNetZoneCostPerGiB := []*source.NetworkPricePerGiBResult{}
+
+	// Ingress Cross Region
+	resChIngNetRegionGiB := source.WithGroup(grp, querier.QueryNetRegionIngressGiB(start, end))
+	resIngNetRegionGiB, _ := resChIngNetRegionGiB.Await()
+
+	// There's no prometheus cost at the moment for Ingress
+	resIngNetRegionCostPerGiB := []*source.NetworkPricePerGiBResult{}
+
+	// Ingress Internet
+	resChIngNetInternetGiB := source.WithGroup(grp, querier.QueryNetInternetServiceIngressGiB(start, end))
+	resIngNetInternetGiB, _ := resChIngNetInternetGiB.Await()
+
+	// There's no prometheus cost at the moment for Ingress
+	resIngNetInternetCostPerGiB := []*source.NetworkPricePerGiBResult{}
+
+	// apply Egress cross zone network details
+	applyNetworkCosts(resultingSet, resNetZoneGiB, resNetZoneCostPerGiB, opencost.NetworkTrafficTypeCrossZone, opencost.NetworkTrafficDirectionEgress)
+
+	// apply Egress cross region network details
+	applyNetworkCosts(resultingSet, resNetRegionGiB, resNetRegionCostPerGiB, opencost.NetworkTrafficTypeCrossRegion, opencost.NetworkTrafficDirectionEgress)
+
+	// apply Egress internet network details
+	applyNetworkCosts(resultingSet, resNetInternetGiB, resNetInternetCostPerGiB, opencost.NetworkTrafficTypeInternet, opencost.NetworkTrafficDirectionEgress)
+
+	// apply Ingress cross zone network details
+	applyNetworkCosts(resultingSet, resIngNetZoneGiB, resIngNetZoneCostPerGiB, opencost.NetworkTrafficTypeCrossZone, opencost.NetworkTrafficDirectionIngress)
+
+	// apply Ingress cross region network details
+	applyNetworkCosts(resultingSet, resIngNetRegionGiB, resIngNetRegionCostPerGiB, opencost.NetworkTrafficTypeCrossRegion, opencost.NetworkTrafficDirectionIngress)
+
+	// apply Ingress internet network details
+	applyNetworkCosts(resultingSet, resIngNetInternetGiB, resIngNetInternetCostPerGiB, opencost.NetworkTrafficTypeInternet, opencost.NetworkTrafficDirectionIngress)
+
+	return resultingSet, nil
+}
+
+func applyNetworkCosts(
+	ns *opencost.NetworkInsightSet,
+	resNetworkGiB []*source.NetworkGiBResult,
+	resNetworkCostPerGiB []*source.NetworkPricePerGiBResult,
+	networkType opencost.NetworkTrafficType,
+	trafficType opencost.NetworkTrafficDirection,
+) error {
+	var cost float64
+	// All ingress cost are comming out empty at the moment?
+	// do we charge at all here?
+	if len(resNetworkCostPerGiB) == 0 {
+		cost = 0
+	} else {
+		cost = resNetworkCostPerGiB[0].Data[0].Value
+	}
+
+	for _, res := range resNetworkGiB {
+		bytes := res.Data[0].Value
+		// dont really care about bytes <=0
+		if bytes <= 0 {
+			continue
+		}
+
+		cluster := res.Cluster
+		if cluster == "" {
+			cluster = env.GetClusterID()
+		}
+		namespace := res.Namespace
+		pod := res.Pod
+		service := res.Service
+		if service == "" {
+			service = opencost.NetworkInsightsServiceUnknown
+		}
+
+		totalByteCost := bytes * cost
+		// sameZone, sameRegion, internet := getNetworkBools(networkType)
+		nds := make(opencost.NetworkDetailsSet, 1)
+		nd := &opencost.NetworkDetail{
+			Cost:             totalByteCost,
+			Bytes:            bytes,
+			EndPoint:         service,
+			TrafficType:      networkType,
+			TrafficDirection: trafficType,
+		}
+
+		nds.Add(nd)
+
+		crossZoneCost, crossRegionCost, internetCost, totalCost := getNetworkCost(networkType, totalByteCost)
+
+		ni := &opencost.NetworkInsight{
+			Cluster:                cluster,
+			Namespace:              namespace,
+			Controller:             "",
+			Pod:                    pod,
+			Node:                   "",
+			Labels:                 make(map[string]string),
+			Region:                 "",
+			Zone:                   "",
+			NetworkTotalCost:       totalCost,
+			NetworkCrossZoneCost:   crossZoneCost,
+			NetworkCrossRegionCost: crossRegionCost,
+			NetworkInternetCost:    internetCost,
+			NetworkDetails:         nds,
+		}
+
+		ns.Insert(ni, []opencost.NetworkInsightProperty{})
+	}
+	return nil
+}
+
+func getNetworkCost(networkType opencost.NetworkTrafficType, cost float64) (crossZoneCost, crossRegionCost, internetCost, totalCost float64) {
+	switch networkType {
+	case opencost.NetworkTrafficTypeCrossZone:
+		return cost, 0.0, 0.0, cost
+	case opencost.NetworkTrafficTypeCrossRegion:
+		return 0.0, cost, 0.0, cost
+	case opencost.NetworkTrafficTypeInternet:
+		return 0.0, 0.0, cost, cost
+	default:
+		log.Warnf("unknown string passed: %s", networkType)
+		return 0.0, 0.0, 0.0, 0.0
+	}
+}