Explorar el Código

DCGM Scrape Fix + Diagnostics Cleanup (#3344)

Signed-off-by: Matt Bolt <mbolt35@gmail.com>
Matt Bolt hace 8 meses
padre
commit
b2f286a4b7

+ 1 - 2
modules/collector-source/pkg/collector/datasource.go

@@ -80,8 +80,7 @@ func NewCollectorDataSource(
 		}
 		}
 	}
 	}
 
 
-	diagnosticsModule := metric.NewDiagnosticsModule(updater)
-	updater = diagnosticsModule
+	diagnosticsModule := metric.NewDiagnosticsModule()
 	scrapeController := scrape.NewScrapeController(
 	scrapeController := scrape.NewScrapeController(
 		config.ScrapeInterval,
 		config.ScrapeInterval,
 		config.NetworkPort,
 		config.NetworkPort,

+ 1 - 200
modules/collector-source/pkg/metric/diagnostics.go

@@ -2,9 +2,7 @@ package metric
 
 
 import (
 import (
 	"fmt"
 	"fmt"
-	"maps"
 	"sync"
 	"sync"
-	"time"
 
 
 	"github.com/kubecost/events"
 	"github.com/kubecost/events"
 	"github.com/opencost/opencost/core/pkg/collections"
 	"github.com/opencost/opencost/core/pkg/collections"
@@ -14,12 +12,6 @@ import (
 
 
 // Collector Metric Diagnostic IDs
 // Collector Metric Diagnostic IDs
 const (
 const (
-	// OpencostDiagnosticMetricID is the identifier for the metric used to determine if Opencost metrics are being updated
-	OpencostDiagnosticMetricID = "opencostMetric"
-
-	// NodesDiagnosticMetricID is the identifier for the query used to determine if the node CPU cores capacity is being updated
-	NodesDiagnosticMetricID = "nodesCPUMetrics"
-
 	// DcgmScraperDiagnosticID contains the identifier for the DCGM scraper diagnostic.
 	// DcgmScraperDiagnosticID contains the identifier for the DCGM scraper diagnostic.
 	DcgmScraperDiagnosticID = event.DCGMScraperName
 	DcgmScraperDiagnosticID = event.DCGMScraperName
 
 
@@ -44,16 +36,6 @@ const (
 	KubernetesPvcsScraperDiagnosticID         = event.KubernetesClusterScraperName + "-" + event.PvcScraperType
 	KubernetesPvcsScraperDiagnosticID         = event.KubernetesClusterScraperName + "-" + event.PvcScraperType
 )
 )
 
 
-// DiagnosticType is used in the definitions to determine which type of implementation to use when representing the
-// diagnostic
-type DiagnosticType int
-
-const (
-	DiagnosticTypeMetric  DiagnosticType = 0
-	DiagnosticTypeScraper DiagnosticType = 1
-	// more diagnostic types?
-)
-
 // diagnostic definition is the type used to define a deterministic list of specific diagnostics we _expect_ to collect
 // diagnostic definition is the type used to define a deterministic list of specific diagnostics we _expect_ to collect
 type diagnosticDefinition struct {
 type diagnosticDefinition struct {
 	ID          string
 	ID          string
@@ -61,33 +43,15 @@ type diagnosticDefinition struct {
 	Label       string
 	Label       string
 	Description string
 	Description string
 	DocLink     string
 	DocLink     string
-	DiagType    DiagnosticType
 }
 }
 
 
 // diagnostic definitions mapping holds all of the diagnostic definitions that can be used for collector metrics diagnostics
 // diagnostic definitions mapping holds all of the diagnostic definitions that can be used for collector metrics diagnostics
 var diagnosticDefinitions map[string]*diagnosticDefinition = map[string]*diagnosticDefinition{
 var diagnosticDefinitions map[string]*diagnosticDefinition = map[string]*diagnosticDefinition{
-	NodesDiagnosticMetricID: {
-		ID:          NodesDiagnosticMetricID,
-		MetricName:  KubeNodeStatusCapacityCPUCores,
-		Label:       "Node CPU cores capacity is being scraped",
-		Description: "Determine if the node CPU cores capacity metrics are being updated",
-		DiagType:    DiagnosticTypeMetric,
-	},
-
-	OpencostDiagnosticMetricID: {
-		ID:          OpencostDiagnosticMetricID,
-		MetricName:  NodeTotalHourlyCost,
-		Label:       "Opencost metrics for a node are being scraped",
-		Description: "Determine if opencost metrics for a node are being updated",
-		DiagType:    DiagnosticTypeMetric,
-	},
-
 	DcgmScraperDiagnosticID: {
 	DcgmScraperDiagnosticID: {
 		ID:          DcgmScraperDiagnosticID,
 		ID:          DcgmScraperDiagnosticID,
 		MetricName:  event.DCGMScraperName,
 		MetricName:  event.DCGMScraperName,
 		Label:       "DCGM scraper is available and is being scraped.",
 		Label:       "DCGM scraper is available and is being scraped.",
 		Description: scraperDiagnosticDescriptionFor(event.DCGMScraperName, ""),
 		Description: scraperDiagnosticDescriptionFor(event.DCGMScraperName, ""),
-		DiagType:    DiagnosticTypeScraper,
 	},
 	},
 
 
 	OpenCostScraperDiagnosticID: {
 	OpenCostScraperDiagnosticID: {
@@ -95,7 +59,6 @@ var diagnosticDefinitions map[string]*diagnosticDefinition = map[string]*diagnos
 		MetricName:  event.OpenCostScraperName,
 		MetricName:  event.OpenCostScraperName,
 		Label:       "Opencost metrics scraper is available and is being scraped.",
 		Label:       "Opencost metrics scraper is available and is being scraped.",
 		Description: scraperDiagnosticDescriptionFor(event.OpenCostScraperName, ""),
 		Description: scraperDiagnosticDescriptionFor(event.OpenCostScraperName, ""),
-		DiagType:    DiagnosticTypeScraper,
 	},
 	},
 
 
 	NodeStatsScraperDiagnosticID: {
 	NodeStatsScraperDiagnosticID: {
@@ -103,7 +66,6 @@ var diagnosticDefinitions map[string]*diagnosticDefinition = map[string]*diagnos
 		MetricName:  event.NodeStatsScraperName,
 		MetricName:  event.NodeStatsScraperName,
 		Label:       "Node stats summary scraper is available and is being scraped.",
 		Label:       "Node stats summary scraper is available and is being scraped.",
 		Description: scraperDiagnosticDescriptionFor(event.NodeStatsScraperName, ""),
 		Description: scraperDiagnosticDescriptionFor(event.NodeStatsScraperName, ""),
-		DiagType:    DiagnosticTypeScraper,
 	},
 	},
 
 
 	NetworkCostsScraperDiagnosticID: {
 	NetworkCostsScraperDiagnosticID: {
@@ -111,7 +73,6 @@ var diagnosticDefinitions map[string]*diagnosticDefinition = map[string]*diagnos
 		MetricName:  event.NetworkCostsScraperName,
 		MetricName:  event.NetworkCostsScraperName,
 		Label:       "Network costs daemonset metrics scrapers are available and being scraped.",
 		Label:       "Network costs daemonset metrics scrapers are available and being scraped.",
 		Description: scraperDiagnosticDescriptionFor(event.NetworkCostsScraperName, ""),
 		Description: scraperDiagnosticDescriptionFor(event.NetworkCostsScraperName, ""),
-		DiagType:    DiagnosticTypeScraper,
 	},
 	},
 
 
 	KubernetesNodesScraperDiagnosticID: {
 	KubernetesNodesScraperDiagnosticID: {
@@ -119,7 +80,6 @@ var diagnosticDefinitions map[string]*diagnosticDefinition = map[string]*diagnos
 		MetricName:  KubernetesNodesScraperDiagnosticID,
 		MetricName:  KubernetesNodesScraperDiagnosticID,
 		Label:       fmt.Sprintf("Kubernetes cluster resources: %s are available and being scraped", event.NodeScraperType),
 		Label:       fmt.Sprintf("Kubernetes cluster resources: %s are available and being scraped", event.NodeScraperType),
 		Description: scraperDiagnosticDescriptionFor(event.KubernetesClusterScraperName, event.NodeScraperType),
 		Description: scraperDiagnosticDescriptionFor(event.KubernetesClusterScraperName, event.NodeScraperType),
-		DiagType:    DiagnosticTypeScraper,
 	},
 	},
 
 
 	KubernetesNamespacesScraperDiagnosticID: {
 	KubernetesNamespacesScraperDiagnosticID: {
@@ -127,7 +87,6 @@ var diagnosticDefinitions map[string]*diagnosticDefinition = map[string]*diagnos
 		MetricName:  KubernetesNamespacesScraperDiagnosticID,
 		MetricName:  KubernetesNamespacesScraperDiagnosticID,
 		Label:       fmt.Sprintf("Kubernetes cluster resources: %s are available and being scraped", event.NamespaceScraperType),
 		Label:       fmt.Sprintf("Kubernetes cluster resources: %s are available and being scraped", event.NamespaceScraperType),
 		Description: scraperDiagnosticDescriptionFor(event.KubernetesClusterScraperName, event.NamespaceScraperType),
 		Description: scraperDiagnosticDescriptionFor(event.KubernetesClusterScraperName, event.NamespaceScraperType),
-		DiagType:    DiagnosticTypeScraper,
 	},
 	},
 
 
 	KubernetesReplicaSetsScraperDiagnosticID: {
 	KubernetesReplicaSetsScraperDiagnosticID: {
@@ -135,7 +94,6 @@ var diagnosticDefinitions map[string]*diagnosticDefinition = map[string]*diagnos
 		MetricName:  KubernetesReplicaSetsScraperDiagnosticID,
 		MetricName:  KubernetesReplicaSetsScraperDiagnosticID,
 		Label:       fmt.Sprintf("Kubernetes cluster resources: %s are available and being scraped", event.ReplicaSetScraperType),
 		Label:       fmt.Sprintf("Kubernetes cluster resources: %s are available and being scraped", event.ReplicaSetScraperType),
 		Description: scraperDiagnosticDescriptionFor(event.KubernetesClusterScraperName, event.ReplicaSetScraperType),
 		Description: scraperDiagnosticDescriptionFor(event.KubernetesClusterScraperName, event.ReplicaSetScraperType),
-		DiagType:    DiagnosticTypeScraper,
 	},
 	},
 
 
 	KubernetesDeploymentsScraperDiagnosticID: {
 	KubernetesDeploymentsScraperDiagnosticID: {
@@ -143,7 +101,6 @@ var diagnosticDefinitions map[string]*diagnosticDefinition = map[string]*diagnos
 		MetricName:  KubernetesDeploymentsScraperDiagnosticID,
 		MetricName:  KubernetesDeploymentsScraperDiagnosticID,
 		Label:       fmt.Sprintf("Kubernetes cluster resources: %s are available and being scraped", event.DeploymentScraperType),
 		Label:       fmt.Sprintf("Kubernetes cluster resources: %s are available and being scraped", event.DeploymentScraperType),
 		Description: scraperDiagnosticDescriptionFor(event.KubernetesClusterScraperName, event.DeploymentScraperType),
 		Description: scraperDiagnosticDescriptionFor(event.KubernetesClusterScraperName, event.DeploymentScraperType),
-		DiagType:    DiagnosticTypeScraper,
 	},
 	},
 
 
 	KubernetesStatefulSetsScraperDiagnosticID: {
 	KubernetesStatefulSetsScraperDiagnosticID: {
@@ -151,7 +108,6 @@ var diagnosticDefinitions map[string]*diagnosticDefinition = map[string]*diagnos
 		MetricName:  KubernetesStatefulSetsScraperDiagnosticID,
 		MetricName:  KubernetesStatefulSetsScraperDiagnosticID,
 		Label:       fmt.Sprintf("Kubernetes cluster resources: %s are available and being scraped", event.StatefulSetScraperType),
 		Label:       fmt.Sprintf("Kubernetes cluster resources: %s are available and being scraped", event.StatefulSetScraperType),
 		Description: scraperDiagnosticDescriptionFor(event.KubernetesClusterScraperName, event.StatefulSetScraperType),
 		Description: scraperDiagnosticDescriptionFor(event.KubernetesClusterScraperName, event.StatefulSetScraperType),
-		DiagType:    DiagnosticTypeScraper,
 	},
 	},
 
 
 	KubernetesServicesScraperDiagnosticID: {
 	KubernetesServicesScraperDiagnosticID: {
@@ -159,7 +115,6 @@ var diagnosticDefinitions map[string]*diagnosticDefinition = map[string]*diagnos
 		MetricName:  KubernetesServicesScraperDiagnosticID,
 		MetricName:  KubernetesServicesScraperDiagnosticID,
 		Label:       fmt.Sprintf("Kubernetes cluster resources: %s are available and being scraped", event.ServiceScraperType),
 		Label:       fmt.Sprintf("Kubernetes cluster resources: %s are available and being scraped", event.ServiceScraperType),
 		Description: scraperDiagnosticDescriptionFor(event.KubernetesClusterScraperName, event.ServiceScraperType),
 		Description: scraperDiagnosticDescriptionFor(event.KubernetesClusterScraperName, event.ServiceScraperType),
-		DiagType:    DiagnosticTypeScraper,
 	},
 	},
 
 
 	KubernetesPodsScraperDiagnosticID: {
 	KubernetesPodsScraperDiagnosticID: {
@@ -167,7 +122,6 @@ var diagnosticDefinitions map[string]*diagnosticDefinition = map[string]*diagnos
 		MetricName:  KubernetesPodsScraperDiagnosticID,
 		MetricName:  KubernetesPodsScraperDiagnosticID,
 		Label:       fmt.Sprintf("Kubernetes cluster resources: %s are available and being scraped", event.PodScraperType),
 		Label:       fmt.Sprintf("Kubernetes cluster resources: %s are available and being scraped", event.PodScraperType),
 		Description: scraperDiagnosticDescriptionFor(event.KubernetesClusterScraperName, event.PodScraperType),
 		Description: scraperDiagnosticDescriptionFor(event.KubernetesClusterScraperName, event.PodScraperType),
-		DiagType:    DiagnosticTypeScraper,
 	},
 	},
 
 
 	KubernetesPvsScraperDiagnosticID: {
 	KubernetesPvsScraperDiagnosticID: {
@@ -175,7 +129,6 @@ var diagnosticDefinitions map[string]*diagnosticDefinition = map[string]*diagnos
 		MetricName:  KubernetesPvsScraperDiagnosticID,
 		MetricName:  KubernetesPvsScraperDiagnosticID,
 		Label:       fmt.Sprintf("Kubernetes cluster resources: %s are available and being scraped", event.PvScraperType),
 		Label:       fmt.Sprintf("Kubernetes cluster resources: %s are available and being scraped", event.PvScraperType),
 		Description: scraperDiagnosticDescriptionFor(event.KubernetesClusterScraperName, event.PvScraperType),
 		Description: scraperDiagnosticDescriptionFor(event.KubernetesClusterScraperName, event.PvScraperType),
-		DiagType:    DiagnosticTypeScraper,
 	},
 	},
 
 
 	KubernetesPvcsScraperDiagnosticID: {
 	KubernetesPvcsScraperDiagnosticID: {
@@ -183,7 +136,6 @@ var diagnosticDefinitions map[string]*diagnosticDefinition = map[string]*diagnos
 		MetricName:  KubernetesPvcsScraperDiagnosticID,
 		MetricName:  KubernetesPvcsScraperDiagnosticID,
 		Label:       fmt.Sprintf("Kubernetes cluster resources: %s are available and being scraped", event.PvcScraperType),
 		Label:       fmt.Sprintf("Kubernetes cluster resources: %s are available and being scraped", event.PvcScraperType),
 		Description: scraperDiagnosticDescriptionFor(event.KubernetesClusterScraperName, event.PvcScraperType),
 		Description: scraperDiagnosticDescriptionFor(event.KubernetesClusterScraperName, event.PvcScraperType),
-		DiagType:    DiagnosticTypeScraper,
 	},
 	},
 }
 }
 
 
@@ -216,73 +168,6 @@ type CollectorDiagnostic interface {
 	Details() map[string]any
 	Details() map[string]any
 }
 }
 
 
-// metric diagnostic is checked on metrics update -- it maintains a historic record of all the instants
-// a specific metric was updated, and reports a diagnostic on the validity of that history.
-type metricDiagnostic struct {
-	diagnostic       *diagnosticDefinition
-	updateTimestamps []time.Time
-	result           map[string]float64
-}
-
-// creates a new metric diagnostic
-func newMetricDiagnostic(diagnostic *diagnosticDefinition) *metricDiagnostic {
-	return &metricDiagnostic{
-		diagnostic: diagnostic,
-		result:     make(map[string]float64),
-	}
-}
-
-// Id returns the identifier for the metric diagnostic type -- this just proxies from the diagnostic
-// definition.
-func (md *metricDiagnostic) Id() string {
-	return md.diagnostic.ID
-}
-
-// Name returns the name of the metric being run for the metric diagnostic type -- this just proxies from
-// the diagnostic definition.
-func (md *metricDiagnostic) Name() string {
-	return md.diagnostic.MetricName
-}
-
-// Details generates an exportable detail map for the specific diagnostic, and resets any of its internal
-// state for the current cycle.
-func (md *metricDiagnostic) Details() map[string]any {
-	// for all timestamps that occurred during our update cycle,
-	// if any timestamps for our metric do not exist, then we
-	// say that the diagnostic failed. if there are no timestamps
-	// marked in the result, then we also say the diagnostic failed.
-	passed := true
-	if len(md.result) == 0 {
-		passed = false
-	} else {
-		for _, t := range md.updateTimestamps {
-			key := t.Format(time.RFC3339)
-
-			_, hasTimestamp := md.result[key]
-			if !hasTimestamp {
-				passed = false
-				break
-			}
-		}
-	}
-
-	details := map[string]any{
-		"query":   md.Name(),
-		"label":   md.diagnostic.Label,
-		"docLink": md.diagnostic.DocLink,
-		"result":  maps.Clone(md.result),
-		"passed":  passed,
-	}
-
-	// reset the update timestamps and results
-	md.updateTimestamps = []time.Time{}
-	for k := range md.result {
-		delete(md.result, k)
-	}
-
-	return details
-}
-
 // scrapeDiagnostic maintains the latest state of each scrape event that occurs. scrape
 // scrapeDiagnostic maintains the latest state of each scrape event that occurs. scrape
 // events can be registered for any event, but only the specific scrapes with diagnostic
 // events can be registered for any event, but only the specific scrapes with diagnostic
 // definitions defined will export as diagnostics.
 // definitions defined will export as diagnostics.
@@ -377,25 +262,14 @@ func (sd *scrapeDiagnostic) Details() map[string]any {
 type DiagnosticsModule struct {
 type DiagnosticsModule struct {
 	lock            sync.RWMutex
 	lock            sync.RWMutex
 	diagnostics     *collections.IdNameMap[CollectorDiagnostic]
 	diagnostics     *collections.IdNameMap[CollectorDiagnostic]
-	updater         Updater
 	scrapeHandlerId events.HandlerID // scrape event handler identifier for removal
 	scrapeHandlerId events.HandlerID // scrape event handler identifier for removal
 }
 }
 
 
 // NewDiagnosticsModule creates a new `DiagnosticsModule` instance to be used with a collector data source
 // NewDiagnosticsModule creates a new `DiagnosticsModule` instance to be used with a collector data source
-func NewDiagnosticsModule(updater Updater) *DiagnosticsModule {
-	// initialize all metric diagnostics IFF the diagnostic type is "metrics"
-	// NOTE: scraper diagnostics are dynamically created as scrape results arrive
+func NewDiagnosticsModule() *DiagnosticsModule {
 	diagnostics := collections.NewIdNameMap[CollectorDiagnostic]()
 	diagnostics := collections.NewIdNameMap[CollectorDiagnostic]()
-	for _, def := range diagnosticDefinitions {
-		// only insert metric diagnostic types
-		if def.DiagType == DiagnosticTypeMetric {
-			diagnostics.Insert(newMetricDiagnostic(def))
-		}
-	}
-
 	dm := &DiagnosticsModule{
 	dm := &DiagnosticsModule{
 		diagnostics: diagnostics,
 		diagnostics: diagnostics,
-		updater:     updater,
 	}
 	}
 
 
 	scrapeEvents := events.GlobalDispatcherFor[event.ScrapeEvent]()
 	scrapeEvents := events.GlobalDispatcherFor[event.ScrapeEvent]()
@@ -422,79 +296,6 @@ func (d *DiagnosticsModule) onScrapeEvent(event event.ScrapeEvent) {
 	d.diagnostics.Insert(newScrapeDiagnostic(event, def))
 	d.diagnostics.Insert(newScrapeDiagnostic(event, def))
 }
 }
 
 
-func (d *DiagnosticsModule) Update(updateSet *UpdateSet) {
-	if updateSet == nil {
-		return
-	}
-
-	// This is done so that the update func is marked complete when both the updater and diagnostics are done
-	// Otherwise we might face a race condition when calling the diagnostics details func before the diagnostics are done
-	var wg sync.WaitGroup
-	wg.Add(2) // 1 for updater, 1 for diagnostics
-
-	go func() {
-		defer wg.Done()
-
-		d.lock.Lock()
-		defer d.lock.Unlock()
-
-		// add the timestamp to all metric diagnostic instances (see notes on addUpdateTimestamp)
-		ts := updateSet.Timestamp
-		d.addUpdateTimestamp(ts)
-
-		timestamp := ts.Format(time.RFC3339)
-
-		for _, update := range updateSet.Updates {
-			if metric, ok := d.diagnostics.ByName(update.Name); ok {
-		// this is unfortunately necessary due to the way our diagnostic collectors
-				// differ in functionality -- it makes more sense to duck type here rather
-				// than maintain a separate map of just the metric types, or add metric
-				// specific implementation details to the CollectorDiagnostic interface.
-				// generally, we _should_ be able to make this assertion -- but we'll check in case.
-				if metricDiag, isType := metric.(*metricDiagnostic); isType {
-					// mark the timestamp as "seen" with the value
-					metricDiag.result[timestamp] = update.Value
-				}
-			}
-		}
-	}()
-
-	// We are still maintaining the order in which the updates to the repo are called
-	// as this function gets the new call only when both these go routines are done
-	go func() {
-		defer wg.Done()
-		d.updater.Update(updateSet)
-	}()
-
-	wg.Wait()
-}
-
-// appends an update timestamp on each of the metric diagnostics -- we need to write
-// every timestamp that the update makes unfortunately. There isn't a way to determine
-// if a diagnostic service "cycle" is complete, so it's not really possible to maintain
-// a most recent timestamps on the DiagnosticsModule (the optimal solution). we're not
-// far from a solid design here, just might need some more support on the diagnostic
-// service side.
-func (d *DiagnosticsModule) addUpdateTimestamp(t time.Time) {
-	for _, def := range diagnosticDefinitions {
-		if def.DiagType != DiagnosticTypeMetric {
-			continue
-		}
-
-		diag, ok := d.diagnostics.ById(def.ID)
-		if !ok {
-			continue
-		}
-
-		// More duck typing sadly -- there are some fundamental design incompatibilities
-		// with the way DiagnosticService was written and this cached diagnostic approach
-		// that make things like "cycle" resets a bit difficult
-		if metricDiag, ok := diag.(*metricDiagnostic); ok {
-			metricDiag.updateTimestamps = append(metricDiag.updateTimestamps, t)
-		}
-	}
-}
-
 // DiagnosticDefinitions returns a deterministic mapping of pre-defined diagnostics used with the collector.
 // DiagnosticDefinitions returns a deterministic mapping of pre-defined diagnostics used with the collector.
 func (d *DiagnosticsModule) DiagnosticsDefinitions() map[string]*diagnosticDefinition {
 func (d *DiagnosticsModule) DiagnosticsDefinitions() map[string]*diagnosticDefinition {
 	return diagnosticDefinitions
 	return diagnosticDefinitions

+ 1 - 190
modules/collector-source/pkg/metric/diagnostics_test.go

@@ -9,57 +9,8 @@ import (
 	"github.com/opencost/opencost/modules/collector-source/pkg/event"
 	"github.com/opencost/opencost/modules/collector-source/pkg/event"
 )
 )
 
 
-// MockUpdater implements the Updater interface for testing
-type MockUpdater struct {
-}
-
-func (m *MockUpdater) Update(updateSet *UpdateSet) {
-}
-
-// Test Update func in DiagnosticsModule and check if diagnostics pass
-func TestDiagnosticsModule_Update(t *testing.T) {
-	mockUpdater := &MockUpdater{}
-	module := NewDiagnosticsModule(mockUpdater)
-
-	// Test with valid update set containing node metrics
-	timestamp := time.Now()
-	updateSet := &UpdateSet{
-		Timestamp: timestamp,
-		Updates: []Update{
-			{
-				Name:  KubeNodeStatusCapacityCPUCores,
-				Value: 4.0,
-			},
-			{
-				Name:  NodeTotalHourlyCost,
-				Value: 0.50,
-			},
-		},
-	}
-
-	module.Update(updateSet)
-
-	// Check both diagnostics
-	nodeDetails, err := module.DiagnosticsDetails(NodesDiagnosticMetricID)
-	if err != nil {
-		t.Error("Expected no error for valid diagnostic ID")
-	}
-	if nodeDetails["passed"] != true {
-		t.Error("Expected node diagnostic to pass")
-	}
-
-	opencostDetails, err := module.DiagnosticsDetails(OpencostDiagnosticMetricID)
-	if err != nil {
-		t.Error("Expected no error for valid diagnostic ID")
-	}
-	if opencostDetails["passed"] != true {
-		t.Error("Expected kubecost diagnostic to pass")
-	}
-}
-
 func TestDiagnosticsModule_ScrapeDiagnostics(t *testing.T) {
 func TestDiagnosticsModule_ScrapeDiagnostics(t *testing.T) {
-	mockUpdater := &MockUpdater{}
-	module := NewDiagnosticsModule(mockUpdater)
+	module := NewDiagnosticsModule()
 
 
 	// dispatch some faux scrape events
 	// dispatch some faux scrape events
 	events.Dispatch(event.ScrapeEvent{
 	events.Dispatch(event.ScrapeEvent{
@@ -154,143 +105,3 @@ func TestDiagnosticsModule_ScrapeDiagnostics(t *testing.T) {
 		return
 		return
 	}
 	}
 }
 }
-
-// Test Update func in DiagnosticsModule with missing metrics and test if diagnostics fail
-func TestDiagnosticsModule_UpdateWithMissingMetrics(t *testing.T) {
-	mockUpdater := &MockUpdater{}
-	module := NewDiagnosticsModule(mockUpdater)
-
-	timestamp := time.Now()
-	updateSet := &UpdateSet{
-		Timestamp: timestamp,
-		Updates: []Update{
-			{
-				Name:  "some_other_metric",
-				Value: 1.0,
-			},
-		},
-	}
-
-	module.Update(updateSet)
-
-	// Check that diagnostics fail when their metrics are missing
-	nodeDetails, err := module.DiagnosticsDetails(NodesDiagnosticMetricID)
-	if err != nil {
-		t.Error("Expected no error for valid diagnostic ID")
-	}
-	if nodeDetails["passed"] != false {
-		t.Error("Expected node diagnostic to fail when metric is missing")
-	}
-
-	kubecostDetails, err := module.DiagnosticsDetails(OpencostDiagnosticMetricID)
-	if err != nil {
-		t.Error("Expected no error for valid diagnostic ID")
-	}
-	if kubecostDetails["passed"] != false {
-		t.Error("Expected kubecost diagnostic to fail when metric is missing")
-	}
-}
-
-// Test DiagnosticsDetails func in DiagnosticsModule with invalid and valid diagnostic IDs
-func TestDiagnosticsModule_DiagnosticsDetails(t *testing.T) {
-	mockUpdater := &MockUpdater{}
-	module := NewDiagnosticsModule(mockUpdater)
-
-	// Test with invalid diagnostic ID
-	_, err := module.DiagnosticsDetails("invalid_id")
-	if err.Error() != "invalid diagnostic id: invalid_id not found" {
-		t.Error("Expected error for invalid diagnostic ID")
-	}
-
-	// Test with valid diagnostic ID
-	details, err := module.DiagnosticsDetails(NodesDiagnosticMetricID)
-	if err != nil {
-		t.Error("Expected no error for valid diagnostic ID")
-	}
-	if details["error"] != nil {
-		t.Error("Expected no error for valid diagnostic ID")
-	}
-
-	// Check required fields
-	requiredFields := []string{"query", "label", "result", "passed", "docLink"}
-	for _, field := range requiredFields {
-		if details[field] == nil {
-			t.Errorf("Expected field %s to be present", field)
-		}
-	}
-}
-
-// Test concurrent access(race condition) to DiagnosticsModule
-func TestDiagnosticsModule_ConcurrentAccess(t *testing.T) {
-	mockUpdater := &MockUpdater{}
-	module := NewDiagnosticsModule(mockUpdater)
-
-	// Test concurrent access to diagnostics
-	done := make(chan bool, 2)
-
-	go func() {
-		for i := 0; i < 100; i++ {
-			module.DiagnosticsDefinitions()
-		}
-		done <- true
-	}()
-
-	go func() {
-		for i := 0; i < 100; i++ {
-			timestamp := time.Now()
-			updateSet := &UpdateSet{
-				Timestamp: timestamp,
-				Updates: []Update{
-					{
-						Name:  KubeNodeStatusCapacityCPUCores,
-						Value: float64(i),
-					},
-				},
-			}
-			module.Update(updateSet)
-		}
-		done <- true
-	}()
-
-	<-done
-	<-done
-	// If we get here without a race condition, the test passes
-}
-
-// Test reset of diagnostics after details are retrieved
-func TestDiagnosticsModule_ResetAfterDetails(t *testing.T) {
-	mockUpdater := &MockUpdater{}
-	module := NewDiagnosticsModule(mockUpdater)
-
-	// Add some data
-	timestamp := time.Now()
-	updateSet := &UpdateSet{
-		Timestamp: timestamp,
-		Updates: []Update{
-			{
-				Name:  KubeNodeStatusCapacityCPUCores,
-				Value: 4.0,
-			},
-		},
-	}
-
-	module.Update(updateSet)
-
-	// Get details (this should reset the diagnostic)
-	details, err := module.DiagnosticsDetails(NodesDiagnosticMetricID)
-	if err != nil {
-		t.Error("Expected no error for valid diagnostic ID")
-	}
-	if details["passed"] != true {
-		t.Error("Expected diagnostic to pass before reset")
-	}
-
-	// Get details again (should be reset)
-	details2, err := module.DiagnosticsDetails(NodesDiagnosticMetricID)
-	if err != nil {
-		t.Error("Expected no error for valid diagnostic ID")
-	}
-	if details2["passed"] != false {
-		t.Error("Expected diagnostic to be reset after first details call")
-	}
-}

+ 16 - 8
modules/collector-source/pkg/scrape/dcgm.go

@@ -9,6 +9,7 @@ import (
 	"github.com/opencost/opencost/modules/collector-source/pkg/event"
 	"github.com/opencost/opencost/modules/collector-source/pkg/event"
 	"github.com/opencost/opencost/modules/collector-source/pkg/metric"
 	"github.com/opencost/opencost/modules/collector-source/pkg/metric"
 	"github.com/opencost/opencost/modules/collector-source/pkg/scrape/target"
 	"github.com/opencost/opencost/modules/collector-source/pkg/scrape/target"
+	v1 "k8s.io/api/core/v1"
 )
 )
 
 
 var dcgmRegex = regexp.MustCompile("(?i)(.*dcgm-exporter.*)")
 var dcgmRegex = regexp.MustCompile("(?i)(.*dcgm-exporter.*)")
@@ -31,25 +32,32 @@ func newDCGMTargetScraper(provider target.TargetProvider) *TargetScraper {
 
 
 type DCGMTargetProvider struct {
 type DCGMTargetProvider struct {
 	clusterCache clustercache.ClusterCache
 	clusterCache clustercache.ClusterCache
+	port         int
 }
 }
 
 
 func newDCGMTargetProvider(clusterCache clustercache.ClusterCache) *DCGMTargetProvider {
 func newDCGMTargetProvider(clusterCache clustercache.ClusterCache) *DCGMTargetProvider {
 	return &DCGMTargetProvider{
 	return &DCGMTargetProvider{
 		clusterCache: clusterCache,
 		clusterCache: clusterCache,
+		port:         9400,
 	}
 	}
 }
 }
 
 
 func (p *DCGMTargetProvider) GetTargets() []target.ScrapeTarget {
 func (p *DCGMTargetProvider) GetTargets() []target.ScrapeTarget {
-	svcs := p.clusterCache.GetAllServices()
+	// NOTE: The proper way to discover these targets is to first identify a Service that
+	// NOTE: matches a specific selector. Then, locate the Endpoints kubernetes resource associated
+	// NOTE: with that Service. This Endpoints resource has a list of all the targeted pods and their
+	// NOTE: addresses. We do _not_ have the Endpoints resource on our cluster cache at the moment,
+	// NOTE: so we'll perform this lookup ourselves.
+	pods := p.clusterCache.GetAllPods()
+
 	var targets []target.ScrapeTarget
 	var targets []target.ScrapeTarget
-	for _, svc := range svcs {
-		if svc.ClusterIP == "" || !isDCGM(svc.SpecSelector) {
-			continue
+	for _, pod := range pods {
+		if pod.Status.Phase == v1.PodRunning && isDCGM(pod.Labels) {
+			log.Debugf("DCGM: found target: http://%s:%d/metrics", pod.Status.PodIP, p.port)
+
+			t := target.NewUrlTarget(fmt.Sprintf("http://%s:%d/metrics", pod.Status.PodIP, p.port))
+			targets = append(targets, t)
 		}
 		}
-		port := 9400
-		log.Debugf("DCGM: found target: http://%s:%d/metrics", svc.ClusterIP, port)
-		t := target.NewUrlTarget(fmt.Sprintf("http://%s:%d/metrics", svc.ClusterIP, port))
-		targets = append(targets, t)
 	}
 	}
 
 
 	return targets
 	return targets

+ 18 - 3
modules/collector-source/pkg/scrape/network.go

@@ -8,6 +8,12 @@ import (
 	"github.com/opencost/opencost/modules/collector-source/pkg/event"
 	"github.com/opencost/opencost/modules/collector-source/pkg/event"
 	"github.com/opencost/opencost/modules/collector-source/pkg/metric"
 	"github.com/opencost/opencost/modules/collector-source/pkg/metric"
 	"github.com/opencost/opencost/modules/collector-source/pkg/scrape/target"
 	"github.com/opencost/opencost/modules/collector-source/pkg/scrape/target"
+	v1 "k8s.io/api/core/v1"
+)
+
+const (
+	NetworkCostsNameLabel     = "network-costs"
+	NetworkCostsInstanceLabel = "kubecost"
 )
 )
 
 
 func newNetworkScraper(
 func newNetworkScraper(
@@ -42,14 +48,18 @@ func NewNetworkTargetProvider(port int, clusterCache clustercache.ClusterCache)
 }
 }
 
 
 func (n *NetworkTargetProvider) GetTargets() []target.ScrapeTarget {
 func (n *NetworkTargetProvider) GetTargets() []target.ScrapeTarget {
+	// NOTE: The proper way to discover these targets is to first identify a Service that
+	// NOTE: matches a specific selector. Then, locate the Endpoints kubernetes resource associated
+	// NOTE: with that Service. This Endpoints resource has a list of all the targeted pods and their
+	// NOTE: addresses. We do _not_ have the Endpoints resource on our cluster cache at the moment,
+	// NOTE: so we'll perform this lookup ourselves.
 	pods := n.clusterCache.GetAllPods()
 	pods := n.clusterCache.GetAllPods()
 
 
 	var targets []target.ScrapeTarget
 	var targets []target.ScrapeTarget
 	for _, pod := range pods {
 	for _, pod := range pods {
-		instance := pod.Labels["app.kubernetes.io/instance"]
-		name := pod.Labels["app.kubernetes.io/name"]
-		if name == "network-costs" && instance == "kubecost" && pod.Status.Phase == "Running" {
+		if pod.Status.Phase == v1.PodRunning && isNetworkCosts(pod.Labels) {
 			log.Debugf("Network: found target for http://%s:%d/metrics", pod.Status.PodIP, n.port)
 			log.Debugf("Network: found target for http://%s:%d/metrics", pod.Status.PodIP, n.port)
+
 			t := target.NewUrlTarget(fmt.Sprintf("http://%s:%d/metrics", pod.Status.PodIP, n.port))
 			t := target.NewUrlTarget(fmt.Sprintf("http://%s:%d/metrics", pod.Status.PodIP, n.port))
 			targets = append(targets, t)
 			targets = append(targets, t)
 		}
 		}
@@ -57,3 +67,8 @@ func (n *NetworkTargetProvider) GetTargets() []target.ScrapeTarget {
 
 
 	return targets
 	return targets
 }
 }
+
+func isNetworkCosts(labels map[string]string) bool {
+	return labels["app.kubernetes.io/name"] == NetworkCostsNameLabel &&
+		labels["app.kubernetes.io/instance"] == NetworkCostsInstanceLabel
+}