2
0
Эх сурвалжийг харах

Utilities updates to work with iterators, better unit tests, data source updates to support diagnostics, and diagnostics exporter

Matt Bolt 1 жил өмнө
parent
commit
872082ac65

+ 1 - 1
core/go.mod

@@ -1,6 +1,6 @@
 module github.com/opencost/opencost/core
 
-go 1.23.0
+go 1.24.2
 
 require (
 	cloud.google.com/go/storage v1.36.0

+ 85 - 0
core/pkg/diagnostics/diagnostics.go

@@ -0,0 +1,85 @@
+package diagnostics
+
+import (
+	"context"
+	"time"
+)
+
+// DiagnosticsEventName is used to represent the name of the diagnostics export pipeline event to categorize for storage.
+const DiagnosticsEventName string = "diagnostics"
+
+// DiagnosticResult represents the result of a diagnostic run, and contains basic diagnostic information and additional
+// custom diagnostic information appended by the specific runner.
+type DiagnosticResult struct {
+	// Unique Identifier for the diagnostic run result.
+	ID string `json:"id"`
+
+	// Name of the diagnostic that ran.
+	Name string `json:"name"`
+
+	// Description of the diagnostic run, human readable description of what the diagnostic shows.
+	Description string `json:"description"`
+
+	// Category of the diagnostic run, which can be used to group similar diagnostics together.
+	Category string `json:"category"`
+
+	// Timestamp containing the time when the diagnostic run was executed.
+	Timestamp time.Time `json:"timestamp"`
+
+	// Error message if the diagnostic run failed. If this field is non-empty, the diagnostic run should be
+	// considered a failure.
+	Error string `json:"error,omitempty"`
+
+	// Details contains additional custom information about the diagnostic run that can be added by the diagnostic
+	// runner.
+	Details map[string]any `json:"details,omitempty"`
+}
+
+// DiagnosticsRunReport is a struct that contains the start time of the diagnostics run, and all of the results.
+type DiagnosticsRunReport struct {
+	// StartTime contains the time when the full diagnostics run started
+	StartTime time.Time `json:"startTime"`
+
+	// Results contains all of the results of the diagnostics run.
+	Results []*DiagnosticResult `json:"results"`
+}
+
+// DiagnosticRunner is a function that executes a diagnostic and returns the result. The function should return a map containing
+// any additional information about the diagnostic run, and a detailed error if the run failed.
+type DiagnosticRunner func(context.Context) (map[string]any, error)
+
+// Diagnostic is a struct that contains the basic information about a registered diagnostic within a DiagnosticService.
+type Diagnostic struct {
+	// Name of the diagnostic that is registered.
+	Name string
+
+	// Description of the diagnostic that is registered.
+	Description string
+
+	// Category of the diagnostic that is registered.
+	Category string
+}
+
+// DiagnosticService is an interface that defines the basic contract for a service that registers and runs diagnostics on demand and provides
+// the results.
+type DiagnosticService interface {
+	// Register registers a new diagnostic runner implementation with the service that will run the next time diagnostics are requested.
+	// An error is returned if a runner failed to register. Note that category _and_ name must be a unique combination.
+	Register(name, description, category string, runner DiagnosticRunner) error
+
+	// Unregister unregisters a diagnostic runner implementation with the service. True is returned if the runner was unregistered successfully,
+	// false otherwise.
+	Unregister(name, category string) bool
+
+	// Run executes all registered diagnostics and returns the results.
+	Run(ctx context.Context) []*DiagnosticResult
+
+	// RunCategory executes all registered diagnostics in the provided category.
+	RunCategory(ctx context.Context, category string) []*DiagnosticResult
+
+	// RunDiagnostic executes a specific diagnostic by category and name. If the diagnostic does not exist, nil is returned.
+	RunDiagnostic(ctx context.Context, category, name string) *DiagnosticResult
+
+	// Diagnostics returns a list of all registered diagnostics.
+	Diagnostics() []Diagnostic
+}

+ 15 - 0
core/pkg/diagnostics/exporter/controller.go

@@ -0,0 +1,15 @@
+package exporter
+
+import (
+	"github.com/opencost/opencost/core/pkg/diagnostics"
+	"github.com/opencost/opencost/core/pkg/exporter"
+	"github.com/opencost/opencost/core/pkg/storage"
+)
+
+// NewDiagnosticsExportController creates a new EventExportController for DiagnosticsRunReport events.
+func NewDiagnosticsExportController(clusterId string, store storage.Storage, service diagnostics.DiagnosticService) *exporter.EventExportController[diagnostics.DiagnosticsRunReport] {
+	return exporter.NewEventExportController(
+		NewDiagnosticSource(service),
+		NewDiagnosticExporter(clusterId, store),
+	)
+}

+ 11 - 0
core/pkg/diagnostics/exporter/encoder.go

@@ -0,0 +1,11 @@
+package exporter
+
+import (
+	"github.com/opencost/opencost/core/pkg/diagnostics"
+	"github.com/opencost/opencost/core/pkg/exporter"
+)
+
+// NewDiagnosticsEncoder returns a JSON encoder used to encode DiagnosticsRunReport events.
+func NewDiagnosticsEncoder() exporter.Encoder[diagnostics.DiagnosticsRunReport] {
+	return exporter.NewJSONEncoder[diagnostics.DiagnosticsRunReport]()
+}

+ 25 - 0
core/pkg/diagnostics/exporter/exporter.go

@@ -0,0 +1,25 @@
+package exporter
+
+import (
+	"github.com/opencost/opencost/core/pkg/diagnostics"
+	"github.com/opencost/opencost/core/pkg/exporter"
+	"github.com/opencost/opencost/core/pkg/exporter/pathing"
+	"github.com/opencost/opencost/core/pkg/log"
+	"github.com/opencost/opencost/core/pkg/storage"
+)
+
+// NewDiagnosticExporter creates a new `StorageExporter[DiagnosticsRunReport]` instance for exporting diagnostic run events.
+func NewDiagnosticExporter(clusterId string, storage storage.Storage) *exporter.StorageExporter[diagnostics.DiagnosticsRunReport] {
+	pathing, err := pathing.NewEventStoragePathFormatter("", clusterId, diagnostics.DiagnosticsEventName)
+	if err != nil {
+		log.Errorf("failed to create pathing formatter: %v", err)
+		return nil
+	}
+
+	return exporter.NewStorageExporter(
+		diagnostics.DiagnosticsEventName,
+		pathing,
+		NewDiagnosticsEncoder(),
+		storage,
+	)
+}

+ 35 - 0
core/pkg/diagnostics/exporter/source.go

@@ -0,0 +1,35 @@
+package exporter
+
+import (
+	"context"
+	"time"
+
+	"github.com/opencost/opencost/core/pkg/diagnostics"
+)
+
+// DiagnosticSource is an `export.ExportSource` implementation that provides the basic data for a `DiagnosticsRunReport` payload.
+type DiagnosticSource struct {
+	diagnosticService diagnostics.DiagnosticService
+}
+
+// NewDiagnosticSource creates a new `DiagnosticSource` instance. The `diagnosticService` parameter is the service
+// whose registered diagnostics are run each time a report is made.
+func NewDiagnosticSource(diagnosticService diagnostics.DiagnosticService) *DiagnosticSource {
+	return &DiagnosticSource{
+		diagnosticService: diagnosticService,
+	}
+}
+
+// Make creates a new `DiagnosticsRunReport` instance with the provided current time.
+func (ds *DiagnosticSource) Make(t time.Time) *diagnostics.DiagnosticsRunReport {
+	ctx := context.Background()
+
+	return &diagnostics.DiagnosticsRunReport{
+		StartTime: t,
+		Results:   ds.diagnosticService.Run(ctx),
+	}
+}
+
+func (ds *DiagnosticSource) Name() string {
+	return diagnostics.DiagnosticsEventName + "-source"
+}

+ 173 - 0
core/pkg/diagnostics/service.go

@@ -0,0 +1,173 @@
+package diagnostics
+
+import (
+	"context"
+	"fmt"
+	"iter"
+	"maps"
+	"slices"
+	"sync"
+	"time"
+
+	"github.com/google/uuid"
+	"github.com/opencost/opencost/core/pkg/util/maputil"
+	"github.com/opencost/opencost/core/pkg/util/worker"
+)
+
+// basic composite type for diagnostics and the runner function
+type runner struct {
+	diagnostic Diagnostic
+	run        DiagnosticRunner
+}
+
+// OpencostDiagnosticService is an implementation of the `DiagnosticService` contract that provides concurrent diagnostic
+// execution and result collection.
+type OpencostDiagnosticService struct {
+	lock    sync.RWMutex
+	runners map[string]map[string]*runner
+}
+
+func NewDiagnosticService() DiagnosticService {
+	return &OpencostDiagnosticService{
+		runners: make(map[string]map[string]*runner),
+	}
+}
+
+// Register registers a new diagnostic runner implementation with the service that will run the next time diagnostics are requested.
+// An error is returned if a runner failed to register. Note that category _and_ name must be a unique combination.
+func (ocds *OpencostDiagnosticService) Register(name string, description string, category string, r DiagnosticRunner) error {
+	ocds.lock.Lock()
+	defer ocds.lock.Unlock()
+
+	categoryRunners, exists := ocds.runners[category]
+	if !exists {
+		categoryRunners = make(map[string]*runner)
+		ocds.runners[category] = categoryRunners
+	}
+
+	if _, exists := categoryRunners[name]; exists {
+		return fmt.Errorf("runner with name %s already exists in category %s", name, category)
+	}
+
+	categoryRunners[name] = &runner{
+		diagnostic: Diagnostic{
+			Name:        name,
+			Description: description,
+			Category:    category,
+		},
+		run: r,
+	}
+
+	return nil
+}
+
+// Unregister unregisters a diagnostic runner implementation with the service. True is returned if the runner was unregistered successfully,
+// false otherwise.
+func (ocds *OpencostDiagnosticService) Unregister(name string, category string) bool {
+	ocds.lock.Lock()
+	defer ocds.lock.Unlock()
+
+	categoryRunners, exists := ocds.runners[category]
+	if !exists {
+		return false
+	}
+
+	if _, exists := categoryRunners[name]; !exists {
+		return false
+	}
+
+	delete(categoryRunners, name)
+	if len(categoryRunners) == 0 {
+		delete(ocds.runners, category)
+	}
+
+	return true
+}
+
+// Run executes all registered diagnostics and returns the results.
+func (ocds *OpencostDiagnosticService) Run(ctx context.Context) []*DiagnosticResult {
+	ocds.lock.RLock()
+	defer ocds.lock.RUnlock()
+
+	return runAll(ctx, maputil.Flatten(ocds.runners))
+}
+
+// RunCategory executes all registered diagnostics in the provided category.
+func (ocds *OpencostDiagnosticService) RunCategory(ctx context.Context, category string) []*DiagnosticResult {
+	ocds.lock.RLock()
+	defer ocds.lock.RUnlock()
+
+	categoryRunners, exists := ocds.runners[category]
+	if !exists {
+		return nil
+	}
+
+	return runAll(ctx, maps.Values(categoryRunners))
+}
+
+// RunDiagnostic executes a specific diagnostic by category and name. If the diagnostic does not exist, nil is returned.
+func (ocds *OpencostDiagnosticService) RunDiagnostic(ctx context.Context, category, name string) *DiagnosticResult {
+	ocds.lock.RLock()
+	defer ocds.lock.RUnlock()
+
+	categoryRunners, exists := ocds.runners[category]
+	if !exists {
+		return nil
+	}
+
+	r, exists := categoryRunners[name]
+	if !exists {
+		return nil
+	}
+
+	diagRunner := diagRunnerFor(ctx)
+
+	return diagRunner(r)
+}
+
+// runAll executes all runners in the provided iterator with a specific worker pool size,
+// and returns the results when all diagnostic runners have completed.
+func runAll(ctx context.Context, runners iter.Seq[*runner]) []*DiagnosticResult {
+	allContext, cancel := context.WithCancel(ctx)
+	defer cancel()
+
+	return worker.ConcurrentIterCollect(5, diagRunnerFor(allContext), runners)
+}
+
+// diagRunnerFor returns a diagnostic runner function that executes the diagnostic and creates the DiagnosticResult
+// leveraging the provided context as a parent.
+func diagRunnerFor(ctx context.Context) func(*runner) *DiagnosticResult {
+	return func(r *runner) *DiagnosticResult {
+		result := &DiagnosticResult{
+			ID:          uuid.Must(uuid.NewV7()).String(),
+			Name:        r.diagnostic.Name,
+			Description: r.diagnostic.Description,
+			Category:    r.diagnostic.Category,
+		}
+
+		c, cancelDiag := context.WithTimeout(ctx, 5*time.Second)
+		defer cancelDiag()
+
+		details, err := r.run(c)
+		if err != nil {
+			result.Error = err.Error()
+		} else {
+			result.Details = details
+		}
+
+		result.Timestamp = time.Now().UTC()
+		return result
+	}
+}
+
+// Diagnostics returns a list of all registered diagnostics.
+func (ocds *OpencostDiagnosticService) Diagnostics() []Diagnostic {
+	ocds.lock.RLock()
+	defer ocds.lock.RUnlock()
+
+	diagnostics := maputil.FlatMap(ocds.runners, func(r *runner) Diagnostic {
+		return r.diagnostic
+	})
+
+	return slices.Collect(diagnostics)
+}

+ 430 - 0
core/pkg/diagnostics/service_test.go

@@ -0,0 +1,430 @@
+package diagnostics
+
+import (
+	"cmp"
+	"context"
+	"fmt"
+	"slices"
+	"testing"
+	"time"
+
+	"github.com/opencost/opencost/core/pkg/util/json"
+)
+
+const (
+	TestDiagnosticNameA       = "TestDiagnosticA"
+	TestDiagnosticNameB       = "TestDiagnosticB"
+	TestDiagnosticNameC       = "TestDiagnosticC"
+	TestDiagnosticNameD       = "TestDiagnosticD"
+	TestDiagnosticNameE       = "TestDiagnosticE"
+	TestDiagnosticNameF       = "TestDiagnosticF"
+	TestDiagnosticNameTimeout = "TestDiagnosticTimeout"
+
+	TestDiagnosticDescriptionA       = "Diagnostic A Description..."
+	TestDiagnosticDescriptionB       = "Diagnostic B Description..."
+	TestDiagnosticDescriptionC       = "Diagnostic C Description..."
+	TestDiagnosticDescriptionD       = "Diagnostic D Description..."
+	TestDiagnosticDescriptionE       = "Diagnostic E Description..."
+	TestDiagnosticDescriptionF       = "Diagnostic F Description..."
+	TestDiagnosticDescriptionTimeout = "Diagnostic Timeout will run for longer than 5 seconds..."
+
+	TestDiagnosticCategoryBlue  = "TestCategoryBlue"
+	TestDiagnosticCategoryRed   = "TestCategoryRed"
+	TestDiagnosticCategoryGreen = "TestCategoryGreen"
+)
+
+// TestDiagnostic is a general structure used to capture test diagnostic data
+type TestDiagnostic struct {
+	Name        string
+	Description string
+	Category    string
+	Run         DiagnosticRunner
+}
+
+// generate a runner func that will run for the provided duration and return a map with the key: "test"
+// and the value of testName provided.
+func runnerFor(testName string, duration time.Duration) DiagnosticRunner {
+	return func(ctx context.Context) (map[string]any, error) {
+		fmt.Printf("Running Diagnostic: %s\n", testName)
+		defer fmt.Printf("Finished Diagnostic: %s\n", testName)
+
+		select {
+		case <-ctx.Done():
+			fmt.Printf("context cancelled: %v\n", ctx.Err())
+			return nil, ctx.Err()
+		case <-time.After(duration):
+			return map[string]any{
+				"test": testName,
+			}, nil
+		}
+	}
+}
+
+var (
+	TestDiagnosticA = TestDiagnostic{
+		Name:        TestDiagnosticNameA,
+		Description: TestDiagnosticDescriptionA,
+		Category:    TestDiagnosticCategoryRed,
+		Run:         runnerFor(TestDiagnosticNameA, 250*time.Millisecond),
+	}
+	TestDiagnosticB = TestDiagnostic{
+		Name:        TestDiagnosticNameB,
+		Description: TestDiagnosticDescriptionB,
+		Category:    TestDiagnosticCategoryRed,
+		Run:         runnerFor(TestDiagnosticNameB, 150*time.Millisecond),
+	}
+	TestDiagnosticC = TestDiagnostic{
+		Name:        TestDiagnosticNameC,
+		Description: TestDiagnosticDescriptionC,
+		Category:    TestDiagnosticCategoryBlue,
+		Run:         runnerFor(TestDiagnosticNameC, 350*time.Millisecond),
+	}
+	TestDiagnosticD = TestDiagnostic{
+		Name:        TestDiagnosticNameD,
+		Description: TestDiagnosticDescriptionD,
+		Category:    TestDiagnosticCategoryBlue,
+		Run:         runnerFor(TestDiagnosticNameD, 450*time.Millisecond),
+	}
+	TestDiagnosticE = TestDiagnostic{
+		Name:        TestDiagnosticNameE,
+		Description: TestDiagnosticDescriptionE,
+		Category:    TestDiagnosticCategoryGreen,
+		Run:         runnerFor(TestDiagnosticNameE, 550*time.Millisecond),
+	}
+	TestDiagnosticF = TestDiagnostic{
+		Name:        TestDiagnosticNameF,
+		Description: TestDiagnosticDescriptionF,
+		Category:    TestDiagnosticCategoryGreen,
+		Run:         runnerFor(TestDiagnosticNameF, 650*time.Millisecond),
+	}
+	TestDiagnosticTimeout = TestDiagnostic{
+		Name:        TestDiagnosticNameTimeout,
+		Description: TestDiagnosticDescriptionTimeout,
+		Category:    TestDiagnosticCategoryGreen,
+		Run:         runnerFor(TestDiagnosticNameTimeout, 6*time.Second),
+	}
+)
+
+func TestDiagnosticsRegisterAndRun(t *testing.T) {
+	t.Parallel()
+
+	d := NewDiagnosticService()
+
+	diags := []TestDiagnostic{
+		TestDiagnosticA,
+		TestDiagnosticB,
+		TestDiagnosticC,
+		TestDiagnosticD,
+		TestDiagnosticE,
+		TestDiagnosticF,
+	}
+
+	for _, diag := range diags {
+		if err := d.Register(diag.Name, diag.Description, diag.Category, diag.Run); err != nil {
+			t.Fatalf("failed to register diagnostic %s: %v", diag.Name, err)
+		}
+	}
+
+	// Register a duplicate diagnostic and expect an error
+	err := d.Register(TestDiagnosticA.Name, TestDiagnosticA.Description, TestDiagnosticA.Category, TestDiagnosticA.Run)
+	if err == nil {
+		t.Fatalf("expected error when registering duplicate diagnostic %s", TestDiagnosticA.Name)
+	}
+
+	c := context.Background()
+	results := d.Run(c)
+
+	if len(results) != len(diags) {
+		t.Fatalf("expected %d results, got %d", len(diags), len(results))
+	}
+
+	for _, result := range results {
+		if result.Error != "" {
+			t.Errorf("expected no error, got %s", result.Error)
+		}
+
+		if result.Category == "" {
+			t.Errorf("expected category, got empty")
+		}
+
+		if result.Name == "" {
+			t.Errorf("expected name, got empty")
+		}
+
+		if result.Timestamp.IsZero() {
+			t.Errorf("expected timestamp, got zero")
+		}
+
+		if result.Details == nil {
+			t.Errorf("expected details, got nil")
+		}
+
+		if result.Details["test"] != result.Name {
+			t.Errorf("expected test name %s, got %s", result.Name, result.Details["test"])
+		}
+
+		j, err := json.Marshal(result)
+		if err != nil {
+			t.Errorf("failed to marshal result: %v", err)
+		}
+		js := string(j)
+		if js == "" {
+			t.Errorf("expected non-empty JSON, got empty")
+		}
+
+		t.Logf("%s", js)
+	}
+}
+
+func TestDiagnosticsServiceTimeout(t *testing.T) {
+	t.Parallel()
+
+	d := NewDiagnosticService()
+
+	diags := []TestDiagnostic{
+		TestDiagnosticA,
+		TestDiagnosticB,
+		TestDiagnosticC,
+		TestDiagnosticTimeout,
+	}
+
+	for _, diag := range diags {
+		if err := d.Register(diag.Name, diag.Description, diag.Category, diag.Run); err != nil {
+			t.Fatalf("failed to register diagnostic %s: %v", diag.Name, err)
+		}
+	}
+
+	c := context.Background()
+	results := d.Run(c)
+
+	if len(results) != len(diags) {
+		t.Fatalf("expected %d results, got %d", len(diags), len(results))
+	}
+
+	foundTimeoutDiagnostic := false
+
+	for _, result := range results {
+		if result.Name == TestDiagnosticNameTimeout {
+			foundTimeoutDiagnostic = true
+			if result.Error == "" {
+				t.Errorf("expected timeout error, but got empty error")
+			} else {
+				t.Logf("Diagnostic %s/%s completed with error as expected: %s", result.Category, result.Name, result.Error)
+			}
+		} else {
+			t.Logf("Diagnostic %s/%s completed successfully", result.Category, result.Name)
+		}
+	}
+
+	if !foundTimeoutDiagnostic {
+		t.Errorf("expected to find timeout diagnostic, but it was not found")
+	}
+}
+
+func TestDiagnosticsList(t *testing.T) {
+	t.Parallel()
+
+	d := NewDiagnosticService()
+
+	diags := []TestDiagnostic{
+		TestDiagnosticA,
+		TestDiagnosticB,
+		TestDiagnosticC,
+		TestDiagnosticD,
+		TestDiagnosticE,
+		TestDiagnosticF,
+	}
+
+	for _, diag := range diags {
+		if err := d.Register(diag.Name, diag.Description, diag.Category, diag.Run); err != nil {
+			t.Fatalf("failed to register diagnostic %s: %v", diag.Name, err)
+		}
+	}
+
+	diagList := d.Diagnostics()
+	slices.SortFunc(diagList, func(a, b Diagnostic) int {
+		return cmp.Compare(a.Category+"/"+a.Name, b.Category+"/"+b.Name)
+	})
+
+	slices.SortFunc(diags, func(a, b TestDiagnostic) int {
+		return cmp.Compare(a.Category+"/"+a.Name, b.Category+"/"+b.Name)
+	})
+
+	if !slices.EqualFunc(diags, diagList, isEqual) {
+		t.Errorf("expected diagnostics list to match registered diagnostics")
+	}
+
+	for _, diagItem := range diagList {
+		t.Logf("Diagnostic: %+v", diagItem)
+	}
+}
+
+func TestUnregisterDiagnostic(t *testing.T) {
+	t.Parallel()
+
+	d := NewDiagnosticService()
+
+	diags := []TestDiagnostic{
+		TestDiagnosticA,
+		TestDiagnosticB,
+		TestDiagnosticC,
+		TestDiagnosticD,
+		TestDiagnosticE,
+		TestDiagnosticF,
+	}
+
+	for _, diag := range diags {
+		if err := d.Register(diag.Name, diag.Description, diag.Category, diag.Run); err != nil {
+			t.Fatalf("failed to register diagnostic %s: %v", diag.Name, err)
+		}
+	}
+
+	if !d.Unregister(TestDiagnosticNameA, TestDiagnosticCategoryRed) {
+		t.Errorf("failed to unregister diagnostic %s/%s", TestDiagnosticCategoryRed, TestDiagnosticNameA)
+	}
+
+	if d.Unregister(TestDiagnosticNameA, TestDiagnosticCategoryRed) {
+		t.Errorf("unregistering diagnostic %s/%s again should fail", TestDiagnosticCategoryRed, TestDiagnosticNameA)
+	}
+
+	if d.Unregister(TestDiagnosticNameB, "nonexistent") {
+		t.Errorf("unregistering nonexistent diagnostic should fail")
+	}
+
+	results := d.Run(context.Background())
+	if len(results) != len(diags)-1 {
+		t.Fatalf("expected %d results, got %d", len(diags)-1, len(results))
+	}
+
+	for _, result := range results {
+		if result.Name == TestDiagnosticNameA {
+			t.Errorf("expected diagnostic %s/%s to be unregistered", TestDiagnosticCategoryRed, TestDiagnosticNameA)
+		}
+	}
+}
+
+func TestUnregisterAllFromCategory(t *testing.T) {
+	t.Parallel()
+
+	d := NewDiagnosticService()
+
+	diags := []TestDiagnostic{
+		TestDiagnosticA,
+		TestDiagnosticB,
+		TestDiagnosticC,
+		TestDiagnosticD,
+		TestDiagnosticE,
+		TestDiagnosticF,
+	}
+
+	for _, diag := range diags {
+		if err := d.Register(diag.Name, diag.Description, diag.Category, diag.Run); err != nil {
+			t.Fatalf("failed to register diagnostic %s: %v", diag.Name, err)
+		}
+	}
+
+	if !d.Unregister(TestDiagnosticNameA, TestDiagnosticCategoryRed) {
+		t.Errorf("failed to unregister diagnostic %s/%s", TestDiagnosticCategoryRed, TestDiagnosticNameA)
+	}
+
+	if !d.Unregister(TestDiagnosticNameB, TestDiagnosticCategoryRed) {
+		t.Errorf("failed to unregister diagnostic %s/%s", TestDiagnosticCategoryRed, TestDiagnosticNameB)
+	}
+
+	results := d.RunCategory(context.Background(), TestDiagnosticCategoryRed)
+	if len(results) != 0 {
+		t.Fatalf("expected 0 results for category %s, got %d", TestDiagnosticCategoryRed, len(results))
+	}
+}
+
+func TestRunCategoryDiagnostics(t *testing.T) {
+	t.Parallel()
+
+	d := NewDiagnosticService()
+
+	diags := []TestDiagnostic{
+		TestDiagnosticA,
+		TestDiagnosticB,
+		TestDiagnosticC,
+		TestDiagnosticD,
+		TestDiagnosticE,
+		TestDiagnosticF,
+	}
+
+	for _, diag := range diags {
+		if err := d.Register(diag.Name, diag.Description, diag.Category, diag.Run); err != nil {
+			t.Fatalf("failed to register diagnostic %s: %v", diag.Name, err)
+		}
+	}
+
+	c := context.Background()
+	results := d.RunCategory(c, TestDiagnosticCategoryBlue)
+
+	if len(results) != 2 {
+		t.Fatalf("expected 2 results for category %s, got %d", TestDiagnosticCategoryBlue, len(results))
+	}
+
+	for _, result := range results {
+		if result.Category != TestDiagnosticCategoryBlue {
+			t.Errorf("expected category %s, got %s", TestDiagnosticCategoryBlue, result.Category)
+		}
+	}
+}
+
+func TestRunSingleDiagnostic(t *testing.T) {
+	t.Parallel()
+
+	d := NewDiagnosticService()
+
+	diags := []TestDiagnostic{
+		TestDiagnosticA,
+		TestDiagnosticB,
+		TestDiagnosticC,
+		TestDiagnosticD,
+		TestDiagnosticE,
+		TestDiagnosticF,
+	}
+
+	for _, diag := range diags {
+		if err := d.Register(diag.Name, diag.Description, diag.Category, diag.Run); err != nil {
+			t.Fatalf("failed to register diagnostic %s: %v", diag.Name, err)
+		}
+	}
+
+	c := context.Background()
+	result := d.RunDiagnostic(c, TestDiagnosticCategoryGreen, TestDiagnosticNameF)
+
+	if result == nil {
+		t.Fatalf("expected a result for diagnostic %s, got nil", TestDiagnosticNameF)
+	}
+
+	if result.Name != TestDiagnosticNameF {
+		t.Errorf("expected name %s, got %s", TestDiagnosticNameF, result.Name)
+	}
+
+	// Run with a valid category but a name that is not registered
+	result = d.RunDiagnostic(c, TestDiagnosticCategoryGreen, "not-a-valid-diagnostic-name")
+	if result != nil {
+		t.Fatalf("expected nil result for invalid diagnostic name, got %v", result)
+	}
+
+	// Run with a category that is not registered
+	result = d.RunDiagnostic(c, "not-a-valid-category", TestDiagnosticNameF)
+	if result != nil {
+		t.Fatalf("expected nil result for invalid category, got %v", result)
+	}
+
+}
+
+func isEqual(a TestDiagnostic, b Diagnostic) bool {
+	if a.Name != b.Name {
+		return false
+	}
+	if a.Description != b.Description {
+		return false
+	}
+	if a.Category != b.Category {
+		return false
+	}
+	return true
+}

+ 0 - 8
core/pkg/heartbeat/encoder.go

@@ -1,8 +0,0 @@
-package heartbeat
-
-import "github.com/opencost/opencost/core/pkg/exporter"
-
-// NewHeartbeatEncoder returns a JSON encoder used to encode Heartbeat events.
-func NewHeartbeatEncoder() exporter.Encoder[Heartbeat] {
-	return exporter.NewJSONEncoder[Heartbeat]()
-}

+ 3 - 2
core/pkg/heartbeat/controller.go → core/pkg/heartbeat/exporter/controller.go

@@ -1,13 +1,14 @@
-package heartbeat
+package exporter
 
 import (
 	"github.com/opencost/opencost/core/pkg/exporter"
+	"github.com/opencost/opencost/core/pkg/heartbeat"
 	"github.com/opencost/opencost/core/pkg/storage"
 )
 
 // NewHeartbeatExportController creates a new EventExportController for Heartbeat events.
 // A HeartbeatMetadataProvider can optionally be provided to append metadata to the Heartbeat payload.
-func NewHeartbeatExportController(clusterId string, store storage.Storage, provider HeartbeatMetadataProvider) *exporter.EventExportController[Heartbeat] {
+func NewHeartbeatExportController(clusterId string, store storage.Storage, provider HeartbeatMetadataProvider) *exporter.EventExportController[heartbeat.Heartbeat] {
 	return exporter.NewEventExportController(
 		NewHeartbeatSource(provider),
 		NewHeartbeatExporter(clusterId, store),

+ 11 - 0
core/pkg/heartbeat/exporter/encoder.go

@@ -0,0 +1,11 @@
+package exporter
+
+import (
+	"github.com/opencost/opencost/core/pkg/exporter"
+	"github.com/opencost/opencost/core/pkg/heartbeat"
+)
+
+// NewHeartbeatEncoder returns a JSON encoder used to encode Heartbeat events.
+func NewHeartbeatEncoder() exporter.Encoder[heartbeat.Heartbeat] {
+	return exporter.NewJSONEncoder[heartbeat.Heartbeat]()
+}

+ 5 - 4
core/pkg/heartbeat/exporter.go → core/pkg/heartbeat/exporter/exporter.go

@@ -1,22 +1,23 @@
-package heartbeat
+package exporter
 
 import (
 	"github.com/opencost/opencost/core/pkg/exporter"
 	"github.com/opencost/opencost/core/pkg/exporter/pathing"
+	"github.com/opencost/opencost/core/pkg/heartbeat"
 	"github.com/opencost/opencost/core/pkg/log"
 	"github.com/opencost/opencost/core/pkg/storage"
 )
 
 // NewHeartbeatExporter creates a new `StorageExporter[Heartbeat]` instance for exporting Heartbeat events.
-func NewHeartbeatExporter(clusterId string, storage storage.Storage) *exporter.StorageExporter[Heartbeat] {
-	pathing, err := pathing.NewEventStoragePathFormatter("", clusterId, HeartbeatEventName)
+func NewHeartbeatExporter(clusterId string, storage storage.Storage) *exporter.StorageExporter[heartbeat.Heartbeat] {
+	pathing, err := pathing.NewEventStoragePathFormatter("", clusterId, heartbeat.HeartbeatEventName)
 	if err != nil {
 		log.Errorf("failed to create pathing formatter: %v", err)
 		return nil
 	}
 
 	return exporter.NewStorageExporter(
-		HeartbeatEventName,
+		heartbeat.HeartbeatEventName,
 		pathing,
 		NewHeartbeatEncoder(),
 		storage,

+ 3 - 2
core/pkg/heartbeat/heartbeat_test.go → core/pkg/heartbeat/exporter/heartbeat_test.go

@@ -1,4 +1,4 @@
-package heartbeat
+package exporter
 
 import (
 	"encoding/json"
@@ -8,6 +8,7 @@ import (
 	"testing"
 	"time"
 
+	"github.com/opencost/opencost/core/pkg/heartbeat"
 	"github.com/opencost/opencost/core/pkg/storage"
 	"github.com/opencost/opencost/core/pkg/util/sliceutil"
 )
@@ -59,7 +60,7 @@ func TestHeartbeatExporter(t *testing.T) {
 			t.Fatalf("Failed to read file %s: %v", fpath, err)
 		}
 
-		hb := new(Heartbeat)
+		hb := new(heartbeat.Heartbeat)
 		if err := json.Unmarshal(data, hb); err != nil {
 			t.Fatalf("Failed to unmarshal heartbeat data: %v", err)
 		}

+ 5 - 4
core/pkg/heartbeat/source.go → core/pkg/heartbeat/exporter/source.go

@@ -1,10 +1,11 @@
-package heartbeat
+package exporter
 
 import (
 	"time"
 
 	"github.com/google/uuid"
 	"github.com/opencost/opencost/core/pkg/clusters"
+	"github.com/opencost/opencost/core/pkg/heartbeat"
 	"github.com/opencost/opencost/core/pkg/log"
 	"github.com/opencost/opencost/core/pkg/version"
 )
@@ -60,7 +61,7 @@ func NewHeartbeatSource(provider HeartbeatMetadataProvider) *HeartbeatSource {
 }
 
 // Make creates a new `Heartbeat` instance with the provided current time.
-func (h *HeartbeatSource) Make(t time.Time) *Heartbeat {
+func (h *HeartbeatSource) Make(t time.Time) *heartbeat.Heartbeat {
 	uid, err := uuid.NewV7()
 	if err != nil {
 		log.Warnf("failed to generate v7 UUID, replacing with UUID v4: %s", err)
@@ -76,9 +77,9 @@ func (h *HeartbeatSource) Make(t time.Time) *Heartbeat {
 		metadata = h.metadataProvider.GetMetadata()
 	}
 
-	return NewHeartbeat(id, t, uptime, v, metadata)
+	return heartbeat.NewHeartbeat(id, t, uptime, v, metadata)
 }
 
 func (h *HeartbeatSource) Name() string {
-	return HeartbeatEventName + "-source"
+	return heartbeat.HeartbeatEventName + "-source"
 }

+ 5 - 0
core/pkg/source/datasource.go

@@ -5,6 +5,7 @@ import (
 
 	"github.com/julienschmidt/httprouter"
 	"github.com/opencost/opencost/core/pkg/clusters"
+	"github.com/opencost/opencost/core/pkg/diagnostics"
 )
 
 type MetricsQuerier interface {
@@ -122,6 +123,10 @@ type OpenCostDataSource interface {
 	// RegisterEndPoints registers any custom endpoints that can be used for diagnostics or debug purposes.
 	RegisterEndPoints(router *httprouter.Router)
 
+	// RegisterDiagnostics registers any custom data source diagnostics with the `DiagnosticService` that can
+	// be used to report externally.
+	RegisterDiagnostics(diagService diagnostics.DiagnosticService)
+
 	// Metrics returns a MetricsQuerier that can be used to query historical metrics data from the data source.
 	Metrics() MetricsQuerier
 

+ 52 - 0
core/pkg/util/iterutil/iterutil.go

@@ -0,0 +1,52 @@
+package iterutil
+
+import "iter"
+
+// Combine takes two iterator sequences and combines them into a single iterator sequence of pairs.
+// This iterator will only yield as many values as the smallest of the two sequences.
+func Combine[T any, U any](seq1 iter.Seq[T], seq2 iter.Seq[U]) iter.Seq2[T, U] {
+	return func(yield func(T, U) bool) {
+		n1, s1 := iter.Pull(seq1)
+		n2, s2 := iter.Pull(seq2)
+
+		defer s1()
+		defer s2()
+
+		for {
+			first, fOk := n1()
+			if !fOk {
+				return
+			}
+
+			second, sOk := n2()
+			if !sOk {
+				return
+			}
+
+			if !yield(first, second) {
+				return
+			}
+		}
+	}
+}
+
+// Concat takes multiple iterator sequences and concatenates them into a single iterator sequence.
+// This iterator will yield all values from the first sequence, followed by all values from the second
+// sequence, and so on.
+//
+// If the consumer stops early (yield returns false), iteration ends immediately and none of the
+// remaining sequences are started. Calling yield again after it has returned false is a contract
+// violation that panics under range-over-func, so the early return must leave the whole iterator,
+// not just the current sequence.
+func Concat[T any](seqs ...iter.Seq[T]) iter.Seq[T] {
+	return func(yield func(T) bool) {
+		for _, seq := range seqs {
+			// Range directly over the push iterator; no pull conversion is needed here, and a
+			// plain return exits the outer iterator as soon as the consumer is done.
+			for v := range seq {
+				if !yield(v) {
+					return
+				}
+			}
+		}
+	}
+}

+ 140 - 0
core/pkg/util/iterutil/iterutil_test.go

@@ -0,0 +1,140 @@
+package iterutil
+
+import (
+	"iter"
+	"testing"
+)
+
+// toSeq wraps a slice in an iterator sequence that yields the elements in slice order.
+func toSeq[T any](s []T) iter.Seq[T] {
+	return func(yield func(T) bool) {
+		for i := range s {
+			if yield(s[i]) {
+				continue
+			}
+			return
+		}
+	}
+}
+
+// pair holds one (int, string) tuple, used by the Combine tests to describe expected and yielded values.
+type pair struct {
+	first  int
+	second string
+}
+
+// TestCombine verifies that Combine pairs up two sequences and stops at the shorter one. Besides
+// checking that every yielded pair is expected, it checks the number of yielded pairs so that an
+// iterator yielding too few (or no) pairs fails the test.
+func TestCombine(t *testing.T) {
+	type testCase struct {
+		name     string
+		input1   []int
+		input2   []string
+		expected []pair
+	}
+
+	tests := []testCase{
+		{
+			name:     "empty slices",
+			input1:   []int{},
+			input2:   []string{},
+			expected: []pair{},
+		},
+		{
+			name:   "different string length slice",
+			input1: []int{1, 2, 3},
+			input2: []string{"a", "b"},
+			expected: []pair{
+				{1, "a"},
+				{2, "b"},
+			},
+		},
+		{
+			name:   "different int length slice",
+			input1: []int{1, 2},
+			input2: []string{"a", "b", "c"},
+			expected: []pair{
+				{1, "a"},
+				{2, "b"},
+			},
+		},
+		{
+			name:   "same length slices",
+			input1: []int{1, 2, 3},
+			input2: []string{"a", "b", "c"},
+			expected: []pair{
+				{1, "a"},
+				{2, "b"},
+				{3, "c"},
+			},
+		},
+	}
+
+	for _, test := range tests {
+		t.Run(test.name, func(t *testing.T) {
+			result := Combine(toSeq(test.input1), toSeq(test.input2))
+
+			yielded := 0
+			for f, s := range result {
+				if !isPairIn(test.expected, pair{f, s}) {
+					t.Errorf("expected %v, got %v", test.expected, pair{f, s})
+				}
+				yielded++
+			}
+
+			// without this check the test would pass for an iterator that yields nothing
+			if yielded != len(test.expected) {
+				t.Errorf("expected %d pairs, got %d", len(test.expected), yielded)
+			}
+		})
+	}
+}
+
+// TestConcat verifies that Concat yields all values of its input sequences in order. The yielded
+// values are compared positionally against the expected slice, and the total number of yielded
+// values is checked so that missing or extra values fail the test instead of passing or panicking.
+func TestConcat(t *testing.T) {
+	type testCase struct {
+		name     string
+		input1   []int
+		input2   []int
+		expected []int
+	}
+
+	tests := []testCase{
+		{
+			name:     "empty slices",
+			input1:   []int{},
+			input2:   []int{},
+			expected: []int{},
+		},
+		{
+			name:     "non-empty first slice",
+			input1:   []int{1, 2, 3},
+			input2:   []int{},
+			expected: []int{1, 2, 3},
+		},
+		{
+			name:     "non-empty second slice",
+			input1:   []int{},
+			input2:   []int{4, 5, 6},
+			expected: []int{4, 5, 6},
+		},
+		{
+			name:     "non-empty both slices",
+			input1:   []int{1, 2, 3},
+			input2:   []int{4, 5, 6},
+			expected: []int{1, 2, 3, 4, 5, 6},
+		},
+	}
+
+	for _, test := range tests {
+		t.Run(test.name, func(t *testing.T) {
+			results := Concat(toSeq(test.input1), toSeq(test.input2))
+
+			// it is safe to compare this way due to the way a slice sequence iterator
+			// obeys the ordering of the slice
+			index := 0
+			for result := range results {
+				// guard the index so an extra value is reported rather than panicking
+				if index >= len(test.expected) {
+					t.Fatalf("unexpected extra value %v at index %d", result, index)
+				}
+				if result != test.expected[index] {
+					t.Errorf("expected %v, got %v", test.expected[index], result)
+				}
+				index++
+			}
+
+			if index != len(test.expected) {
+				t.Errorf("expected %d values, got %d", len(test.expected), index)
+			}
+		})
+	}
+}
+
+// isPairIn reports whether pairs contains an element whose fields both equal those of p.
+func isPairIn(pairs []pair, p pair) bool {
+	for i := range pairs {
+		if pairs[i] == p {
+			return true
+		}
+	}
+	return false
+}

+ 28 - 0
core/pkg/util/maputil/maputil.go

@@ -1,5 +1,7 @@
 package maputil
 
+import "iter"
+
 // Map applies a transformation function to each value within a map to get a new map containing the
 // transformed values.
 func Map[K comparable, V any, T any](m map[K]V, transform func(V) T) map[K]T {
@@ -9,3 +11,29 @@ func Map[K comparable, V any, T any](m map[K]V, transform func(V) T) map[K]T {
 	}
 	return result
 }
+
+// Flatten returns an iterator over every value stored in the inner maps of a two-level nested map.
+// Iteration stops as soon as the consumer stops.
+func Flatten[Map ~map[T]Inner, Inner ~map[T]U, T comparable, U any](m Map) iter.Seq[U] {
+	return func(yield func(U) bool) {
+		for _, nested := range m {
+			for _, leaf := range nested {
+				if yield(leaf) {
+					continue
+				}
+				return
+			}
+		}
+	}
+}
+
+// FlatMap returns an iterator over every value stored in the inner maps of a two-level nested map,
+// passing each value through transform before yielding it. Iteration stops as soon as the consumer
+// stops.
+func FlatMap[Map ~map[T]Inner, Inner ~map[T]U, T comparable, U any, V any](m Map, transform func(U) V) iter.Seq[V] {
+	return func(yield func(V) bool) {
+		for _, nested := range m {
+			for _, leaf := range nested {
+				mapped := transform(leaf)
+				if yield(mapped) {
+					continue
+				}
+				return
+			}
+		}
+	}
+}

+ 129 - 0
core/pkg/util/maputil/maputil_test.go

@@ -0,0 +1,129 @@
+package maputil
+
+import (
+	"testing"
+)
+
+// set is a minimal generic hash set used by the tests to track which values are still expected.
+type set[T comparable] struct {
+	m map[T]struct{}
+}
+
+// newSet creates a set containing the provided values.
+func newSet[T comparable](values ...T) *set[T] {
+	members := make(map[T]struct{}, len(values))
+	for i := range values {
+		members[values[i]] = struct{}{}
+	}
+
+	return &set[T]{m: members}
+}
+
+// contains reports whether value is currently in the set.
+func (s *set[T]) contains(value T) bool {
+	if _, found := s.m[value]; found {
+		return true
+	}
+	return false
+}
+
+// remove deletes value from the set; removing a value that is not present is a no-op.
+func (s *set[T]) remove(value T) {
+	delete(s.m, value)
+}
+
+// TestFlatten verifies that Flatten yields every inner value of a nested map exactly once. Each
+// yielded value is removed from the expected set, so a duplicate fails the membership check and any
+// value that was never yielded is reported after the loop.
+func TestFlatten(t *testing.T) {
+	m := map[string]map[string]int{
+		"A": {
+			"b": 1,
+			"c": 2,
+			"d": 3,
+		},
+		"B": {
+			"e": 4,
+			"f": 5,
+		},
+		"C": {
+			"g": 6,
+			"h": 7,
+			"i": 8,
+			"j": 9,
+		},
+	}
+
+	expected := newSet(1, 2, 3, 4, 5, 6, 7, 8, 9)
+
+	flattened := Flatten(m)
+	for value := range flattened {
+		if !expected.contains(value) {
+			t.Errorf("expected values did not contain the value: %d", value)
+		}
+
+		expected.remove(value)
+	}
+
+	// anything left over was never yielded by the iterator
+	if len(expected.m) != 0 {
+		t.Errorf("expected all values to be yielded, %d were missing", len(expected.m))
+	}
+}
+
+// TestAliasedMapFlatten verifies that Flatten accepts named map types (for both the outer and the
+// inner map) and yields every inner value exactly once. Values that were never yielded are reported
+// after the loop.
+func TestAliasedMapFlatten(t *testing.T) {
+	type IntMap map[string]int
+	type StringIntMap map[string]IntMap
+
+	m := StringIntMap(map[string]IntMap{
+		"A": IntMap(map[string]int{
+			"b": 1,
+			"c": 2,
+			"d": 3,
+		}),
+		"B": IntMap(map[string]int{
+			"e": 4,
+			"f": 5,
+		}),
+		"C": IntMap(map[string]int{
+			"g": 6,
+			"h": 7,
+			"i": 8,
+			"j": 9,
+		}),
+	})
+
+	expected := newSet(1, 2, 3, 4, 5, 6, 7, 8, 9)
+
+	flattened := Flatten(m)
+	for value := range flattened {
+		if !expected.contains(value) {
+			t.Errorf("expected values did not contain the value: %d", value)
+		}
+
+		expected.remove(value)
+	}
+
+	// anything left over was never yielded by the iterator
+	if len(expected.m) != 0 {
+		t.Errorf("expected all values to be yielded, %d were missing", len(expected.m))
+	}
+}
+
+// TestFlatMap verifies that FlatMap yields the transformed form of every inner value of a nested
+// map exactly once. Transformed values that were never yielded are reported after the loop.
+func TestFlatMap(t *testing.T) {
+	m := map[string]map[string]int{
+		"A": {
+			"b": 1,
+			"c": 2,
+			"d": 3,
+		},
+		"B": {
+			"e": 4,
+			"f": 5,
+		},
+		"C": {
+			"g": 6,
+			"h": 7,
+			"i": 8,
+			"j": 9,
+		},
+	}
+
+	expected := newSet(2, 4, 6, 8, 10, 12, 14, 16, 18)
+
+	flatMap := FlatMap(m, func(value int) int {
+		return value * 2
+	})
+
+	for value := range flatMap {
+		if !expected.contains(value) {
+			t.Errorf("expected values did not contain the value: %d", value)
+		}
+
+		expected.remove(value)
+	}
+
+	// anything left over was never yielded by the iterator
+	if len(expected.m) != 0 {
+		t.Errorf("expected all values to be yielded, %d were missing", len(expected.m))
+	}
+}

+ 34 - 0
core/pkg/util/sliceutil/sliceutil.go

@@ -1,5 +1,10 @@
 package sliceutil
 
+import (
+	"iter"
+	"slices"
+)
+
 // Map accepts a slice of T and applies a transformation function to each index of a
 // slice, which are inserted into a new slice of type U.
 func Map[T any, U any](s []T, transform func(T) U) []U {
@@ -9,3 +14,32 @@ func Map[T any, U any](s []T, transform func(T) U) []U {
 	}
 	return result
 }
+
+// AsSeq adapts a slice of T to an iterator sequence that yields only the values, in slice order.
+// Use it to pass a slice to APIs that accept iterators only.
+func AsSeq[T any](s []T) iter.Seq[T] {
+	return slices.Values(s)
+}
+
+// AsSeq2 adapts a slice of T to an iterator sequence that yields each index together with its
+// value, in slice order. Use it to pass a slice to APIs that accept iterators only.
+func AsSeq2[T any](s []T) iter.Seq2[int, T] {
+	return slices.All(s)
+}
+
+// SeqToSlice drains an iterator sequence and returns its values as a new slice of T, in the order
+// they were yielded.
+func SeqToSlice[T any](s iter.Seq[T]) []T {
+	return slices.AppendSeq([]T(nil), s)
+}

+ 158 - 0
core/pkg/util/sliceutil/sliceutil_test.go

@@ -0,0 +1,158 @@
+package sliceutil
+
+import (
+	"maps"
+	"slices"
+	"testing"
+)
+
+// TestSliceMap verifies that Map applies the transformation to every element and returns a result
+// of the same length as the input.
+func TestSliceMap(t *testing.T) {
+	tests := []struct {
+		name     string
+		input    []int
+		expected []int
+	}{
+		{
+			name:     "empty slice",
+			input:    []int{},
+			expected: []int{},
+		},
+		{
+			name:     "single element",
+			input:    []int{1},
+			expected: []int{2},
+		},
+		{
+			name:     "multiple elements",
+			input:    []int{1, 2, 3},
+			expected: []int{2, 4, 6},
+		},
+	}
+
+	for _, test := range tests {
+		t.Run(test.name, func(t *testing.T) {
+			result := Map(test.input, func(i int) int { return i * 2 })
+
+			// check the length first: a short result would otherwise pass silently and a long
+			// one would panic on the index below
+			if len(result) != len(test.expected) {
+				t.Fatalf("expected %d results, got %d", len(test.expected), len(result))
+			}
+
+			for i, v := range result {
+				if v != test.expected[i] {
+					t.Errorf("expected %v, got %v", test.expected[i], v)
+				}
+			}
+		})
+	}
+}
+
+// seqTestCase is a named input slice used to drive the AsSeq tests; the sequence built from input
+// is expected to yield exactly the elements of input, in order.
+type seqTestCase[T comparable] struct {
+	name  string
+	input []T
+}
+
+// runSeqTest returns a sub-test function which checks that AsSeq yields exactly the elements of
+// the test input, in order. Extra values are reported instead of causing an index panic, and
+// missing values are detected by comparing the number of yielded values to the input length.
+func runSeqTest[T comparable](test seqTestCase[T]) func(*testing.T) {
+	return func(t *testing.T) {
+		result := AsSeq(test.input)
+
+		i := 0
+		for v := range result {
+			if i >= len(test.input) {
+				t.Fatalf("unexpected extra value %v at index %d", v, i)
+			}
+			if v != test.input[i] {
+				t.Errorf("expected %v, got %v", test.input[i], v)
+			}
+			i++
+		}
+
+		if i != len(test.input) {
+			t.Errorf("expected %d values, got %d", len(test.input), i)
+		}
+	}
+}
+
+// runSeqTests runs every provided test case as its own named sub-test.
+func runSeqTests[T comparable](t *testing.T, testCases []seqTestCase[T]) {
+	t.Helper()
+
+	for i := range testCases {
+		tc := testCases[i]
+		t.Run(tc.name, runSeqTest(tc))
+	}
+}
+
+// TestToSeq runs the AsSeq sub-tests for int, float64 and string slices, covering the empty,
+// single element and multiple element cases for each type.
+func TestToSeq(t *testing.T) {
+	intTests := []seqTestCase[int]{
+		{
+			name:  "int empty slice",
+			input: []int{},
+		},
+		{
+			name:  "int single element",
+			input: []int{1},
+		},
+		{
+			name:  "int multiple elements",
+			input: []int{1, 2, 3},
+		},
+	}
+
+	floatTests := []seqTestCase[float64]{
+		{
+			name:  "float64 empty slice",
+			input: []float64{},
+		},
+		{
+			name:  "float64 single element",
+			input: []float64{1.54},
+		},
+		{
+			name:  "float64 multiple elements",
+			input: []float64{52.32, 23.12, 54.123},
+		},
+	}
+
+	stringTests := []seqTestCase[string]{
+		{
+			name:  "string empty slice",
+			input: []string{},
+		},
+		{
+			name:  "string single element",
+			input: []string{"foo"},
+		},
+		{
+			name:  "string multiple elements",
+			input: []string{"foo", "bar", "baz"},
+		},
+	}
+
+	runSeqTests(t, intTests)
+	runSeqTests(t, floatTests)
+	runSeqTests(t, stringTests)
+}
+
+// TestSeqToSlice verifies that SeqToSlice collects every value of a sequence. Map key and value
+// sequences are used as inputs, so only membership and length are checked, not order.
+func TestSeqToSlice(t *testing.T) {
+	keys := []string{
+		"a", "b", "c", "d", "e", "f", "g",
+	}
+	m := make(map[string]string, len(keys))
+	for _, k := range keys {
+		m[k] = "value-" + k
+	}
+
+	seqKeys := maps.Keys(m)
+	seqValues := maps.Values(m)
+
+	// These do *NOT* align on indexes!
+	keySlice := SeqToSlice(seqKeys)
+	valueSlice := SeqToSlice(seqValues)
+
+	// membership alone would not notice dropped elements, so check the lengths as well
+	if len(keySlice) != len(keys) {
+		t.Errorf("expected %d keys, got %d", len(keys), len(keySlice))
+	}
+	if len(valueSlice) != len(m) {
+		t.Errorf("expected %d values, got %d", len(m), len(valueSlice))
+	}
+
+	for _, k := range keySlice {
+		if !slices.Contains(keys, k) {
+			t.Errorf("expected %v to be in %v", k, keys)
+		}
+	}
+
+	for _, v := range valueSlice {
+		if !mapContainsValue(m, v) {
+			t.Errorf("expected %v to be in %v", v, m)
+		}
+	}
+}
+
+// mapContainsValue reports whether any entry of m holds the given value.
+func mapContainsValue(m map[string]string, value string) bool {
+	found := false
+	for _, candidate := range m {
+		if candidate == value {
+			found = true
+			break
+		}
+	}
+	return found
+}

+ 17 - 2
core/pkg/util/worker/worker.go

@@ -2,11 +2,13 @@ package worker
 
 import (
 	"fmt"
+	"iter"
 	"runtime"
 	"sync"
 	"sync/atomic"
 
 	"github.com/opencost/opencost/core/pkg/collections"
+	"github.com/opencost/opencost/core/pkg/util/sliceutil"
 )
 
 // Runner is a function type that takes a single input and returns nothing.
@@ -317,6 +319,12 @@ func ConcurrentCollect[T any, U any](workerFunc Worker[T, *U], inputs []T) []*U
 // ConcurrentCollectWith runs a pool of workers of the specified size which concurrently call the provided worker
 // func on each input to get a result slice of non-nil outputs. Size inputs < 1 will automatically be set to 1.
 func ConcurrentCollectWith[T any, U any](size int, workerFunc Worker[T, *U], inputs []T) []*U {
+	return ConcurrentIterCollect(size, workerFunc, sliceutil.AsSeq(inputs))
+}
+
+// ConcurrentIterCollect runs a pool of workers of the specified size which concurrently call the provided worker
+// func on each input to get a result slice of non-nil outputs. Size inputs < 1 will automatically be set to 1.
+func ConcurrentIterCollect[T any, U any](size int, workerFunc Worker[T, *U], inputs iter.Seq[T]) []*U {
 	if size < 1 {
 		size = 1
 	}
@@ -325,7 +333,7 @@ func ConcurrentCollectWith[T any, U any](size int, workerFunc Worker[T, *U], inp
 	defer workerPool.Shutdown()
 
 	workGroup := NewCollectionGroup(workerPool)
-	for _, input := range inputs {
+	for input := range inputs {
 		workGroup.Push(input)
 	}
 
@@ -342,6 +350,12 @@ func ConcurrentRun[T any](runner Runner[T], inputs []T) {
 // ConcurrentRunWith runs a pool of runners of the specified size which concurrently call the provided runner
 // func on each input. Size inputs < 1 will automatically be set to 1.
 func ConcurrentRunWith[T any](size int, runner Runner[T], inputs []T) {
+	ConcurrentIterRunWith(size, runner, sliceutil.AsSeq(inputs))
+}
+
+// ConcurrentIterRunWith runs a pool of runners of the specified size which concurrently call the provided runner
+// func on each input. Size inputs < 1 will automatically be set to 1.
+func ConcurrentIterRunWith[T any](size int, runner Runner[T], inputs iter.Seq[T]) {
 	if size < 1 {
 		size = 1
 	}
@@ -350,9 +364,10 @@ func ConcurrentRunWith[T any](size int, runner Runner[T], inputs []T) {
 		runner(input)
 		return
 	})
+	defer workerPool.Shutdown()
 
 	workGroup := NewNoResultGroup(workerPool)
-	for _, input := range inputs {
+	for input := range inputs {
 		workGroup.Push(input)
 	}
 

+ 1 - 1
go.mod

@@ -198,4 +198,4 @@ require (
 	sigs.k8s.io/structured-merge-diff/v4 v4.4.2 // indirect
 )
 
-go 1.23.5
+go 1.24.2

+ 1 - 1
modules/collector-source/go.mod

@@ -2,7 +2,7 @@ module github.com/opencost/opencost/modules/collector-source
 
 replace github.com/opencost/opencost/core => ./../../core
 
-go 1.23.0
+go 1.24.2
 
 require (
 	github.com/opencost/opencost/core v0.0.0-00010101000000-000000000000

+ 2 - 1
modules/prometheus-source/go.mod

@@ -1,6 +1,6 @@
 module github.com/opencost/opencost/modules/prometheus-source
 
-go 1.23.0
+go 1.24.2
 
 replace (
 	github.com/golang/lint => golang.org/x/lint v0.0.0-20180702182130-06c8688daad7
@@ -24,6 +24,7 @@ require (
 	github.com/goccy/go-json v0.10.5 // indirect
 	github.com/gogo/protobuf v1.3.2 // indirect
 	github.com/google/gofuzz v1.2.0 // indirect
+	github.com/google/uuid v1.6.0 // indirect
 	github.com/hashicorp/hcl v1.0.0 // indirect
 	github.com/json-iterator/go v1.1.12 // indirect
 	github.com/magiconair/properties v1.8.5 // indirect

+ 25 - 0
modules/prometheus-source/pkg/prom/datasource.go

@@ -12,6 +12,7 @@ import (
 	"github.com/opencost/opencost/modules/prometheus-source/pkg/env"
 
 	"github.com/opencost/opencost/core/pkg/clusters"
+	"github.com/opencost/opencost/core/pkg/diagnostics"
 	"github.com/opencost/opencost/core/pkg/log"
 	"github.com/opencost/opencost/core/pkg/protocol"
 	"github.com/opencost/opencost/core/pkg/source"
@@ -552,6 +553,30 @@ func (pds *PrometheusDataSource) RegisterEndPoints(router *httprouter.Router) {
 	router.GET("/diagnostics/prometheusMetrics", pds.prometheusMetrics)
 }
 
+// RegisterDiagnostics registers one diagnostic per entry of diagnosticDefinitions with the provided
+// `DiagnosticService`, all under the "prometheus" category. Each registered diagnostic builds a
+// fresh prometheus diagnostic, executes its query, and returns the diagnostic as a map.
+func (pds *PrometheusDataSource) RegisterDiagnostics(diagService diagnostics.DiagnosticService) {
+	const PrometheusDiagnosticCategory = "prometheus"
+
+	for _, definition := range diagnosticDefinitions {
+		run := func(ctx context.Context) (map[string]any, error) {
+			diagnostic := definition.NewDiagnostic(pds.promConfig.ClusterFilter, "")
+			queryContext := pds.promContexts.NewNamedContext(DiagnosticContextName)
+
+			if queryErr := diagnostic.executePrometheusDiagnosticQuery(queryContext); queryErr != nil {
+				return nil, fmt.Errorf("failed to execute prometheus diagnostic: %s - %w", definition.ID, queryErr)
+			}
+
+			return diagnostic.AsMap(), nil
+		}
+
+		// a failed registration is only logged, so the remaining diagnostics are still registered
+		if err := diagService.Register(definition.ID, definition.Description, PrometheusDiagnosticCategory, run); err != nil {
+			log.Warnf("Failed to register prometheus diagnostic %s: %s", definition.ID, err.Error())
+		}
+	}
+}
+
 func (pds *PrometheusDataSource) RefreshInterval() time.Duration {
 	return pds.promConfig.ScrapeInterval
 }

+ 10 - 0
modules/prometheus-source/pkg/prom/diagnostics.go

@@ -302,3 +302,13 @@ func (pd *PrometheusDiagnostic) executePrometheusDiagnosticQuery(ctx *Context) e
 	pd.Passed = len(result) == 0
 	return nil
 }
+
+// AsMap returns the diagnostic's query, label, documentation link, result, and passed flag as a map
+// keyed by "query", "label", "docLink", "result", and "passed".
+func (pd *PrometheusDiagnostic) AsMap() map[string]any {
+	details := make(map[string]any, 5)
+
+	details["query"] = pd.Query
+	details["label"] = pd.Label
+	details["docLink"] = pd.DocLink
+	details["result"] = pd.Result
+	details["passed"] = pd.Passed
+
+	return details
+}