2
0
Эх сурвалжийг харах

remove local testing, fix test (#3145)

Signed-off-by: Sean Holcomb <seanholcomb@gmail.com>
Sean Holcomb 1 жил өмнө
parent
commit
c421af982d

+ 2 - 0
core/pkg/clustercache/clustercache.go

@@ -33,6 +33,7 @@ type Pod struct {
 }
 
 type PodStatus struct {
+	PodIP             string
 	Phase             v1.PodPhase
 	ContainerStatuses []v1.ContainerStatus
 }
@@ -195,6 +196,7 @@ func TransformPodContainer(input v1.Container) Container {
 
 func TransformPodStatus(input v1.PodStatus) PodStatus {
 	return PodStatus{
+		PodIP:             input.PodIP,
 		Phase:             input.Phase,
 		ContainerStatuses: input.ContainerStatuses,
 	}

+ 4 - 6
modules/collector-source/pkg/scrape/dcgm.go

@@ -4,6 +4,7 @@ import (
 	"fmt"
 
 	"github.com/opencost/opencost/core/pkg/clustercache"
+	"github.com/opencost/opencost/core/pkg/log"
 	"github.com/opencost/opencost/modules/collector-source/pkg/metric"
 	"github.com/opencost/opencost/modules/collector-source/pkg/scrape/target"
 )
@@ -15,9 +16,7 @@ const (
 )
 
 func newDCGMScrapper(clusterCache clustercache.ClusterCache, updater metric.MetricUpdater) Scraper {
-	//tp := newDCGMTargetProvider(clusterCache)
-	tp := target.NewDefaultTargetProvider(
-		target.NewUrlTarget("http://localhost:9400/metrics"))
+	tp := newDCGMTargetProvider(clusterCache)
 	return newDCGMTargetScraper(tp, updater)
 }
 
@@ -44,18 +43,17 @@ func newDCGMTargetProvider(clusterCache clustercache.ClusterCache) *DCGMTargetPr
 
 func (p *DCGMTargetProvider) GetTargets() []target.ScrapeTarget {
 	svcs := p.clusterCache.GetAllServices()
-
 	var targets []target.ScrapeTarget
 	for _, svc := range svcs {
 		if svc.ClusterIP == "" || svc.SpecSelector == nil {
 			continue
 		}
 		// TODO do something in relation to Thomas' comment https://github.com/opencost/opencost/pull/3110
-		if name := svc.SpecSelector["app.kubernetes.io/name"]; name != "dcm-collector" {
+		if name := svc.SpecSelector["app.kubernetes.io/name"]; name != "dcgm-collector" {
 			continue
 		}
 		port := 9400
-
+		log.Debugf("DCGM: found target: http://%s:%d/metrics", svc.ClusterIP, port)
 		t := target.NewUrlTarget(fmt.Sprintf("http://%s:%d/metrics", svc.ClusterIP, port))
 		targets = append(targets, t)
 	}

+ 26 - 37
modules/collector-source/pkg/scrape/network.go

@@ -1,14 +1,12 @@
 package scrape
 
 import (
-	"context"
 	"fmt"
 
+	"github.com/opencost/opencost/core/pkg/clustercache"
 	"github.com/opencost/opencost/core/pkg/log"
 	"github.com/opencost/opencost/modules/collector-source/pkg/metric"
 	"github.com/opencost/opencost/modules/collector-source/pkg/scrape/target"
-	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
-	"k8s.io/client-go/kubernetes"
 )
 
 // Network Metrics
@@ -20,23 +18,10 @@ const (
 func newNetworkScraper(
 	releaseName string,
 	port int,
-	k8s kubernetes.Interface,
+	clusterCache clustercache.ClusterCache,
 	updater metric.MetricUpdater,
 ) Scraper {
-	// TODO revert this
-	//tp := NewNetworkTargetProvider(releaseName, port, k8s)
-	tp := target.NewDefaultTargetProvider(
-		target.NewUrlTarget("http://localhost:9111/metrics"),
-		target.NewUrlTarget("http://localhost:9112/metrics"),
-		target.NewUrlTarget("http://localhost:9113/metrics"),
-		target.NewUrlTarget("http://localhost:9114/metrics"),
-		target.NewUrlTarget("http://localhost:9115/metrics"),
-		target.NewUrlTarget("http://localhost:9116/metrics"),
-		target.NewUrlTarget("http://localhost:9117/metrics"),
-		target.NewUrlTarget("http://localhost:9118/metrics"),
-		target.NewUrlTarget("http://localhost:9119/metrics"),
-		target.NewUrlTarget("http://localhost:9120/metrics"),
-	)
+	tp := NewNetworkTargetProvider(releaseName, port, clusterCache)
 	return newNetworkTargetScraper(tp, updater)
 }
 
@@ -52,34 +37,38 @@ func newNetworkTargetScraper(provider target.TargetProvider, updater metric.Metr
 }
 
 type NetworkTargetProvider struct {
-	releaseName   string
-	port          int
-	kubeClientSet kubernetes.Interface
+	releaseName  string
+	port         int
+	clusterCache clustercache.ClusterCache
 }
 
-func NewNetworkTargetProvider(releaseName string, port int, k8s kubernetes.Interface) *NetworkTargetProvider {
+func NewNetworkTargetProvider(releaseName string, port int, clusterCache clustercache.ClusterCache) *NetworkTargetProvider {
 	return &NetworkTargetProvider{
-		releaseName:   releaseName,
-		port:          port,
-		kubeClientSet: k8s,
+		releaseName:  releaseName,
+		port:         port,
+		clusterCache: clusterCache,
 	}
 }
 
 func (n *NetworkTargetProvider) GetTargets() []target.ScrapeTarget {
-	k8s := n.kubeClientSet
-
-	pods, err := k8s.CoreV1().Pods("").List(context.Background(), metav1.ListOptions{
-		LabelSelector: fmt.Sprintf("app=%s-network-costs", n.releaseName),
-	})
-	if err != nil {
-		log.Errorf("NetworkTargetProvider: failed to retieve pods from kubernetes client: %s", err.Error())
-		return nil
-	}
+	pods := n.clusterCache.GetAllPods()
+	//pods, err := k8s.CoreV1().Pods("").List(context.Background(), metav1.ListOptions{
+	//	LabelSelector: fmt.Sprintf("app=%s-network-costs", n.releaseName),
+	//})
+	//if err != nil {
+	//	log.Errorf("NetworkTargetProvider: failed to retieve pods from kubernetes client: %s", err.Error())
+	//	return nil
+	//}
 
 	var targets []target.ScrapeTarget
-	for _, pod := range pods.Items {
-		t := target.NewUrlTarget(fmt.Sprintf("http://%s:%d/metrics", pod.Status.PodIP, n.port))
-		targets = append(targets, t)
+	for _, pod := range pods {
+		instance := pod.Labels["app.kubernetes.io/instance"]
+		name := pod.Labels["app.kubernetes.io/name"]
+		if name == "network-costs" && instance == "kubecost" {
+			log.Debugf("Network: found target for %s", name)
+			t := target.NewUrlTarget(fmt.Sprintf("http://%s:%d/metrics", pod.Status.PodIP, n.port))
+			targets = append(targets, t)
+		}
 	}
 
 	return targets

+ 1 - 1
modules/collector-source/pkg/scrape/scrapecontroller.go

@@ -38,7 +38,7 @@ func NewScrapeController(
 	statSummaryScraper := newStatSummaryScraper(statSummaryClient, updater)
 	scrapers = append(scrapers, statSummaryScraper)
 
-	networkScraper := newNetworkScraper(releaseName, networkPort, k8s, updater)
+	networkScraper := newNetworkScraper(releaseName, networkPort, clusterCache, updater)
 	scrapers = append(scrapers, networkScraper)
 
 	dcgmScraper := newDCGMScrapper(clusterCache, updater)

+ 26 - 0
modules/collector-source/pkg/scrape/targetscraper_test.go

@@ -141,6 +141,32 @@ func TestTargetScraper_Scrape(t *testing.T) {
 					Value:     335188219,
 					Timestamp: nil,
 				},
+				{
+					MetricName: KubecostPodNetworkIngressBytesTotal,
+					Labels: map[string]string{
+						"pod_name":    "pod1",
+						"namespace":   "namespace1",
+						"internet":    "true",
+						"same_region": "false",
+						"same_zone":   "false",
+						"service":     "service1",
+					},
+					Value:     17941460,
+					Timestamp: nil,
+				},
+				{
+					MetricName: KubecostPodNetworkIngressBytesTotal,
+					Labels: map[string]string{
+						"pod_name":    "pod2",
+						"namespace":   "namespace1",
+						"internet":    "false",
+						"same_region": "true",
+						"same_zone":   "false",
+						"service":     "",
+					},
+					Value:     13948766,
+					Timestamp: nil,
+				},
 			},
 		},
 		{