Sfoglia il codice sorgente

[Collector] Scraper Diagnostics (#3337)

Signed-off-by: Matt Bolt <mbolt35@gmail.com>
Co-authored-by: Ishaan Mittal <ishaanmittal123@gmail.com>
Matt Bolt 8 mesi fa
parent
commit
a9152cb0b7

+ 0 - 2
core/go.sum

@@ -201,8 +201,6 @@ github.com/goccy/go-json v0.10.5/go.mod h1:oq7eo15ShAhp70Anwd5lgX2pLfOS3QCiwU/PU
 github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA=
 github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA=
 github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q=
 github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q=
 github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q=
 github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q=
-github.com/golang-jwt/jwt/v5 v5.2.1 h1:OuVbFODueb089Lh128TAcimifWaLhJwVflnrgM17wHk=
-github.com/golang-jwt/jwt/v5 v5.2.1/go.mod h1:pqrtFR0X4osieyHYxtmOUWsAWrfe1Q5UVIyoH402zdk=
 github.com/golang-jwt/jwt/v5 v5.3.0 h1:pv4AsKCKKZuqlgs5sUmn4x8UlGa0kEVt/puTpKx9vvo=
 github.com/golang-jwt/jwt/v5 v5.3.0 h1:pv4AsKCKKZuqlgs5sUmn4x8UlGa0kEVt/puTpKx9vvo=
 github.com/golang-jwt/jwt/v5 v5.3.0/go.mod h1:fxCRLWMO43lRc8nhHWY6LGqRcf+1gQWArsqaEUEa5bE=
 github.com/golang-jwt/jwt/v5 v5.3.0/go.mod h1:fxCRLWMO43lRc8nhHWY6LGqRcf+1gQWArsqaEUEa5bE=
 github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q=
 github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q=

+ 167 - 0
core/pkg/collections/idnamemap.go

@@ -0,0 +1,167 @@
+package collections
+
+import (
+	"errors"
+	"fmt"
+	"iter"
+)
+
var (
	// ErrEmptyID is returned when the entry provided to an IdNameMap has an empty
	// string for its ID.
	ErrEmptyID = errors.New("id must be non-empty")

	// ErrEmptyName is returned when the entry provided to an IdNameMap has an empty
	// string for its Name.
	ErrEmptyName = errors.New("name must be non-empty")
)
+
// WithIdName is a generic constraint required for elements added to an `IdNameMap`:
// each element exposes a string Id and a string Name. Both must be non-empty for the
// element to be accepted by Insert.
type WithIdName interface {
	Id() string
	Name() string
}
+
// IdNameMap contains two maps which alias the same element by id and name. It provides O(1) lookups
// by identifier or by name, both a required constraint on the `T` type.
//
// The zero value is not usable; construct with NewIdNameMap. The plain maps are not
// synchronized, so the type is not safe for concurrent use without external locking.
type IdNameMap[T WithIdName] struct {
	m map[string]T // id -> element
	r map[string]T // name -> element (the "reverse" lookup)
}
+
// NewIdNameMap constructs an empty IdNameMap with both internal lookups initialized.
func NewIdNameMap[T WithIdName]() *IdNameMap[T] {
	return &IdNameMap[T]{
		m: make(map[string]T),
		r: make(map[string]T),
	}
}
+
+// Insert inserts a `T` instance into the map successfully under the following requirements:
+//
+// Insertion of new Entry:
+//  1. IDs and Name for the `T` instance must be non-empty.
+//  2. ID and Name must not partially overlap with an existing entry. This would happen if
+//     you attempted to insert a `T` with a unique ID, but a conflicting Name. Likewise,
+//     a unique name, but conflicting ID will also fail.
+//
+// Replacing an existing Entry:
+//  1. If there exists an old entry with the id of the new entry, then the name for the new
+//     entry must also point to the old entry.
+//  2. If there exists an old entry with the name of the new entry, then the id for the new
+//     entry must also point to the old entry.
+//
+// To summarize, you can replace an existing item as long as the id/name lookups for the entry
+// being replaced are the same.
+func (rm *IdNameMap[T]) Insert(item T) error {
+	id := item.Id()
+	if id == "" {
+		return ErrEmptyID
+	}
+
+	name := item.Name()
+	if name == "" {
+		return ErrEmptyName
+	}
+
+	oldForId, idExists := rm.m[id]
+	oldForName, nameExists := rm.r[name]
+
+	// check partial insertion of id
+	if idExists && !nameExists {
+		return fmt.Errorf(
+			"insertion of new entry: [id: %s, name: %s] would partially overwrite existing entry: [id: %s, name: %s]",
+			id,
+			name,
+			oldForId.Id(),
+			oldForId.Name(),
+		)
+	}
+
+	// check partial insertion of name
+	if !idExists && nameExists {
+		return fmt.Errorf(
+			"insertion of new entry: [id: %s, name: %s] would partially overwrite existing entry: [id: %s, name: %s]",
+			id,
+			name,
+			oldForName.Id(),
+			oldForName.Name(),
+		)
+	}
+
+	// if we are overwriting, check to ensure that the entities from each map have identical mappings
+	if idExists && nameExists {
+		if oldForId.Id() != oldForName.Id() || oldForId.Name() != oldForName.Name() {
+			return fmt.Errorf(
+				"attempting to overwrite entries [id: %s, name: %s] and [id: %s, name: %s] with new entry [id: %s, name: %s] creating a multi-entry conflict",
+				oldForId.Id(),
+				oldForId.Name(),
+				oldForName.Id(),
+				oldForName.Name(),
+				id,
+				name,
+			)
+		}
+	}
+
+	rm.m[id] = item
+	rm.r[name] = item
+
+	return nil
+}
+
+func (rm *IdNameMap[T]) ById(id string) (T, bool) {
+	item, ok := rm.m[id]
+	return item, ok
+}
+
+func (rm *IdNameMap[T]) ByName(name string) (T, bool) {
+	item, ok := rm.r[name]
+	return item, ok
+}
+
+func (rm *IdNameMap[T]) RemoveById(id string) bool {
+	item, ok := rm.ById(id)
+	if !ok {
+		return false
+	}
+
+	name := item.Name()
+	delete(rm.m, id)
+	delete(rm.r, name)
+
+	return true
+}
+
+func (rm *IdNameMap[T]) RemoveByName(name string) bool {
+	item, ok := rm.ByName(name)
+	if !ok {
+		return false
+	}
+
+	id := item.Id()
+	delete(rm.m, id)
+	delete(rm.r, name)
+
+	return true
+}
+
+func (rm *IdNameMap[T]) Keys() iter.Seq2[string, string] {
+	return func(yield func(string, string) bool) {
+		for id, value := range rm.m {
+			name := value.Name()
+			if !yield(id, name) {
+				return
+			}
+		}
+	}
+}
+
+func (rm *IdNameMap[T]) Values() iter.Seq[T] {
+	return func(yield func(T) bool) {
+		for _, value := range rm.m {
+			if !yield(value) {
+				return
+			}
+		}
+	}
+}

+ 462 - 0
core/pkg/collections/idnamemap_test.go

@@ -0,0 +1,462 @@
+package collections
+
+import (
+	"errors"
+	"testing"
+)
+
// testItem is a minimal WithIdName implementation used as the element type in tests.
type testItem struct {
	id   string
	name string
	// value is an extra payload used to verify that overwrites replace the stored element
	value int
}

// Id returns the test item's identifier.
func (t testItem) Id() string {
	return t.id
}

// Name returns the test item's name.
func (t testItem) Name() string {
	return t.name
}
+
// TestNewIdNameMap verifies the constructor returns a non-nil map with both
// internal lookups (id and name) initialized.
func TestNewIdNameMap(t *testing.T) {
	m := NewIdNameMap[testItem]()
	if m == nil {
		t.Fatal("NewIdNameMap returned nil")
	}
	if m.m == nil {
		t.Fatal("internal id map is nil")
	}
	if m.r == nil {
		t.Fatal("internal name map is nil")
	}
}
+
+func TestIdNameMap_Insert(t *testing.T) {
+	m := NewIdNameMap[testItem]()
+	item := testItem{id: "id1", name: "name1"}
+
+	m.Insert(item)
+
+	// Verify item can be retrieved by id
+	retrieved, ok := m.ById("id1")
+	if !ok {
+		t.Fatal("item not found by id after insert")
+	}
+	if retrieved.Id() != "id1" || retrieved.Name() != "name1" {
+		t.Errorf("retrieved item mismatch: got %+v, want %+v", retrieved, item)
+	}
+
+	// Verify item can be retrieved by name
+	retrieved, ok = m.ByName("name1")
+	if !ok {
+		t.Fatal("item not found by name after insert")
+	}
+	if retrieved.Id() != "id1" || retrieved.Name() != "name1" {
+		t.Errorf("retrieved item mismatch: got %+v, want %+v", retrieved, item)
+	}
+}
+
+func TestIdNameMap_InsertEmptyIdAndName(t *testing.T) {
+	m := NewIdNameMap[testItem]()
+
+	item1 := testItem{id: "", name: "name1"}
+	item2 := testItem{id: "id1", name: ""}
+
+	err := m.Insert(item1)
+	if err == nil {
+		t.Fatalf("Expected insertion failure, but succeeded!")
+	}
+	if !errors.Is(err, ErrEmptyID) {
+		t.Fatalf("Expected ErrEmptyID, but instead got: %s", err.Error())
+	}
+
+	err = m.Insert(item2)
+	if err == nil {
+		t.Fatalf("Expected insertion failure, but succeeded!")
+	}
+	if !errors.Is(err, ErrEmptyName) {
+		t.Fatalf("Expected ErrEmptyName, but instead got: %s", err.Error())
+	}
+}
+
+func TestIdNameMap_InsertDuplicateId(t *testing.T) {
+	m := NewIdNameMap[testItem]()
+	item1 := testItem{id: "id1", name: "name1"}
+	item2 := testItem{id: "id1", name: "name2"}
+	item3 := testItem{id: "id1", name: "name1", value: 10}
+
+	err := m.Insert(item1)
+	if err != nil {
+		t.Fatalf("%s", err.Error())
+		return
+	}
+
+	// should fail
+	err = m.Insert(item2)
+	if err == nil {
+		t.Fatalf("Expected insertion to fail, but it succeeded!")
+	} else {
+		t.Logf("%s", err.Error())
+	}
+
+	err = m.Insert(item3)
+	if err != nil {
+		t.Fatalf("%s", err.Error())
+	}
+
+	// Verify second item overwrote the first
+	retrieved, ok := m.ById("id1")
+	if !ok {
+		t.Fatal("item not found by id")
+	}
+	if retrieved.Name() != "name1" {
+		t.Errorf("expected name1, got %s", retrieved.Name())
+	}
+
+	// Failed name should not exist
+	_, ok = m.ByName("name2")
+	if ok {
+		t.Error("old name should not exist after overwrite")
+	}
+
+	// Second name should exist
+	retrieved, ok = m.ByName("name1")
+	if !ok {
+		t.Fatal("new name not found")
+	}
+	if retrieved.Id() != "id1" {
+		t.Errorf("expected id1, got %s", retrieved.Id())
+	}
+	if retrieved.value != 10 {
+		t.Errorf("expected 10, got: %d", retrieved.value)
+	}
+}
+
+func TestIdNameMap_InsertDuplicateName(t *testing.T) {
+	m := NewIdNameMap[testItem]()
+	item1 := testItem{id: "id1", name: "name1"}
+	item2 := testItem{id: "id2", name: "name1"}
+	item3 := testItem{id: "id1", name: "name1", value: 10}
+
+	err := m.Insert(item1)
+	if err != nil {
+		t.Fatalf("%s", err.Error())
+		return
+	}
+
+	// expect error
+	err = m.Insert(item2)
+	if err == nil {
+		t.Fatalf("Expected insertion to fail, but it succeeded!")
+	} else {
+		t.Log(err)
+	}
+
+	// overwrite
+	err = m.Insert(item3)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	// Verify second item overwrote the first by name
+	retrieved, ok := m.ByName("name1")
+	if !ok {
+		t.Fatal("item not found by name")
+	}
+	if retrieved.Id() != "id1" {
+		t.Errorf("expected id1, got %s", retrieved.Id())
+	}
+
+	// id2 shouldn't exist
+	_, ok = m.ById("id2")
+	if ok {
+		t.Error("id2 failed to insert, so should not exist")
+	}
+
+	retrieved, ok = m.ById("id1")
+	if !ok {
+		t.Fatal("id1 not found")
+	}
+	if retrieved.Name() != "name1" {
+		t.Errorf("expected name1, got %s", retrieved.Name())
+	}
+	if retrieved.value != 10 {
+		t.Errorf("expected value: 10, got %d", retrieved.value)
+	}
+}
+
// TestIdNameMap_InsertTrickyPartialOverwrites exercises the partial-overwrite
// rejection: an item whose id matches one existing entry while its name matches a
// *different* existing entry must be rejected until both conflicting entries are
// removed. The steps below are order-dependent.
func TestIdNameMap_InsertTrickyPartialOverwrites(t *testing.T) {
	m := NewIdNameMap[testItem]()

	// 2 unique items
	item1 := testItem{id: "id1", name: "foo"}
	item2 := testItem{id: "id2", name: "bar"}

	// item overlaps id from previous entry, and name from another previous entry
	item3 := testItem{id: "id1", name: "bar"}

	err := m.Insert(item1)
	if err != nil {
		t.Fatal(err)
	}

	err = m.Insert(item2)
	if err != nil {
		t.Fatal(err)
	}

	// item3 would clobber item1's id slot and item2's name slot at once: reject
	err = m.Insert(item3)
	if err == nil {
		t.Fatalf("Expected to fail, but insert succeeded!")
	} else {
		t.Log(err)
	}

	if !m.RemoveById("id1") {
		t.Fatalf("Expected to remove entry with id: 'id1', but failed")
	}

	// this will _still_ be a partial insert, so expect failure
	err = m.Insert(item3)
	if err == nil {
		t.Fatalf("Expected to fail, but insert succeeded!")
	} else {
		t.Log(err)
	}

	if !m.RemoveByName("bar") {
		t.Fatalf("Expected to remove entry with name: 'bar', but failed")
	}

	// this should now succeed!
	err = m.Insert(item3)
	if err != nil {
		t.Fatal(err)
	}
}
+
+func TestIdNameMap_ByIdNotFound(t *testing.T) {
+	m := NewIdNameMap[testItem]()
+
+	_, ok := m.ById("nonexistent")
+	if ok {
+		t.Error("expected false for nonexistent id")
+	}
+}
+
+func TestIdNameMap_ByNameNotFound(t *testing.T) {
+	m := NewIdNameMap[testItem]()
+
+	_, ok := m.ByName("nonexistent")
+	if ok {
+		t.Error("expected false for nonexistent name")
+	}
+}
+
+func TestIdNameMap_RemoveById(t *testing.T) {
+	m := NewIdNameMap[testItem]()
+	item := testItem{id: "id1", name: "name1"}
+
+	m.Insert(item)
+
+	// Verify item exists
+	_, ok := m.ById("id1")
+	if !ok {
+		t.Fatal("item should exist before removal")
+	}
+	_, ok = m.ByName("name1")
+	if !ok {
+		t.Fatal("item should exist before removal")
+	}
+
+	// Remove by id
+	removed := m.RemoveById("id1")
+	if !removed {
+		t.Error("RemoveById should return true for existing item")
+	}
+
+	// Verify item no longer exists
+	_, ok = m.ById("id1")
+	if ok {
+		t.Error("item should not exist after removal by id")
+	}
+	_, ok = m.ByName("name1")
+	if ok {
+		t.Error("item should not exist after removal by id")
+	}
+}
+
+func TestIdNameMap_RemoveByIdNotFound(t *testing.T) {
+	m := NewIdNameMap[testItem]()
+
+	removed := m.RemoveById("nonexistent")
+	if removed {
+		t.Error("RemoveById should return false for nonexistent item")
+	}
+}
+
+func TestIdNameMap_RemoveByName(t *testing.T) {
+	m := NewIdNameMap[testItem]()
+	item := testItem{id: "id1", name: "name1"}
+
+	m.Insert(item)
+
+	// Verify item exists
+	_, ok := m.ById("id1")
+	if !ok {
+		t.Fatal("item should exist before removal")
+	}
+	_, ok = m.ByName("name1")
+	if !ok {
+		t.Fatal("item should exist before removal")
+	}
+
+	// Remove by name
+	removed := m.RemoveByName("name1")
+	if !removed {
+		t.Error("RemoveByName should return true for existing item")
+	}
+
+	// Verify item no longer exists
+	_, ok = m.ById("id1")
+	if ok {
+		t.Error("item should not exist after removal by name")
+	}
+	_, ok = m.ByName("name1")
+	if ok {
+		t.Error("item should not exist after removal by name")
+	}
+}
+
+func TestIdNameMap_RemoveByNameNotFound(t *testing.T) {
+	m := NewIdNameMap[testItem]()
+
+	removed := m.RemoveByName("nonexistent")
+	if removed {
+		t.Error("RemoveByName should return false for nonexistent item")
+	}
+}
+
+func TestIdNameMap_Keys(t *testing.T) {
+	m := NewIdNameMap[testItem]()
+	items := []testItem{
+		{id: "id1", name: "name1"},
+		{id: "id2", name: "name2"},
+		{id: "id3", name: "name3"},
+	}
+
+	for _, item := range items {
+		m.Insert(item)
+	}
+
+	keys := make(map[string]string)
+	for id, name := range m.Keys() {
+		keys[id] = name
+	}
+
+	if len(keys) != 3 {
+		t.Errorf("expected 3 keys, got %d", len(keys))
+	}
+
+	for _, item := range items {
+		name, ok := keys[item.id]
+		if !ok {
+			t.Errorf("missing key for id %s", item.id)
+		}
+		if name != item.name {
+			t.Errorf("wrong name for id %s: got %s, want %s", item.id, name, item.name)
+		}
+	}
+}
+
+func TestIdNameMap_Values(t *testing.T) {
+	m := NewIdNameMap[testItem]()
+	items := []testItem{
+		{id: "id1", name: "name1"},
+		{id: "id2", name: "name2"},
+		{id: "id3", name: "name3"},
+	}
+
+	for _, item := range items {
+		m.Insert(item)
+	}
+
+	values := make(map[string]testItem)
+	for value := range m.Values() {
+		values[value.Id()] = value
+	}
+
+	if len(values) != 3 {
+		t.Errorf("expected 3 values, got %d", len(values))
+	}
+
+	for _, item := range items {
+		value, ok := values[item.id]
+		if !ok {
+			t.Errorf("missing value for id %s", item.id)
+		}
+		if value.Id() != item.id || value.Name() != item.name {
+			t.Errorf("wrong value for id %s: got %+v, want %+v", item.id, value, item)
+		}
+	}
+}
+
+func TestIdNameMap_EmptyIterators(t *testing.T) {
+	m := NewIdNameMap[testItem]()
+
+	// Test empty Keys iterator
+	count := 0
+	for range m.Keys() {
+		count++
+	}
+	if count != 0 {
+		t.Errorf("expected 0 keys in empty map, got %d", count)
+	}
+
+	// Test empty Values iterator
+	count = 0
+	for range m.Values() {
+		count++
+	}
+	if count != 0 {
+		t.Errorf("expected 0 values in empty map, got %d", count)
+	}
+}
+
+func TestIdNameMap_IteratorEarlyBreak(t *testing.T) {
+	m := NewIdNameMap[testItem]()
+	items := []testItem{
+		{id: "id1", name: "name1"},
+		{id: "id2", name: "name2"},
+		{id: "id3", name: "name3"},
+	}
+
+	for _, item := range items {
+		m.Insert(item)
+	}
+
+	// Test early break in Keys iterator
+	count := 0
+	for range m.Keys() {
+		count++
+		if count == 1 {
+			break
+		}
+	}
+	if count != 1 {
+		t.Errorf("expected early break to work in Keys iterator, got count %d", count)
+	}
+
+	// Test early break in Values iterator
+	count = 0
+	for range m.Values() {
+		count++
+		if count == 1 {
+			break
+		}
+	}
+	if count != 1 {
+		t.Errorf("expected early break to work in Values iterator, got count %d", count)
+	}
+}

+ 20 - 1
core/pkg/nodestats/nodestats.go

@@ -7,6 +7,7 @@ import (
 	"io"
 	"io"
 	"net/http"
 	"net/http"
 	"os"
 	"os"
+	"sync"
 
 
 	"github.com/opencost/opencost/core/pkg/clustercache"
 	"github.com/opencost/opencost/core/pkg/clustercache"
 	"github.com/opencost/opencost/core/pkg/log"
 	"github.com/opencost/opencost/core/pkg/log"
@@ -60,6 +61,9 @@ func (nssc *NodeStatsSummaryClient) GetNodeData() ([]*stats.Summary, error) {
 	size := nssc.config.ConcurrentPollers
 	size := nssc.config.ConcurrentPollers
 	nodes := getReadyNodes(nssc.cache)
 	nodes := getReadyNodes(nssc.cache)
 
 
+	var errLock sync.Mutex
+	var errs []error
+
 	work := func(n *clustercache.Node) *stats.Summary {
 	work := func(n *clustercache.Node) *stats.Summary {
 		if n.SpecProviderID == "" {
 		if n.SpecProviderID == "" {
 			log.Warnf("node ProviderID not set, skipping for %s", n.Name)
 			log.Warnf("node ProviderID not set, skipping for %s", n.Name)
@@ -70,12 +74,20 @@ func (nssc *NodeStatsSummaryClient) GetNodeData() ([]*stats.Summary, error) {
 
 
 		resp, err := requestNodeData(connections, nssc.endpoint, bearerToken)
 		resp, err := requestNodeData(connections, nssc.endpoint, bearerToken)
 		if err != nil {
 		if err != nil {
+			errLock.Lock()
+			errs = append(errs, err)
+			errLock.Unlock()
+
 			log.Warnf("error retrieving node data: %s", err)
 			log.Warnf("error retrieving node data: %s", err)
 			return nil
 			return nil
 		}
 		}
 
 
 		data, err := nodeResponseToStatSummary(resp)
 		data, err := nodeResponseToStatSummary(resp)
 		if err != nil {
 		if err != nil {
+			errLock.Lock()
+			errs = append(errs, err)
+			errLock.Unlock()
+
 			log.Warnf("error converting node data: %s", err)
 			log.Warnf("error converting node data: %s", err)
 			return nil
 			return nil
 		}
 		}
@@ -83,7 +95,14 @@ func (nssc *NodeStatsSummaryClient) GetNodeData() ([]*stats.Summary, error) {
 		return data
 		return data
 	}
 	}
 
 
-	return worker.ConcurrentCollectWith(size, work, nodes), nil
+	results := worker.ConcurrentCollectWith(size, work, nodes)
+
+	// no need to lock, as the concurrent collect blocks until all complete
+	var err error = nil
+	if len(errs) > 0 {
+		err = errors.Join(errs...)
+	}
+	return results, err
 }
 }
 
 
 // connectionOptions returns the connection methods that are allowed for this node based on config
 // connectionOptions returns the connection methods that are allowed for this node based on config

+ 1 - 0
modules/collector-source/go.mod

@@ -6,6 +6,7 @@ go 1.24.2
 
 
 require (
 require (
 	github.com/julienschmidt/httprouter v1.3.0
 	github.com/julienschmidt/httprouter v1.3.0
+	github.com/kubecost/events v0.0.8
 	github.com/opencost/opencost/core v0.0.0-20250521155634-81d2b597d1bc
 	github.com/opencost/opencost/core v0.0.0-20250521155634-81d2b597d1bc
 	golang.org/x/exp v0.0.0-20250408133849-7e4ce0ab07d0
 	golang.org/x/exp v0.0.0-20250408133849-7e4ce0ab07d0
 	k8s.io/api v0.33.1
 	k8s.io/api v0.33.1

+ 2 - 2
modules/collector-source/go.sum

@@ -191,8 +191,6 @@ github.com/goccy/go-json v0.10.5/go.mod h1:oq7eo15ShAhp70Anwd5lgX2pLfOS3QCiwU/PU
 github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA=
 github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA=
 github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q=
 github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q=
 github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q=
 github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q=
-github.com/golang-jwt/jwt/v5 v5.2.1 h1:OuVbFODueb089Lh128TAcimifWaLhJwVflnrgM17wHk=
-github.com/golang-jwt/jwt/v5 v5.2.1/go.mod h1:pqrtFR0X4osieyHYxtmOUWsAWrfe1Q5UVIyoH402zdk=
 github.com/golang-jwt/jwt/v5 v5.3.0 h1:pv4AsKCKKZuqlgs5sUmn4x8UlGa0kEVt/puTpKx9vvo=
 github.com/golang-jwt/jwt/v5 v5.3.0 h1:pv4AsKCKKZuqlgs5sUmn4x8UlGa0kEVt/puTpKx9vvo=
 github.com/golang-jwt/jwt/v5 v5.3.0/go.mod h1:fxCRLWMO43lRc8nhHWY6LGqRcf+1gQWArsqaEUEa5bE=
 github.com/golang-jwt/jwt/v5 v5.3.0/go.mod h1:fxCRLWMO43lRc8nhHWY6LGqRcf+1gQWArsqaEUEa5bE=
 github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q=
 github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q=
@@ -331,6 +329,8 @@ github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
 github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
 github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
 github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
 github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
 github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
 github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
+github.com/kubecost/events v0.0.8 h1:FEglMSOGkjiSZT2FnSYM99s2M4DMiBOgHVheM7Vnurs=
+github.com/kubecost/events v0.0.8/go.mod h1:PXnE7CSZs3OulOLcB8baQENploBp4NM7ERZVBCqNi4A=
 github.com/kylelemons/godebug v1.1.0 h1:RPNrshWIDI6G2gRW9EHilWtl7Z6Sb1BR0xunSBf0SNc=
 github.com/kylelemons/godebug v1.1.0 h1:RPNrshWIDI6G2gRW9EHilWtl7Z6Sb1BR0xunSBf0SNc=
 github.com/kylelemons/godebug v1.1.0/go.mod h1:9/0rRGxNHcop5bhtWyNeEfOS8JIWk580+fNqagV/RAw=
 github.com/kylelemons/godebug v1.1.0/go.mod h1:9/0rRGxNHcop5bhtWyNeEfOS8JIWk580+fNqagV/RAw=
 github.com/magiconair/properties v1.8.5 h1:b6kJs+EmPFMYGkow9GiUyCyOvIwYetYJ3fSaWak/Gls=
 github.com/magiconair/properties v1.8.5 h1:b6kJs+EmPFMYGkow9GiUyCyOvIwYetYJ3fSaWak/Gls=

+ 2 - 0
modules/collector-source/pkg/collector/datasource.go

@@ -113,7 +113,9 @@ func (c *collectorDataSource) RegisterEndPoints(router *httprouter.Router) {
 
 
 func (c *collectorDataSource) RegisterDiagnostics(diagService diagnostics.DiagnosticService) {
 func (c *collectorDataSource) RegisterDiagnostics(diagService diagnostics.DiagnosticService) {
 	const CollectorDiagnosticCategory = "collector"
 	const CollectorDiagnosticCategory = "collector"
+
 	diagnosticDefinitions := c.diagnosticsModule.DiagnosticsDefinitions()
 	diagnosticDefinitions := c.diagnosticsModule.DiagnosticsDefinitions()
+
 	for _, dd := range diagnosticDefinitions {
 	for _, dd := range diagnosticDefinitions {
 		err := diagService.Register(dd.ID, dd.Description, CollectorDiagnosticCategory, func(ctx context.Context) (map[string]any, error) {
 		err := diagService.Register(dd.ID, dd.Description, CollectorDiagnosticCategory, func(ctx context.Context) (map[string]any, error) {
 			details, err := c.diagnosticsModule.DiagnosticsDetails(dd.ID)
 			details, err := c.diagnosticsModule.DiagnosticsDetails(dd.ID)

+ 38 - 0
modules/collector-source/pkg/event/scrape.go

@@ -0,0 +1,38 @@
+package event
+
// Names identifying each Scraper implementation that can emit a ScrapeEvent
// (used as ScrapeEvent.ScraperName).
const (
	DCGMScraperName              = "dcgm-metrics"
	OpenCostScraperName          = "opencost-metrics"
	NodeStatsScraperName         = "nodestats-metrics"
	NetworkCostsScraperName      = "network-costs-metrics"
	KubernetesClusterScraperName = "kubernetes-metrics"
)
+
// Scrape types naming the kubernetes resource a scrape targets (used as
// ScrapeEvent.ScrapeType when one scraper performs scrapes for multiple resources).
const (
	NodeScraperType        = "nodes"
	NamespaceScraperType   = "namespaces"
	ReplicaSetScraperType  = "replicasets"
	DeploymentScraperType  = "deployments"
	StatefulSetScraperType = "statefulsets"
	ServiceScraperType     = "services"
	PodScraperType         = "pods"
	PvScraperType          = "pvs"
	PvcScraperType         = "pvcs"
)
+
// ScrapeEvent is dispatched when a scrape is performed over a set of targets. It contains the name
// of the scraper performing the scrape, the total number of targets, and any errors encountered.
type ScrapeEvent struct {
	// ScraperName is the name of the actual Scraper implementation performing the target
	// scrapes (one of the *ScraperName constants above).
	ScraperName string

	// ScrapeType is the type of scrape being performed. For example, if a scraper performs
	// multiple scrapes for different resources, this field can be used to distinguish
	// between them (one of the *ScraperType constants above).
	ScrapeType string

	// Targets is the total number of targets being accessed by the scraper.
	Targets int

	// Errors holds any errors that occurred during the scrape; empty/nil when the
	// scrape succeeded for all targets.
	Errors []error
}

+ 440 - 60
modules/collector-source/pkg/metric/diagnostics.go

@@ -4,80 +4,428 @@ import (
 	"fmt"
 	"fmt"
 	"maps"
 	"maps"
 	"sync"
 	"sync"
+	"time"
+
+	"github.com/kubecost/events"
+	"github.com/opencost/opencost/core/pkg/collections"
+	"github.com/opencost/opencost/core/pkg/util/sliceutil"
+	"github.com/opencost/opencost/modules/collector-source/pkg/event"
 )
 )
 
 
 // Collector Metric Diagnostic IDs
 // Collector Metric Diagnostic IDs
 const (
 const (
-	// KubecostDiagnosticMetricID is the identifier for the metric used to determine if Kubecost metrics are being scraped.
-	KubecostDiagnosticMetricID = "kubecostMetric"
+	// OpencostDiagnosticMetricID is the identifier for the metric used to determine if Opencost metrics are being updated
+	OpencostDiagnosticMetricID = "opencostMetric"
 
 
-	// NodesDiagnosticMetricID is the identifier for the query used to determine if the node CPU cores capacity is being scraped
+	// NodesDiagnosticMetricID is the identifier for the query used to determine if the node CPU cores capacity is being updated
 	NodesDiagnosticMetricID = "nodesCPUMetrics"
 	NodesDiagnosticMetricID = "nodesCPUMetrics"
+
+	// DcgmScraperDiagnosticID contains the identifier for the DCGM scraper diagnostic.
+	DcgmScraperDiagnosticID = event.DCGMScraperName
+
+	// OpenCostScraperDiagnosticID contains the identifier for the opencost metrics scraper diagnostic
+	OpenCostScraperDiagnosticID = event.OpenCostScraperName
+
+	// NodeStatsScraperDiagnosticID contains the identifier for the node stats summary scraper diagnostic
+	NodeStatsScraperDiagnosticID = event.NodeStatsScraperName
+
+	// NetworkCostsScraperDiagnosticID contains the identifier for the network-costs scraper diagnostic.
+	NetworkCostsScraperDiagnosticID = event.NetworkCostsScraperName
+
+	// Kubernetes scrapers contains the identifiers for all the specific KubernetesCluster scrapers.
+	KubernetesNodesScraperDiagnosticID        = event.KubernetesClusterScraperName + "-" + event.NodeScraperType
+	KubernetesNamespacesScraperDiagnosticID   = event.KubernetesClusterScraperName + "-" + event.NamespaceScraperType
+	KubernetesReplicaSetsScraperDiagnosticID  = event.KubernetesClusterScraperName + "-" + event.ReplicaSetScraperType
+	KubernetesDeploymentsScraperDiagnosticID  = event.KubernetesClusterScraperName + "-" + event.DeploymentScraperType
+	KubernetesStatefulSetsScraperDiagnosticID = event.KubernetesClusterScraperName + "-" + event.StatefulSetScraperType
+	KubernetesServicesScraperDiagnosticID     = event.KubernetesClusterScraperName + "-" + event.ServiceScraperType
+	KubernetesPodsScraperDiagnosticID         = event.KubernetesClusterScraperName + "-" + event.PodScraperType
+	KubernetesPvsScraperDiagnosticID          = event.KubernetesClusterScraperName + "-" + event.PvScraperType
+	KubernetesPvcsScraperDiagnosticID         = event.KubernetesClusterScraperName + "-" + event.PvcScraperType
+)
+
+// DiagnosticType is used in the definitions to determine which type of implementation to use when representing the
+// diagnostic
+type DiagnosticType int
+
+const (
+	DiagnosticTypeMetric  DiagnosticType = 0
+	DiagnosticTypeScraper DiagnosticType = 1
+	// more diagnostic types?
 )
 )
 
 
+// diagnostic definition is the type used to define a deterministic list of specific diagnostics we _expect_ to collect
+type diagnosticDefinition struct {
+	ID          string
+	MetricName  string
+	Label       string
+	Description string
+	DocLink     string
+	DiagType    DiagnosticType
+}
+
 // diagnostic definitions mapping holds all of the diagnostic definitions that can be used for collector metrics diagnostics
 // diagnostic definitions mapping holds all of the diagnostic definitions that can be used for collector metrics diagnostics
 var diagnosticDefinitions map[string]*diagnosticDefinition = map[string]*diagnosticDefinition{
 var diagnosticDefinitions map[string]*diagnosticDefinition = map[string]*diagnosticDefinition{
 	NodesDiagnosticMetricID: {
 	NodesDiagnosticMetricID: {
 		ID:          NodesDiagnosticMetricID,
 		ID:          NodesDiagnosticMetricID,
 		MetricName:  KubeNodeStatusCapacityCPUCores,
 		MetricName:  KubeNodeStatusCapacityCPUCores,
 		Label:       "Node CPU cores capacity is being scraped",
 		Label:       "Node CPU cores capacity is being scraped",
-		Description: "Determine if the node CPU cores capacity is being scraped",
+		Description: "Determine if the node CPU cores capacity metrics are being updated",
+		DiagType:    DiagnosticTypeMetric,
 	},
 	},
 
 
-	KubecostDiagnosticMetricID: {
-		ID:          KubecostDiagnosticMetricID,
+	OpencostDiagnosticMetricID: {
+		ID:          OpencostDiagnosticMetricID,
 		MetricName:  NodeTotalHourlyCost,
 		MetricName:  NodeTotalHourlyCost,
-		Label:       "Kubecost metrics for a node are being scraped",
-		Description: "Determine if kubecost metrics for a node are being scraped",
+		Label:       "Opencost metrics for a node are being scraped",
+		Description: "Determine if opencost metrics for a node are being updated",
+		DiagType:    DiagnosticTypeMetric,
+	},
+
+	DcgmScraperDiagnosticID: {
+		ID:          DcgmScraperDiagnosticID,
+		MetricName:  event.DCGMScraperName,
+		Label:       "DCGM scraper is available and is being scraped.",
+		Description: scraperDiagnosticDescriptionFor(event.DCGMScraperName, ""),
+		DiagType:    DiagnosticTypeScraper,
+	},
+
+	OpenCostScraperDiagnosticID: {
+		ID:          OpenCostScraperDiagnosticID,
+		MetricName:  event.OpenCostScraperName,
+		Label:       "Opencost metrics scraper is available and is being scraped.",
+		Description: scraperDiagnosticDescriptionFor(event.OpenCostScraperName, ""),
+		DiagType:    DiagnosticTypeScraper,
+	},
+
+	NodeStatsScraperDiagnosticID: {
+		ID:          NodeStatsScraperDiagnosticID,
+		MetricName:  event.NodeStatsScraperName,
+		Label:       "Node stats summary scraper is available and is being scraped.",
+		Description: scraperDiagnosticDescriptionFor(event.NodeStatsScraperName, ""),
+		DiagType:    DiagnosticTypeScraper,
+	},
+
+	NetworkCostsScraperDiagnosticID: {
+		ID:          NetworkCostsScraperDiagnosticID,
+		MetricName:  event.NetworkCostsScraperName,
+		Label:       "Network costs daemonset metrics scrapers are available and being scraped.",
+		Description: scraperDiagnosticDescriptionFor(event.NetworkCostsScraperName, ""),
+		DiagType:    DiagnosticTypeScraper,
+	},
+
+	KubernetesNodesScraperDiagnosticID: {
+		ID:          KubernetesNodesScraperDiagnosticID,
+		MetricName:  KubernetesNodesScraperDiagnosticID,
+		Label:       fmt.Sprintf("Kubernetes cluster resources: %s are available and being scraped", event.NodeScraperType),
+		Description: scraperDiagnosticDescriptionFor(event.KubernetesClusterScraperName, event.NodeScraperType),
+		DiagType:    DiagnosticTypeScraper,
+	},
+
+	KubernetesNamespacesScraperDiagnosticID: {
+		ID:          KubernetesNamespacesScraperDiagnosticID,
+		MetricName:  KubernetesNamespacesScraperDiagnosticID,
+		Label:       fmt.Sprintf("Kubernetes cluster resources: %s are available and being scraped", event.NamespaceScraperType),
+		Description: scraperDiagnosticDescriptionFor(event.KubernetesClusterScraperName, event.NamespaceScraperType),
+		DiagType:    DiagnosticTypeScraper,
+	},
+
+	KubernetesReplicaSetsScraperDiagnosticID: {
+		ID:          KubernetesReplicaSetsScraperDiagnosticID,
+		MetricName:  KubernetesReplicaSetsScraperDiagnosticID,
+		Label:       fmt.Sprintf("Kubernetes cluster resources: %s are available and being scraped", event.ReplicaSetScraperType),
+		Description: scraperDiagnosticDescriptionFor(event.KubernetesClusterScraperName, event.ReplicaSetScraperType),
+		DiagType:    DiagnosticTypeScraper,
+	},
+
+	KubernetesDeploymentsScraperDiagnosticID: {
+		ID:          KubernetesDeploymentsScraperDiagnosticID,
+		MetricName:  KubernetesDeploymentsScraperDiagnosticID,
+		Label:       fmt.Sprintf("Kubernetes cluster resources: %s are available and being scraped", event.DeploymentScraperType),
+		Description: scraperDiagnosticDescriptionFor(event.KubernetesClusterScraperName, event.DeploymentScraperType),
+		DiagType:    DiagnosticTypeScraper,
+	},
+
+	KubernetesStatefulSetsScraperDiagnosticID: {
+		ID:          KubernetesStatefulSetsScraperDiagnosticID,
+		MetricName:  KubernetesStatefulSetsScraperDiagnosticID,
+		Label:       fmt.Sprintf("Kubernetes cluster resources: %s are available and being scraped", event.StatefulSetScraperType),
+		Description: scraperDiagnosticDescriptionFor(event.KubernetesClusterScraperName, event.StatefulSetScraperType),
+		DiagType:    DiagnosticTypeScraper,
+	},
+
+	KubernetesServicesScraperDiagnosticID: {
+		ID:          KubernetesServicesScraperDiagnosticID,
+		MetricName:  KubernetesServicesScraperDiagnosticID,
+		Label:       fmt.Sprintf("Kubernetes cluster resources: %s are available and being scraped", event.ServiceScraperType),
+		Description: scraperDiagnosticDescriptionFor(event.KubernetesClusterScraperName, event.ServiceScraperType),
+		DiagType:    DiagnosticTypeScraper,
+	},
+
+	KubernetesPodsScraperDiagnosticID: {
+		ID:          KubernetesPodsScraperDiagnosticID,
+		MetricName:  KubernetesPodsScraperDiagnosticID,
+		Label:       fmt.Sprintf("Kubernetes cluster resources: %s are available and being scraped", event.PodScraperType),
+		Description: scraperDiagnosticDescriptionFor(event.KubernetesClusterScraperName, event.PodScraperType),
+		DiagType:    DiagnosticTypeScraper,
+	},
+
+	KubernetesPvsScraperDiagnosticID: {
+		ID:          KubernetesPvsScraperDiagnosticID,
+		MetricName:  KubernetesPvsScraperDiagnosticID,
+		Label:       fmt.Sprintf("Kubernetes cluster resources: %s are available and being scraped", event.PvScraperType),
+		Description: scraperDiagnosticDescriptionFor(event.KubernetesClusterScraperName, event.PvScraperType),
+		DiagType:    DiagnosticTypeScraper,
+	},
+
+	KubernetesPvcsScraperDiagnosticID: {
+		ID:          KubernetesPvcsScraperDiagnosticID,
+		MetricName:  KubernetesPvcsScraperDiagnosticID,
+		Label:       fmt.Sprintf("Kubernetes cluster resources: %s are available and being scraped", event.PvcScraperType),
+		Description: scraperDiagnosticDescriptionFor(event.KubernetesClusterScraperName, event.PvcScraperType),
+		DiagType:    DiagnosticTypeScraper,
 	},
 	},
 }
 }
 
 
-// diagnosticsResults stores the current state of diagnostic results
-var diagnosticsResults map[string]*diagnosticsResult = make(map[string]*diagnosticsResult)
+// scraper identifier for diagnostic mapping _must_ match diagnostic ids defined above
+func scraperIdFor(scraperName, scrapeType string) string {
+	if scrapeType == "" {
+		return scraperName
+	}
+	return fmt.Sprintf("%s-%s", scraperName, scrapeType)
+}
 
 
-type diagnosticDefinition struct {
-	ID          string
-	MetricName  string
-	Label       string
-	Description string
-	DocLink     string
+// helper for generating dynamic scraper events diagnostic descriptions
+func scraperDiagnosticDescriptionFor(scraperName, scrapeType string) string {
+	if scrapeType == "" {
+		return fmt.Sprintf("Determine if the scraper for: %s is correctly reporting data", scraperName)
+	}
+	return fmt.Sprintf("Determine if the scraper for: %s is correctly report data for type: %s", scraperName, scrapeType)
 }
 }
 
 
-type diagnosticsResult struct {
-	Result map[string]any
-	Passed bool
+// CollectorDiagnostic is a basic interface used to allow various types of diagnostic data collection
+type CollectorDiagnostic interface {
+	// Id returns the identifier for the diagnostic
+	Id() string
+
+	// Name returns the name of the metric being run
+	Name() string
+
+	// Details generates an exportable detail map for the specific diagnostic, and resets any of its internal
+	// state for the current cycle.
+	Details() map[string]any
 }
 }
 
 
+// metric diagnostic is checked on metrics update -- it maintains a historic record of all the instants
+// a specific metric was updated, and reports a diagnostic on the validity of that history.
+type metricDiagnostic struct {
+	diagnostic       *diagnosticDefinition
+	updateTimestamps []time.Time
+	result           map[string]float64
+}
+
+// creates a new metric diagnostic
+func newMetricDiagnostic(diagnostic *diagnosticDefinition) *metricDiagnostic {
+	return &metricDiagnostic{
+		diagnostic: diagnostic,
+		result:     make(map[string]float64),
+	}
+}
+
+// Id returns the identifier for the metric diagnostic type -- this just proxies from the diagnostic
+// definition.
+func (md *metricDiagnostic) Id() string {
+	return md.diagnostic.ID
+}
+
+// Name returns the name of the metric being run for the metric diagnostic type -- this just proxies from
+// the diagnostic definition.
+func (md *metricDiagnostic) Name() string {
+	return md.diagnostic.MetricName
+}
+
+// Details generates an exportable detail map for the specific diagnostic, and resets any of its internal
+// state for the current cycle.
+func (md *metricDiagnostic) Details() map[string]any {
+	// for all timestamps that occurred during our update cycle,
+	// if any timestamps for our metric do not exist, then we
+	// say that the diagnostic failed. if there are no timestamps
+	// marked in the result, then we also say the diagnostic failed.
+	passed := true
+	if len(md.result) == 0 {
+		passed = false
+	} else {
+		for _, t := range md.updateTimestamps {
+			key := t.Format(time.RFC3339)
+
+			_, hasTimestamp := md.result[key]
+			if !hasTimestamp {
+				passed = false
+				break
+			}
+		}
+	}
+
+	details := map[string]any{
+		"query":   md.Name(),
+		"label":   md.diagnostic.Label,
+		"docLink": md.diagnostic.DocLink,
+		"result":  maps.Clone(md.result),
+		"passed":  passed,
+	}
+
+	// reset the update timestamps and results
+	md.updateTimestamps = []time.Time{}
+	for k := range md.result {
+		delete(md.result, k)
+	}
+
+	return details
+}
+
+// scrapeDiagnostic maintains the latest state of each scrape event that occurs. scrape
+// events can be registered for any event, but only the specific scrapes with diagnostic
+// definitions defined will export as diagnostics.
+type scrapeDiagnostic struct {
+	diagnostic *diagnosticDefinition
+	scraper    string
+	scrapeType string
+	targets    int
+	errors     []error
+}
+
+// creates a new scrape diagnostic from the event data and diagnostics definition
+func newScrapeDiagnostic(
+	scrapeEvent event.ScrapeEvent,
+	definition *diagnosticDefinition,
+) *scrapeDiagnostic {
+	return &scrapeDiagnostic{
+		diagnostic: definition,
+		scraper:    scrapeEvent.ScraperName,
+		scrapeType: scrapeEvent.ScrapeType,
+		targets:    scrapeEvent.Targets,
+		errors:     scrapeEvent.Errors,
+	}
+}
+
+// Id is a concatenation of scraper and scrapeType if a scrapeType exists.
+func (sd *scrapeDiagnostic) Id() string {
+	if sd.diagnostic != nil {
+		return sd.diagnostic.ID
+	}
+	return scraperIdFor(sd.scraper, sd.scrapeType)
+}
+
+// Name returns the name of the scraper the event fired from.
+func (sd *scrapeDiagnostic) Name() string {
+	return sd.scraper
+}
+
+// Details generates an exportable detail map for the specific diagnostic, and resets any of its internal
+// state for the current cycle.
+func (sd *scrapeDiagnostic) Details() map[string]any {
+	// passed if there are no errors
+	passed := len(sd.errors) == 0
+
+	// map errors to a string slice for easier propagation
+	var errs []string
+	if !passed {
+		errs = sliceutil.Map(sd.errors, func(e error) string { return e.Error() })
+	} else {
+		errs = []string{}
+	}
+
+	// since a scrape event does not require a matching diagnostic definition,
+	// we must generate properties normally extracted from the definition
+	var label string
+	if sd.diagnostic != nil {
+		label = sd.diagnostic.Label
+	} else {
+		label = fmt.Sprintf("%s scraper is available and being scraped.", sd.scraper)
+	}
+
+	// same for doclink
+	var docLink string
+	if sd.diagnostic != nil {
+		docLink = sd.diagnostic.DocLink
+	} else {
+		docLink = ""
+	}
+
+	details := map[string]any{
+		// stats contains total entities to scrape, success (of the total), and failures (of the total)
+		"stats": map[string]any{
+			"total":   sd.targets,
+			"success": max(sd.targets-len(errs), 0),
+			"fail":    len(errs),
+		},
+		"label":   label,
+		"docLink": docLink,
+		"errors":  errs,
+		"passed":  passed,
+	}
+
+	// scraper diagnostics do not maintain any internal/historical state
+	// to reset -- it just maintains the most recent data. if we decide
+	// to track historical event data, would need to reset the state after
+	// this call.
+
+	return details
+}
+
+// DiagnosticsModule is a helper type for managing all of the internal diagnostics for the collector datasource.
 type DiagnosticsModule struct {
 type DiagnosticsModule struct {
-	lock    sync.RWMutex
-	updater Updater
+	lock            sync.RWMutex
+	diagnostics     *collections.IdNameMap[CollectorDiagnostic]
+	updater         Updater
+	scrapeHandlerId events.HandlerID // scrape event handler identifier for removal
 }
 }
 
 
+// NewDiagnosticsModule creates a new `DiagnosticsModule` instance to be used with a collector data source
 func NewDiagnosticsModule(updater Updater) *DiagnosticsModule {
 func NewDiagnosticsModule(updater Updater) *DiagnosticsModule {
-	// Initialize diagnostics results to false to represent that no data has been collected yet
-	for id := range diagnosticDefinitions {
-		diagnosticsResults[id] = &diagnosticsResult{
-			Result: make(map[string]any),
-			Passed: false,
+	// initialize all metric diagnostics IFF the diagnostic type is "metrics"
+	// NOTE: scraper diagnostics are dynamically created as scrape results arrive
+	diagnostics := collections.NewIdNameMap[CollectorDiagnostic]()
+	for _, def := range diagnosticDefinitions {
+		// only insert metric diagnostic types
+		if def.DiagType == DiagnosticTypeMetric {
+			diagnostics.Insert(newMetricDiagnostic(def))
 		}
 		}
 	}
 	}
 
 
-	return &DiagnosticsModule{
-		updater: updater,
+	dm := &DiagnosticsModule{
+		diagnostics: diagnostics,
+		updater:     updater,
 	}
 	}
+
+	scrapeEvents := events.GlobalDispatcherFor[event.ScrapeEvent]()
+	dm.scrapeHandlerId = scrapeEvents.AddEventHandler(dm.onScrapeEvent)
+
+	return dm
 }
 }
 
 
-func (d *DiagnosticsModule) Update(updateSet *UpdateSet) {
-	if updateSet == nil {
+// handles a scrape event dispatched -- updates the record for the specific scrape
+// diagnostic.
+func (d *DiagnosticsModule) onScrapeEvent(event event.ScrapeEvent) {
+	d.lock.Lock()
+	defer d.lock.Unlock()
+
+	id := scraperIdFor(event.ScraperName, event.ScrapeType)
+
+	// scrape events can occur without a backing diagnostic definition -- just
+	// ignore if this happens
+	def, ok := diagnosticDefinitions[id]
+	if !ok {
 		return
 		return
 	}
 	}
 
 
-	// Create a deep copy for the async update to avoid race condition
-	updateSetCopy := &UpdateSet{
-		Timestamp: updateSet.Timestamp,
-		Updates:   make([]Update, len(updateSet.Updates)),
+	d.diagnostics.Insert(newScrapeDiagnostic(event, def))
+}
+
+func (d *DiagnosticsModule) Update(updateSet *UpdateSet) {
+	if updateSet == nil {
+		return
 	}
 	}
-	copy(updateSetCopy.Updates, updateSet.Updates)
 
 
 	// This is done so that the update func is marked complete when both the updater and diagnostics are done
 	// This is done so that the update func is marked complete when both the updater and diagnostics are done
 	// Otherwise we might face a race condition when calling the diagnostics details func before the diagnostics are done
 	// Otherwise we might face a race condition when calling the diagnostics details func before the diagnostics are done
@@ -86,23 +434,28 @@ func (d *DiagnosticsModule) Update(updateSet *UpdateSet) {
 
 
 	go func() {
 	go func() {
 		defer wg.Done()
 		defer wg.Done()
+
 		d.lock.Lock()
 		d.lock.Lock()
 		defer d.lock.Unlock()
 		defer d.lock.Unlock()
 
 
-		timestamp := updateSet.Timestamp.String()
-		for id, dd := range diagnosticDefinitions {
-			for _, update := range updateSet.Updates {
-				if update.Name == dd.MetricName {
-					if len(diagnosticsResults[id].Result) == 0 {
-						// For the first UpdateSet received for that metric, we default to true. If we later miss the metric for a timestamp, it will be set to false.
-						diagnosticsResults[id].Passed = true
-					}
-					diagnosticsResults[id].Result[timestamp] = update.Value
+		// add the timestamp to all metric diagnostic instances (see notes on addUpdateTimestamp)
+		ts := updateSet.Timestamp
+		d.addUpdateTimestamp(ts)
+
+		timestamp := ts.Format(time.RFC3339)
+
+		for _, update := range updateSet.Updates {
+			if metric, ok := d.diagnostics.ByName(update.Name); ok {
+				// this is unfortunately necessary due to the way our diagnostic collectors
+				// differ in functionality -- it makes more sense to duck type here rather
+				// than maintain a separate map of just the metric types, or add metric
+				// specific implementation details to the CollectorDiagnostic interface.
+				// generally, we _should_ be able to make this assertion -- but we'll check in case.
+				if metricDiag, isType := metric.(*metricDiagnostic); isType {
+					// mark the timestamp as "seen" with the value
+					metricDiag.result[timestamp] = update.Value
 				}
 				}
 			}
 			}
-			if diagnosticsResults[id].Result[timestamp] == nil {
-				diagnosticsResults[id].Passed = false
-			}
 		}
 		}
 	}()
 	}()
 
 
@@ -110,33 +463,60 @@ func (d *DiagnosticsModule) Update(updateSet *UpdateSet) {
 	// as this function gets the new call only when both these go routines are done
 	// as this function gets the new call only when both these go routines are done
 	go func() {
 	go func() {
 		defer wg.Done()
 		defer wg.Done()
-		d.updater.Update(updateSetCopy)
+		d.updater.Update(updateSet)
 	}()
 	}()
 
 
 	wg.Wait()
 	wg.Wait()
 }
 }
 
 
+// appends an update timestamp on each of the metric diagnostics -- we need to write
+// every timestamp that the update makes unfortunately. There isn't a way to determine
+// if a diagnostic service "cycle" is complete, so it's not really possible to maintain
+// a most recent timestamps on the DiagnosticsModule (the optimal solution). we're not
+// far from a solid design here, just might need some more support on the diagnostic
+// service side.
+func (d *DiagnosticsModule) addUpdateTimestamp(t time.Time) {
+	for _, def := range diagnosticDefinitions {
+		if def.DiagType != DiagnosticTypeMetric {
+			continue
+		}
+
+		diag, ok := d.diagnostics.ById(def.ID)
+		if !ok {
+			continue
+		}
+
+		// More duck typing sadly -- there are some fundamental design incompatibilities
+		// with the way DiagnosticService was written and this cached diagnostic approach
+		// that make things like "cycle" resets a bit difficult
+		if metricDiag, ok := diag.(*metricDiagnostic); ok {
+			metricDiag.updateTimestamps = append(metricDiag.updateTimestamps, t)
+		}
+	}
+}
+
+// DiagnosticsDefinitions returns a deterministic mapping of pre-defined diagnostics used with the collector.
 func (d *DiagnosticsModule) DiagnosticsDefinitions() map[string]*diagnosticDefinition {
 func (d *DiagnosticsModule) DiagnosticsDefinitions() map[string]*diagnosticDefinition {
 	return diagnosticDefinitions
 	return diagnosticDefinitions
 }
 }
 
 
+// DiagnosticsDetails returns the latest details for the diagnostic type
 func (d *DiagnosticsModule) DiagnosticsDetails(diagnosticsId string) (map[string]any, error) {
 func (d *DiagnosticsModule) DiagnosticsDetails(diagnosticsId string) (map[string]any, error) {
 	d.lock.RLock()
 	d.lock.RLock()
 	defer d.lock.RUnlock()
 	defer d.lock.RUnlock()
 
 
+	// If a bogus diagnostics id was passed, we can check the definitions first
 	if _, exists := diagnosticDefinitions[diagnosticsId]; !exists {
 	if _, exists := diagnosticDefinitions[diagnosticsId]; !exists {
-		return nil, fmt.Errorf("diagnostic ID: %s not found", diagnosticsId)
+		return nil, fmt.Errorf("invalid diagnostic id: %s not found", diagnosticsId)
 	}
 	}
 
 
-	details := map[string]any{
-		"query":   diagnosticDefinitions[diagnosticsId].MetricName,
-		"label":   diagnosticDefinitions[diagnosticsId].Label,
-		"docLink": diagnosticDefinitions[diagnosticsId].DocLink,
-		"result":  maps.Clone(diagnosticsResults[diagnosticsId].Result),
-		"passed":  diagnosticsResults[diagnosticsId].Passed,
-	}
-	// reset the result and passed for the next run
-	diagnosticsResults[diagnosticsId].Result = make(map[string]any)
-	diagnosticsResults[diagnosticsId].Passed = false
-	return details, nil
+	// for some diagnostics, like the scraper variant, they may not have been registered
+	// yet (no scrape events), so we should return an error indicating that the scrape
+	// hasn't occurred yet
+	diagnostic, exists := d.diagnostics.ById(diagnosticsId)
+	if !exists {
+		return nil, fmt.Errorf("diagnostic not available: %s", diagnosticsId)
+	}
+
+	return diagnostic.Details(), nil
 }
 }

+ 106 - 4
modules/collector-source/pkg/metric/diagnostics_test.go

@@ -1,8 +1,12 @@
 package metric
 package metric
 
 
 import (
 import (
+	"fmt"
 	"testing"
 	"testing"
 	"time"
 	"time"
+
+	"github.com/kubecost/events"
+	"github.com/opencost/opencost/modules/collector-source/pkg/event"
 )
 )
 
 
 // MockUpdater implements the Updater interface for testing
 // MockUpdater implements the Updater interface for testing
@@ -44,15 +48,113 @@ func TestDiagnosticsModule_Update(t *testing.T) {
 		t.Error("Expected node diagnostic to pass")
 		t.Error("Expected node diagnostic to pass")
 	}
 	}
 
 
-	kubecostDetails, err := module.DiagnosticsDetails(KubecostDiagnosticMetricID)
+	opencostDetails, err := module.DiagnosticsDetails(OpencostDiagnosticMetricID)
 	if err != nil {
 	if err != nil {
 		t.Error("Expected no error for valid diagnostic ID")
 		t.Error("Expected no error for valid diagnostic ID")
 	}
 	}
-	if kubecostDetails["passed"] != true {
+	if opencostDetails["passed"] != true {
 		t.Error("Expected kubecost diagnostic to pass")
 		t.Error("Expected kubecost diagnostic to pass")
 	}
 	}
 }
 }
 
 
+func TestDiagnosticsModule_ScrapeDiagnostics(t *testing.T) {
+	mockUpdater := &MockUpdater{}
+	module := NewDiagnosticsModule(mockUpdater)
+
+	// dispatch some faux scrape events
+	events.Dispatch(event.ScrapeEvent{
+		ScraperName: event.NetworkCostsScraperName,
+		Targets:     10,
+		Errors:      []error{},
+	})
+
+	events.Dispatch(event.ScrapeEvent{
+		ScraperName: event.KubernetesClusterScraperName,
+		ScrapeType:  event.NodeScraperType,
+		Targets:     8,
+		Errors: []error{
+			fmt.Errorf("failed to scrape node 'foo'"),
+			fmt.Errorf("failed to scrape node 'bar'"),
+		},
+	})
+
+	time.Sleep(500 * time.Millisecond)
+
+	networkDiagnosticDetails, err := module.DiagnosticsDetails(NetworkCostsScraperDiagnosticID)
+	if err != nil {
+		t.Fatalf("unexpected error: %s", err)
+		return
+	}
+
+	stats := networkDiagnosticDetails["stats"].(map[string]any)
+	errors := networkDiagnosticDetails["errors"].([]string)
+	label := networkDiagnosticDetails["label"].(string)
+
+	statsTotal := stats["total"].(int)
+	statsSuccess := stats["success"].(int)
+	statsFail := stats["fail"].(int)
+
+	if statsTotal != 10 {
+		t.Fatalf("expected networkCostsDetails[\"stats\"][\"total\"] to equal 10, got: %d", statsTotal)
+		return
+	}
+	if statsSuccess != 10 {
+		t.Fatalf("expected networkCostsDetails[\"stats\"][\"success\"] to equal 10, got: %d", statsSuccess)
+		return
+	}
+	if statsFail != 0 {
+		t.Fatalf("expected networkCostsDetails[\"stats\"][\"fail\"] to equal 0, got: %d", statsFail)
+		return
+	}
+
+	if len(errors) != 0 {
+		t.Fatalf("expected len(networkCostsDetails[\"errors\"]) to equal 0, got: %d", len(errors))
+		return
+	}
+
+	if len(label) == 0 {
+		t.Fatalf("expected len(networkCostsDetails[\"label\"]) to be non-zero. Got 0.")
+		return
+	}
+
+	nodeScrapeDetails, err := module.DiagnosticsDetails(KubernetesNodesScraperDiagnosticID)
+	if err != nil {
+		t.Fatalf("unexpected error: %s", err)
+		return
+	}
+
+	stats = nodeScrapeDetails["stats"].(map[string]any)
+	errors = nodeScrapeDetails["errors"].([]string)
+	label = nodeScrapeDetails["label"].(string)
+
+	statsTotal = stats["total"].(int)
+	statsSuccess = stats["success"].(int)
+	statsFail = stats["fail"].(int)
+
+	if statsTotal != 8 {
+		t.Fatalf("expected nodeScrapeDetails[\"stats\"][\"total\"] to equal 8, got: %d", statsTotal)
+		return
+	}
+	if statsSuccess != 6 {
+		t.Fatalf("expected nodeScrapeDetails[\"stats\"][\"success\"] to equal 6, got: %d", statsSuccess)
+		return
+	}
+	if statsFail != 2 {
+		t.Fatalf("expected nodeScrapeDetails[\"stats\"][\"fail\"] to equal 2, got: %d", statsFail)
+		return
+	}
+
+	if len(errors) != 2 {
+		t.Fatalf("expected len(nodeScrapeDetails[\"errors\"]) to equal 2, got: %d", len(errors))
+		return
+	}
+
+	if len(label) == 0 {
+		t.Fatalf("expected len(nodeScrapeDetails[\"label\"]) to be non-zero. Got 0.")
+		return
+	}
+}
+
 // Test Update func in DiagnosticsModule with missing metrics and test if diagnostics fail
 // Test Update func in DiagnosticsModule with missing metrics and test if diagnostics fail
 func TestDiagnosticsModule_UpdateWithMissingMetrics(t *testing.T) {
 func TestDiagnosticsModule_UpdateWithMissingMetrics(t *testing.T) {
 	mockUpdater := &MockUpdater{}
 	mockUpdater := &MockUpdater{}
@@ -80,7 +182,7 @@ func TestDiagnosticsModule_UpdateWithMissingMetrics(t *testing.T) {
 		t.Error("Expected node diagnostic to fail when metric is missing")
 		t.Error("Expected node diagnostic to fail when metric is missing")
 	}
 	}
 
 
-	kubecostDetails, err := module.DiagnosticsDetails(KubecostDiagnosticMetricID)
+	kubecostDetails, err := module.DiagnosticsDetails(OpencostDiagnosticMetricID)
 	if err != nil {
 	if err != nil {
 		t.Error("Expected no error for valid diagnostic ID")
 		t.Error("Expected no error for valid diagnostic ID")
 	}
 	}
@@ -96,7 +198,7 @@ func TestDiagnosticsModule_DiagnosticsDetails(t *testing.T) {
 
 
 	// Test with invalid diagnostic ID
 	// Test with invalid diagnostic ID
 	_, err := module.DiagnosticsDetails("invalid_id")
 	_, err := module.DiagnosticsDetails("invalid_id")
-	if err.Error() != "diagnostic ID: invalid_id not found" {
+	if err.Error() != "invalid diagnostic id: invalid_id not found" {
 		t.Error("Expected error for invalid diagnostic ID")
 		t.Error("Expected error for invalid diagnostic ID")
 	}
 	}
 
 

+ 74 - 0
modules/collector-source/pkg/scrape/clustercache.go

@@ -5,10 +5,12 @@ import (
 	"slices"
 	"slices"
 	"strings"
 	"strings"
 
 
+	"github.com/kubecost/events"
 	"github.com/opencost/opencost/core/pkg/clustercache"
 	"github.com/opencost/opencost/core/pkg/clustercache"
 	"github.com/opencost/opencost/core/pkg/log"
 	"github.com/opencost/opencost/core/pkg/log"
 	"github.com/opencost/opencost/core/pkg/source"
 	"github.com/opencost/opencost/core/pkg/source"
 	"github.com/opencost/opencost/core/pkg/util/promutil"
 	"github.com/opencost/opencost/core/pkg/util/promutil"
+	"github.com/opencost/opencost/modules/collector-source/pkg/event"
 	"github.com/opencost/opencost/modules/collector-source/pkg/metric"
 	"github.com/opencost/opencost/modules/collector-source/pkg/metric"
 	"github.com/opencost/opencost/modules/collector-source/pkg/util"
 	"github.com/opencost/opencost/modules/collector-source/pkg/util"
 	"golang.org/x/exp/maps"
 	"golang.org/x/exp/maps"
@@ -110,6 +112,14 @@ func (ccs *ClusterCacheScraper) scrapeNodes(nodes []*clustercache.Node) []metric
 		})
 		})
 
 
 	}
 	}
+
+	events.Dispatch(event.ScrapeEvent{
+		ScraperName: event.KubernetesClusterScraperName,
+		ScrapeType:  event.NodeScraperType,
+		Targets:     len(nodes),
+		Errors:      nil,
+	})
+
 	return scrapeResults
 	return scrapeResults
 }
 }
 
 
@@ -137,6 +147,14 @@ func (ccs *ClusterCacheScraper) scrapeDeployments(deployments []*clustercache.De
 			AdditionalInfo: deploymentLabels,
 			AdditionalInfo: deploymentLabels,
 		})
 		})
 	}
 	}
+
+	events.Dispatch(event.ScrapeEvent{
+		ScraperName: event.KubernetesClusterScraperName,
+		ScrapeType:  event.DeploymentScraperType,
+		Targets:     len(deployments),
+		Errors:      nil,
+	})
+
 	return scrapeResults
 	return scrapeResults
 }
 }
 
 
@@ -172,6 +190,14 @@ func (ccs *ClusterCacheScraper) scrapeNamespaces(namespaces []*clustercache.Name
 			AdditionalInfo: namespaceAnnotations,
 			AdditionalInfo: namespaceAnnotations,
 		})
 		})
 	}
 	}
