ソースを参照

Added an application name to the export controllers for diagnostics and heartbeat.

Matt Bolt 1 年間 前
コミット
8d31918c4d

+ 3 - 0
core/pkg/diagnostics/diagnostics.go

@@ -82,4 +82,7 @@ type DiagnosticService interface {
 
 	// Diagnostics returns a list of all registered diagnostics.
 	Diagnostics() []Diagnostic
+
+	// Total returns the total number of registered diagnostics.
+	Total() int
 }

+ 2 - 2
core/pkg/diagnostics/exporter/controller.go

@@ -7,9 +7,9 @@ import (
 )
 
 // NewDiagnosticsExportController creates a new EventExportController for DiagnosticsRunReport events.
-func NewDiagnosticsExportController(clusterId string, store storage.Storage, service diagnostics.DiagnosticService) *exporter.EventExportController[diagnostics.DiagnosticsRunReport] {
+func NewDiagnosticsExportController(clusterId string, applicationName string, store storage.Storage, service diagnostics.DiagnosticService) *exporter.EventExportController[diagnostics.DiagnosticsRunReport] {
 	return exporter.NewEventExportController(
 		NewDiagnosticSource(service),
-		NewDiagnosticExporter(clusterId, store),
+		NewDiagnosticExporter(clusterId, applicationName, store),
 	)
 }

+ 2 - 2
core/pkg/diagnostics/exporter/exporter.go

@@ -9,8 +9,8 @@ import (
 )
 
 // NewDiagnosticExporter creates a new `StorageExporter[DiagnosticsRunReport]` instance for exporting diagnostic run events.
-func NewDiagnosticExporter(clusterId string, storage storage.Storage) *exporter.StorageExporter[diagnostics.DiagnosticsRunReport] {
-	pathing, err := pathing.NewEventStoragePathFormatter("", clusterId, diagnostics.DiagnosticsEventName)
+func NewDiagnosticExporter(clusterId string, applicationName string, storage storage.Storage) *exporter.StorageExporter[diagnostics.DiagnosticsRunReport] {
+	pathing, err := pathing.NewEventStoragePathFormatter("", clusterId, diagnostics.DiagnosticsEventName, applicationName)
 	if err != nil {
 		log.Errorf("failed to create pathing formatter: %v", err)
 		return nil

+ 5 - 0
core/pkg/diagnostics/exporter/source.go

@@ -24,6 +24,11 @@ func NewDiagnosticSource(diagnosticService diagnostics.DiagnosticService) *Diagn
 func (ds *DiagnosticSource) Make(t time.Time) *diagnostics.DiagnosticsRunReport {
 	ctx := context.Background()
 
+	// returning nil will prevent export -- skip for 0 registered diagnostics
+	if ds.diagnosticService.Total() == 0 {
+		return nil
+	}
+
 	return &diagnostics.DiagnosticsRunReport{
 		StartTime: t,
 		Results:   ds.diagnosticService.Run(ctx),

+ 14 - 0
core/pkg/diagnostics/service.go

@@ -25,11 +25,13 @@ type runner struct {
 type OpencostDiagnosticService struct {
 	lock    sync.RWMutex
 	runners map[string]map[string]*runner
+	count   int
 }
 
 func NewDiagnosticService() DiagnosticService {
 	return &OpencostDiagnosticService{
 		runners: make(map[string]map[string]*runner),
+		count:   0,
 	}
 }
 
@@ -58,6 +60,8 @@ func (ocds *OpencostDiagnosticService) Register(name string, description string,
 		run: r,
 	}
 
+	ocds.count += 1
+
 	return nil
 }
 
@@ -81,6 +85,8 @@ func (ocds *OpencostDiagnosticService) Unregister(name string, category string)
 		delete(ocds.runners, category)
 	}
 
+	ocds.count -= 1
+
 	return true
 }
 
@@ -171,3 +177,11 @@ func (ocds *OpencostDiagnosticService) Diagnostics() []Diagnostic {
 
 	return slices.Collect(diagnostics)
 }
+
+// Total returns the total number of registered diagnostics.
+func (ocds *OpencostDiagnosticService) Total() int {
+	ocds.lock.RLock()
+	defer ocds.lock.RUnlock()
+
+	return ocds.count
+}

+ 8 - 2
core/pkg/exporter/controller.go

@@ -69,10 +69,16 @@ func (cd *EventExportController[T]) Start(interval time.Duration) bool {
 			case <-time.After(interval):
 			}
 
-			// truncate the time to the minute to ensure broad enough coverage for event exports
+			// truncate the time to the second to ensure broad enough coverage for event exports
 			t := time.Now().UTC().Truncate(time.Second)
 
-			err := cd.exporter.Export(cd.source.Make(t))
+			evt := cd.source.Make(t)
+			if evt == nil {
+				log.Debugf("[%s] No event data to export", cd.typeName)
+				continue
+			}
+
+			err := cd.exporter.Export(evt)
 			if err != nil {
 				log.Warnf("[%s] Error during Write: %s", cd.typeName, err)
 			}

+ 11 - 2
core/pkg/exporter/pathing/eventpath.go

@@ -14,17 +14,18 @@ const EventStorageTimeFormat = "20060102150405"
 // EventStoragePathFormatter is an implementation of the StoragePathFormatter interface for
 // a cluster separated storage path of the format:
 //
-//	<root>/federated/<cluster>/<event>/YYYYMMDDHHmmss
+//	<root>/federated/<cluster>/<event>/<sub-paths...>/YYYYMMDDHHmmss
 type EventStoragePathFormatter struct {
 	rootDir   string
 	clusterId string
 	event     string
+	subPaths  []string
 }
 
 // NewBingenStoragePathFormatter creates a StoragePathFormatter for a cluster separated storage path
 // with the given root directory, cluster id, pipeline, and resolution. To omit the resolution directory
 // structure, provide a `nil` resolution.
-func NewEventStoragePathFormatter(rootDir, clusterId, event string) (StoragePathFormatter[time.Time], error) {
+func NewEventStoragePathFormatter(rootDir, clusterId, event string, subPaths ...string) (StoragePathFormatter[time.Time], error) {
 	if clusterId == "" {
 		return nil, fmt.Errorf("cluster id cannot be empty")
 	}
@@ -33,10 +34,17 @@ func NewEventStoragePathFormatter(rootDir, clusterId, event string) (StoragePath
 		return nil, fmt.Errorf("event cannot be empty")
 	}
 
+	for _, subPath := range subPaths {
+		if subPath == "" {
+			return nil, fmt.Errorf("subpaths cannot be empty")
+		}
+	}
+
 	return &EventStoragePathFormatter{
 		rootDir:   rootDir,
 		clusterId: clusterId,
 		event:     event,
+		subPaths:  subPaths,
 	}, nil
 }
 
@@ -56,6 +64,7 @@ func (espf *EventStoragePathFormatter) ToFullPath(prefix string, timestamp time.
 		federatedDir,
 		espf.clusterId,
 		espf.event,
+		path.Join(espf.subPaths...),
 		fileName,
 	)
 }

+ 9 - 3
core/pkg/heartbeat/exporter/controller.go

@@ -8,9 +8,15 @@ import (
 
 // NewHeartbeatExportController creates a new EventExportController for Heartbeat events.
 // A HeartbeatMetadataProvider can optionally be provided to append metadata to the Heartbeat payload.
-func NewHeartbeatExportController(clusterId string, store storage.Storage, provider HeartbeatMetadataProvider) *exporter.EventExportController[heartbeat.Heartbeat] {
+func NewHeartbeatExportController(
+	clusterId string,
+	applicationName string,
+	version string,
+	store storage.Storage,
+	provider HeartbeatMetadataProvider,
+) *exporter.EventExportController[heartbeat.Heartbeat] {
 	return exporter.NewEventExportController(
-		NewHeartbeatSource(provider),
-		NewHeartbeatExporter(clusterId, store),
+		NewHeartbeatSource(applicationName, version, provider),
+		NewHeartbeatExporter(clusterId, applicationName, store),
 	)
 }

+ 2 - 2
core/pkg/heartbeat/exporter/exporter.go

@@ -9,8 +9,8 @@ import (
 )
 
 // NewHeartbeatExporter creates a new `StorageExporter[Heartbeat]` instance for exporting Heartbeat events.
-func NewHeartbeatExporter(clusterId string, storage storage.Storage) *exporter.StorageExporter[heartbeat.Heartbeat] {
-	pathing, err := pathing.NewEventStoragePathFormatter("", clusterId, heartbeat.HeartbeatEventName)
+func NewHeartbeatExporter(clusterId string, applicationName string, storage storage.Storage) *exporter.StorageExporter[heartbeat.Heartbeat] {
+	pathing, err := pathing.NewEventStoragePathFormatter("", clusterId, heartbeat.HeartbeatEventName, applicationName)
 	if err != nil {
 		log.Errorf("failed to create pathing formatter: %v", err)
 		return nil

+ 9 - 4
core/pkg/heartbeat/exporter/heartbeat_test.go

@@ -3,6 +3,7 @@ package exporter
 import (
 	"encoding/json"
 	"fmt"
+	"path"
 	"path/filepath"
 	"slices"
 	"testing"
@@ -13,7 +14,11 @@ import (
 	"github.com/opencost/opencost/core/pkg/util/sliceutil"
 )
 
-const MockClusterId = "mock-cluster-1"
+const (
+	MockClusterId       = "mock-cluster-1"
+	MockApplicationName = "mock-agent"
+	MockVersion         = "1.0.0"
+)
 
 type MockHeartbeatMetadataProvider struct{}
 
@@ -31,7 +36,7 @@ func TestHeartbeatExporter(t *testing.T) {
 	mdp := NewMockHeartbeatMetadataProvider()
 	store := storage.NewMemoryStorage()
 
-	controller := NewHeartbeatExportController(MockClusterId, store, mdp)
+	controller := NewHeartbeatExportController(MockClusterId, MockApplicationName, MockVersion, store, mdp)
 
 	if !controller.Start(time.Second) {
 		t.Fatal("Failed to start controller")
@@ -40,7 +45,7 @@ func TestHeartbeatExporter(t *testing.T) {
 	time.Sleep(10 * time.Second)
 	controller.Stop()
 
-	files, _ := store.List("federated/mock-cluster-1/heartbeat")
+	files, _ := store.List(path.Join("federated", MockClusterId, heartbeat.HeartbeatEventName, MockApplicationName))
 	if len(files) == 0 {
 		t.Fatal("No files found in storage")
 	}
@@ -54,7 +59,7 @@ func TestHeartbeatExporter(t *testing.T) {
 	lastCheck := time.Time{}
 
 	for _, f := range fileNames {
-		fpath := filepath.Join("federated", MockClusterId, "heartbeat", f)
+		fpath := filepath.Join("federated", MockClusterId, "heartbeat", MockApplicationName, f)
 		data, err := store.Read(fpath)
 		if err != nil {
 			t.Fatalf("Failed to read file %s: %v", fpath, err)

+ 7 - 12
core/pkg/heartbeat/exporter/source.go

@@ -6,8 +6,6 @@ import (
 	"github.com/google/uuid"
 	"github.com/opencost/opencost/core/pkg/clusters"
 	"github.com/opencost/opencost/core/pkg/heartbeat"
-	"github.com/opencost/opencost/core/pkg/log"
-	"github.com/opencost/opencost/core/pkg/version"
 )
 
 // HeartbeatMetadataProvider is an interface that provides metadata for heartbeat instances. It can be used to inject
@@ -48,36 +46,33 @@ func (c *ClusterInfoMetadataProvider) GetMetadata() map[string]any {
 // leverages a `HeartbeatMetadataProvider` to inject custom metadata.
 type HeartbeatSource struct {
 	startTime        time.Time
+	applicationName  string
+	version          string
 	metadataProvider HeartbeatMetadataProvider
 }
 
 // NewHeartbeatSource creates a new `HeartbeatSource` instance. The `provider` parameter is used to inject custom metadata,
 // but can be set to `nil` if no metadata is needed.
-func NewHeartbeatSource(provider HeartbeatMetadataProvider) *HeartbeatSource {
+func NewHeartbeatSource(applicationName string, version string, provider HeartbeatMetadataProvider) *HeartbeatSource {
 	return &HeartbeatSource{
 		startTime:        time.Now().UTC(),
+		applicationName:  applicationName,
+		version:          version,
 		metadataProvider: provider,
 	}
 }
 
 // Make creates a new `Heartbeat` instance with the provided current time.
 func (h *HeartbeatSource) Make(t time.Time) *heartbeat.Heartbeat {
-	uid, err := uuid.NewV7()
-	if err != nil {
-		log.Warnf("failed to generate v7 UUID, replacing with UUID v4: %s", err)
-		uid = uuid.New()
-	}
-
-	id := uid.String()
+	id := uuid.Must(uuid.NewV7()).String()
 	uptime := uint64(t.Sub(h.startTime).Minutes())
-	v := version.FriendlyVersion()
 
 	var metadata map[string]any
 	if h.metadataProvider != nil {
 		metadata = h.metadataProvider.GetMetadata()
 	}
 
-	return heartbeat.NewHeartbeat(id, t, uptime, v, metadata)
+	return heartbeat.NewHeartbeat(id, t, uptime, h.applicationName, h.version, metadata)
 }
 
 func (h *HeartbeatSource) Name() string {

+ 13 - 11
core/pkg/heartbeat/heartbeat.go

@@ -9,11 +9,12 @@ const HeartbeatEventName string = "heartbeat"
 
 // Heartbeat is a payload struct that contains custom information and the timestamp of the heartbeat.
 type Heartbeat struct {
-	Id        string         `json:"id"`
-	Timestamp time.Time      `json:"timestamp"`
-	Uptime    uint64         `json:"uptime"`
-	Version   string         `json:"version"`
-	Metadata  map[string]any `json:"metadata,omitempty"`
+	Id          string         `json:"id"`
+	Timestamp   time.Time      `json:"timestamp"`
+	Uptime      uint64         `json:"uptime"`
+	Application string         `json:"application"`
+	Version     string         `json:"version"`
+	Metadata    map[string]any `json:"metadata,omitempty"`
 }
 
 // NewHeartbeat creates a new Heartbeat instance with the provided parameters.
@@ -21,12 +22,13 @@ type Heartbeat struct {
 // `uptime` is the uptime in seconds, `version` is the version of the heartbeat, and `metadata`
 // is a pointer to a generic type that can hold any additional information. Metadata _can_ be omitted
 // by passing `nil`.
-func NewHeartbeat(id string, timestamp time.Time, uptime uint64, version string, metadata map[string]any) *Heartbeat {
+func NewHeartbeat(id string, timestamp time.Time, uptime uint64, application string, version string, metadata map[string]any) *Heartbeat {
 	return &Heartbeat{
-		Id:        id,
-		Timestamp: timestamp,
-		Uptime:    uptime,
-		Version:   version,
-		Metadata:  metadata,
+		Id:          id,
+		Timestamp:   timestamp,
+		Uptime:      uptime,
+		Application: application,
+		Version:     version,
+		Metadata:    metadata,
 	}
 }