service.go 4.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173
  1. package diagnostics
  2. import (
  3. "context"
  4. "fmt"
  5. "iter"
  6. "maps"
  7. "slices"
  8. "sync"
  9. "time"
  10. "github.com/google/uuid"
  11. "github.com/opencost/opencost/core/pkg/util/maputil"
  12. "github.com/opencost/opencost/core/pkg/util/worker"
  13. )
  14. // basic composite type for diagnostics and the runner function
  15. type runner struct {
  16. diagnostic Diagnostic
  17. run DiagnosticRunner
  18. }
  19. // OpencostDiagnosticsService is an implementation of the `DiagnosticService` contract that provides concurrent diagnostic
  20. // execution and result collection.
  21. type OpencostDiagnosticService struct {
  22. lock sync.RWMutex
  23. runners map[string]map[string]*runner
  24. }
  25. func NewDiagnosticService() DiagnosticService {
  26. return &OpencostDiagnosticService{
  27. runners: make(map[string]map[string]*runner),
  28. }
  29. }
  30. // Register registers a new diagnostic runner implementation with the service that will run the next time diagnostics are requested.
  31. // An error is returned if a runner failed to register. Note that category _and_ name must be a unique combination.
  32. func (ocds *OpencostDiagnosticService) Register(name string, description string, category string, r DiagnosticRunner) error {
  33. ocds.lock.Lock()
  34. defer ocds.lock.Unlock()
  35. categoryRunners, exists := ocds.runners[category]
  36. if !exists {
  37. categoryRunners = make(map[string]*runner)
  38. ocds.runners[category] = categoryRunners
  39. }
  40. if _, exists := categoryRunners[name]; exists {
  41. return fmt.Errorf("runner with name %s already exists in category %s", name, category)
  42. }
  43. categoryRunners[name] = &runner{
  44. diagnostic: Diagnostic{
  45. Name: name,
  46. Description: description,
  47. Category: category,
  48. },
  49. run: r,
  50. }
  51. return nil
  52. }
  53. // Unregister unregisters a diagnostic runner implementation with the service. True is returned if the runner was unregistered successfully,
  54. // false otherwise.
  55. func (ocds *OpencostDiagnosticService) Unregister(name string, category string) bool {
  56. ocds.lock.Lock()
  57. defer ocds.lock.Unlock()
  58. categoryRunners, exists := ocds.runners[category]
  59. if !exists {
  60. return false
  61. }
  62. if _, exists := categoryRunners[name]; !exists {
  63. return false
  64. }
  65. delete(categoryRunners, name)
  66. if len(categoryRunners) == 0 {
  67. delete(ocds.runners, category)
  68. }
  69. return true
  70. }
  71. // Run executes all registered diagnostics and returns the results.
  72. func (ocds *OpencostDiagnosticService) Run(ctx context.Context) []*DiagnosticResult {
  73. ocds.lock.RLock()
  74. defer ocds.lock.RUnlock()
  75. return runAll(ctx, maputil.Flatten(ocds.runners))
  76. }
  77. // RunCategory executes all registered diagnostics in the provided category.
  78. func (ocds *OpencostDiagnosticService) RunCategory(ctx context.Context, category string) []*DiagnosticResult {
  79. ocds.lock.RLock()
  80. defer ocds.lock.RUnlock()
  81. categoryRunners, exists := ocds.runners[category]
  82. if !exists {
  83. return nil
  84. }
  85. return runAll(ctx, maps.Values(categoryRunners))
  86. }
  87. // RunDiagnostic executes a specific diagnostic by category and name. If the diagnostic does not exist, nil is returned.
  88. func (ocds *OpencostDiagnosticService) RunDiagnostic(ctx context.Context, category, name string) *DiagnosticResult {
  89. ocds.lock.RLock()
  90. defer ocds.lock.RUnlock()
  91. categoryRunners, exists := ocds.runners[category]
  92. if !exists {
  93. return nil
  94. }
  95. r, exists := categoryRunners[name]
  96. if !exists {
  97. return nil
  98. }
  99. diagRunner := diagRunnerFor(ctx)
  100. return diagRunner(r)
  101. }
  102. // runAll executes all runners in the provided iterator with a specific worker pool size,
  103. // and returns the results when all diagnostic runners have completed.
  104. func runAll(ctx context.Context, runners iter.Seq[*runner]) []*DiagnosticResult {
  105. allContext, cancel := context.WithCancel(ctx)
  106. defer cancel()
  107. return worker.ConcurrentIterCollect(5, diagRunnerFor(allContext), runners)
  108. }
  109. // diagRunnerFor returns a diagnostic runner function that executes the diagnostic and creates the DiagnosticResult
  110. // leveraging the provided context as a parent.
  111. func diagRunnerFor(ctx context.Context) func(*runner) *DiagnosticResult {
  112. return func(r *runner) *DiagnosticResult {
  113. result := &DiagnosticResult{
  114. ID: uuid.Must(uuid.NewV7()).String(),
  115. Name: r.diagnostic.Name,
  116. Description: r.diagnostic.Description,
  117. Category: r.diagnostic.Category,
  118. }
  119. c, cancelDiag := context.WithTimeout(ctx, 5*time.Second)
  120. defer cancelDiag()
  121. details, err := r.run(c)
  122. if err != nil {
  123. result.Error = err.Error()
  124. } else {
  125. result.Details = details
  126. }
  127. result.Timestamp = time.Now().UTC()
  128. return result
  129. }
  130. }
  131. // Diagnostics returns a list of all registered diagnostics.
  132. func (ocds *OpencostDiagnosticService) Diagnostics() []Diagnostic {
  133. ocds.lock.RLock()
  134. defer ocds.lock.RUnlock()
  135. diagnostics := maputil.FlatMap(ocds.runners, func(r *runner) Diagnostic {
  136. return r.diagnostic
  137. })
  138. return slices.Collect(diagnostics)
  139. }