+
+	events.Dispatch(event.ScrapeEvent{
+		ScraperName: event.KubernetesClusterScraperName,
+		ScrapeType:  event.NamespaceScraperType,
+		Targets:     len(namespaces),
+		Errors:      nil,
+	})
+
 	return scrapeResults
 	return scrapeResults
 }
 }
 
 
@@ -266,6 +292,14 @@ func (ccs *ClusterCacheScraper) scrapePods(pods []*clustercache.Pod) []metric.Up
 			}
 			}
 		}
 		}
 	}
 	}
+
+	events.Dispatch(event.ScrapeEvent{
+		ScraperName: event.KubernetesClusterScraperName,
+		ScrapeType:  event.PodScraperType,
+		Targets:     len(pods),
+		Errors:      nil,
+	})
+
 	return scrapeResults
 	return scrapeResults
 }
 }
 
 
@@ -298,6 +332,14 @@ func (ccs *ClusterCacheScraper) scrapePVCs(pvcs []*clustercache.PersistentVolume
 			})
 			})
 		}
 		}
 	}
 	}
+
+	events.Dispatch(event.ScrapeEvent{
+		ScraperName: event.KubernetesClusterScraperName,
+		ScrapeType:  event.PvcScraperType,
+		Targets:     len(pvcs),
+		Errors:      nil,
+	})
+
 	return scrapeResults
 	return scrapeResults
 }
 }
 
 
@@ -334,6 +376,14 @@ func (ccs *ClusterCacheScraper) scrapePVs(pvs []*clustercache.PersistentVolume)
 			})
 			})
 		}
 		}
 	}
 	}
+
+	events.Dispatch(event.ScrapeEvent{
+		ScraperName: event.KubernetesClusterScraperName,
+		ScrapeType:  event.PvScraperType,
+		Targets:     len(pvs),
+		Errors:      nil,
+	})
+
 	return scrapeResults
 	return scrapeResults
 }
 }
 
 
@@ -361,6 +411,14 @@ func (ccs *ClusterCacheScraper) scrapeServices(services []*clustercache.Service)
 		})
 		})
 
 
 	}
 	}
+
+	events.Dispatch(event.ScrapeEvent{
+		ScraperName: event.KubernetesClusterScraperName,
+		ScrapeType:  event.ServiceScraperType,
+		Targets:     len(services),
+		Errors:      nil,
+	})
+
 	return scrapeResults
 	return scrapeResults
 }
 }
 
 
@@ -387,6 +445,14 @@ func (ccs *ClusterCacheScraper) scrapeStatefulSets(statefulSets []*clustercache.
 			AdditionalInfo: statefulSetLabels,
 			AdditionalInfo: statefulSetLabels,
 		})
 		})
 	}
 	}
+
+	events.Dispatch(event.ScrapeEvent{
+		ScraperName: event.KubernetesClusterScraperName,
+		ScrapeType:  event.StatefulSetScraperType,
+		Targets:     len(statefulSets),
+		Errors:      nil,
+	})
+
 	return scrapeResults
 	return scrapeResults
 }
 }
 
 
@@ -414,6 +480,14 @@ func (ccs *ClusterCacheScraper) scrapeReplicaSets(replicaSets []*clustercache.Re
 			})
 			})
 		}
 		}
 	}
 	}
+
+	events.Dispatch(event.ScrapeEvent{
+		ScraperName: event.KubernetesClusterScraperName,
+		ScrapeType:  event.ReplicaSetScraperType,
+		Targets:     len(replicaSets),
+		Errors:      nil,
+	})
+
 	return scrapeResults
 	return scrapeResults
 }
 }
 
 

+ 2 - 0
modules/collector-source/pkg/scrape/dcgm.go

@@ -6,6 +6,7 @@ import (
 
 
 	"github.com/opencost/opencost/core/pkg/clustercache"
 	"github.com/opencost/opencost/core/pkg/clustercache"
 	"github.com/opencost/opencost/core/pkg/log"
 	"github.com/opencost/opencost/core/pkg/log"
+	"github.com/opencost/opencost/modules/collector-source/pkg/event"
 	"github.com/opencost/opencost/modules/collector-source/pkg/metric"
 	"github.com/opencost/opencost/modules/collector-source/pkg/metric"
 	"github.com/opencost/opencost/modules/collector-source/pkg/scrape/target"
 	"github.com/opencost/opencost/modules/collector-source/pkg/scrape/target"
 )
 )
@@ -19,6 +20,7 @@ func newDCGMScrapper(clusterCache clustercache.ClusterCache) Scraper {
 
 
 func newDCGMTargetScraper(provider target.TargetProvider) *TargetScraper {
 func newDCGMTargetScraper(provider target.TargetProvider) *TargetScraper {
 	return newTargetScrapper(
 	return newTargetScrapper(
+		event.DCGMScraperName,
 		provider,
 		provider,
 		[]string{
 		[]string{
 			metric.DCGMFIPROFGRENGINEACTIVE,
 			metric.DCGMFIPROFGRENGINEACTIVE,

+ 2 - 0
modules/collector-source/pkg/scrape/network.go

@@ -5,6 +5,7 @@ import (
 
 
 	"github.com/opencost/opencost/core/pkg/clustercache"
 	"github.com/opencost/opencost/core/pkg/clustercache"
 	"github.com/opencost/opencost/core/pkg/log"
 	"github.com/opencost/opencost/core/pkg/log"
+	"github.com/opencost/opencost/modules/collector-source/pkg/event"
 	"github.com/opencost/opencost/modules/collector-source/pkg/metric"
 	"github.com/opencost/opencost/modules/collector-source/pkg/metric"
 	"github.com/opencost/opencost/modules/collector-source/pkg/scrape/target"
 	"github.com/opencost/opencost/modules/collector-source/pkg/scrape/target"
 )
 )
@@ -19,6 +20,7 @@ func newNetworkScraper(
 
 
 func newNetworkTargetScraper(provider target.TargetProvider) *TargetScraper {
 func newNetworkTargetScraper(provider target.TargetProvider) *TargetScraper {
 	return newTargetScrapper(
 	return newTargetScrapper(
+		event.NetworkCostsScraperName,
 		provider,
 		provider,
 		[]string{
 		[]string{
 			metric.KubecostPodNetworkEgressBytesTotal,
 			metric.KubecostPodNetworkEgressBytesTotal,

+ 2 - 0
modules/collector-source/pkg/scrape/opencost.go

@@ -1,6 +1,7 @@
 package scrape
 package scrape
 
 
 import (
 import (
+	"github.com/opencost/opencost/modules/collector-source/pkg/event"
 	"github.com/opencost/opencost/modules/collector-source/pkg/metric"
 	"github.com/opencost/opencost/modules/collector-source/pkg/metric"
 	"github.com/opencost/opencost/modules/collector-source/pkg/scrape/target"
 	"github.com/opencost/opencost/modules/collector-source/pkg/scrape/target"
 )
 )
@@ -16,6 +17,7 @@ func newOpenCostScraper() Scraper {
 
 
 func newOpencostTargetScraper(provider target.TargetProvider) *TargetScraper {
 func newOpencostTargetScraper(provider target.TargetProvider) *TargetScraper {
 	return newTargetScrapper(
 	return newTargetScrapper(
+		event.OpenCostScraperName,
 		provider,
 		provider,
 		[]string{
 		[]string{
 			metric.KubecostClusterManagementCost,
 			metric.KubecostClusterManagementCost,

+ 1 - 0
modules/collector-source/pkg/scrape/scraper.go

@@ -5,6 +5,7 @@ import (
 )
 )
 
 
 type Scraper interface {
 type Scraper interface {
+	// Scrape performs the metrics scrape and returns a slice of `Update` instances to apply.
 	Scrape() []metric.Update
 	Scrape() []metric.Update
 }
 }
 
 

+ 23 - 0
modules/collector-source/pkg/scrape/statsummary.go

@@ -1,9 +1,11 @@
 package scrape
 package scrape
 
 
 import (
 import (
+	"github.com/kubecost/events"
 	"github.com/opencost/opencost/core/pkg/log"
 	"github.com/opencost/opencost/core/pkg/log"
 	"github.com/opencost/opencost/core/pkg/nodestats"
 	"github.com/opencost/opencost/core/pkg/nodestats"
 	"github.com/opencost/opencost/core/pkg/source"
 	"github.com/opencost/opencost/core/pkg/source"
+	"github.com/opencost/opencost/modules/collector-source/pkg/event"
 	"github.com/opencost/opencost/modules/collector-source/pkg/metric"
 	"github.com/opencost/opencost/modules/collector-source/pkg/metric"
 	stats "k8s.io/kubelet/pkg/apis/stats/v1alpha1"
 	stats "k8s.io/kubelet/pkg/apis/stats/v1alpha1"
 )
 )
@@ -21,7 +23,21 @@ func newStatSummaryScraper(client nodestats.StatSummaryClient) Scraper {
 func (s *StatSummaryScraper) Scrape() []metric.Update {
 func (s *StatSummaryScraper) Scrape() []metric.Update {
 	var scrapeResults []metric.Update
 	var scrapeResults []metric.Update
 	nodeStats, err := s.client.GetNodeData()
 	nodeStats, err := s.client.GetNodeData()
+
 	if err != nil {
 	if err != nil {
+		var errs []error
+		if multiErr, ok := err.(interface{ Unwrap() []error }); ok {
+			errs = multiErr.Unwrap()
+		} else {
+			errs = []error{err}
+		}
+
+		events.Dispatch(event.ScrapeEvent{
+			ScraperName: event.NodeStatsScraperName,
+			Targets:     len(nodeStats) + len(errs),
+			Errors:      errs,
+		})
+
 		log.Errorf("error retrieving node stat data: %s", err.Error())
 		log.Errorf("error retrieving node stat data: %s", err.Error())
 		return scrapeResults
 		return scrapeResults
 	}
 	}
@@ -135,6 +151,13 @@ func (s *StatSummaryScraper) Scrape() []metric.Update {
 			}
 			}
 		}
 		}
 	}
 	}
+
+	events.Dispatch(event.ScrapeEvent{
+		ScraperName: event.NodeStatsScraperName,
+		Targets:     len(nodeStats),
+		Errors:      []error{},
+	})
+
 	return scrapeResults
 	return scrapeResults
 }
 }
 
 

+ 29 - 2
modules/collector-source/pkg/scrape/targetscraper.go

@@ -1,24 +1,30 @@
 package scrape
 package scrape
 
 
 import (
 import (
+	"sync"
+
+	"github.com/kubecost/events"
 	"github.com/opencost/opencost/core/pkg/log"
 	"github.com/opencost/opencost/core/pkg/log"
+	"github.com/opencost/opencost/modules/collector-source/pkg/event"
 	"github.com/opencost/opencost/modules/collector-source/pkg/metric"
 	"github.com/opencost/opencost/modules/collector-source/pkg/metric"
 	"github.com/opencost/opencost/modules/collector-source/pkg/scrape/parser"
 	"github.com/opencost/opencost/modules/collector-source/pkg/scrape/parser"
 	"github.com/opencost/opencost/modules/collector-source/pkg/scrape/target"
 	"github.com/opencost/opencost/modules/collector-source/pkg/scrape/target"
 )
 )
 
 
 type TargetScraper struct {
 type TargetScraper struct {
+	name           string // identifier for the scraper
 	targetProvider target.TargetProvider
 	targetProvider target.TargetProvider
 	metricNames    map[string]struct{} // filter for which metrics will be processed
 	metricNames    map[string]struct{} // filter for which metrics will be processed
 	includeMetrics bool                // toggle to make metrics an include or exclude list
 	includeMetrics bool                // toggle to make metrics an include or exclude list
 }
 }
 
 
-func newTargetScrapper(provider target.TargetProvider, metricNames []string, includeMetrics bool) *TargetScraper {
+func newTargetScrapper(name string, provider target.TargetProvider, metricNames []string, includeMetrics bool) *TargetScraper {
 	metricSet := make(map[string]struct{})
 	metricSet := make(map[string]struct{})
 	for _, metricName := range metricNames {
 	for _, metricName := range metricNames {
 		metricSet[metricName] = struct{}{}
 		metricSet[metricName] = struct{}{}
 	}
 	}
 	return &TargetScraper{
 	return &TargetScraper{
+		name:           name,
 		targetProvider: provider,
 		targetProvider: provider,
 		metricNames:    metricSet,
 		metricNames:    metricSet,
 		includeMetrics: includeMetrics,
 		includeMetrics: includeMetrics,
@@ -27,6 +33,10 @@ func newTargetScrapper(provider target.TargetProvider, metricNames []string, inc
 
 
 func (s *TargetScraper) Scrape() []metric.Update {
 func (s *TargetScraper) Scrape() []metric.Update {
 	targets := s.targetProvider.GetTargets()
 	targets := s.targetProvider.GetTargets()
+
+	var errLock sync.Mutex
+	var errors []error
+
 	var scrapeFuncs []ScrapeFunc
 	var scrapeFuncs []ScrapeFunc
 	for i := range targets {
 	for i := range targets {
 		target := targets[i]
 		target := targets[i]
@@ -34,11 +44,19 @@ func (s *TargetScraper) Scrape() []metric.Update {
 			var scrapeResults []metric.Update
 			var scrapeResults []metric.Update
 			f, err := target.Load()
 			f, err := target.Load()
 			if err != nil {
 			if err != nil {
+				errLock.Lock()
+				errors = append(errors, err)
+				errLock.Unlock()
+
 				log.Errorf("failed to scrape target: %s", err.Error())
 				log.Errorf("failed to scrape target: %s", err.Error())
 				return scrapeResults
 				return scrapeResults
 			}
 			}
 			results, err := parser.Parse(f)
 			results, err := parser.Parse(f)
 			if err != nil {
 			if err != nil {
+				errLock.Lock()
+				errors = append(errors, err)
+				errLock.Unlock()
+
 				log.Errorf("failed to parse target: %s", err.Error())
 				log.Errorf("failed to parse target: %s", err.Error())
 				return scrapeResults
 				return scrapeResults
 			}
 			}
@@ -58,5 +76,14 @@ func (s *TargetScraper) Scrape() []metric.Update {
 		scrapeFuncs = append(scrapeFuncs, fn)
 		scrapeFuncs = append(scrapeFuncs, fn)
 	}
 	}
 
 
-	return concurrentScrape(scrapeFuncs...)
+	updates := concurrentScrape(scrapeFuncs...)
+
+	// dispatch a scrape event for this specific scrape
+	events.Dispatch(event.ScrapeEvent{
+		ScraperName: s.name,
+		Targets:     len(targets),
+		Errors:      errors,
+	})
+
+	return updates
 }
 }