service.go 4.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187
  1. package diagnostics
  2. import (
  3. "context"
  4. "fmt"
  5. "iter"
  6. "maps"
  7. "slices"
  8. "sync"
  9. "time"
  10. "github.com/google/uuid"
  11. "github.com/opencost/opencost/core/pkg/util/maputil"
  12. "github.com/opencost/opencost/core/pkg/util/worker"
  13. )
// runner is a basic composite type pairing a diagnostic's descriptive metadata
// with the runner function that executes it.
type runner struct {
	// diagnostic holds the name, description, and category that are copied
	// into every result produced for this runner.
	diagnostic Diagnostic
	// run is the function invoked with a per-run context to produce the
	// diagnostic details or an error.
	run DiagnosticRunner
}
// OpencostDiagnosticService is an implementation of the `DiagnosticService` contract that provides concurrent diagnostic
// execution and result collection.
type OpencostDiagnosticService struct {
	// lock guards runners and count.
	lock sync.RWMutex
	// runners holds the registered runners keyed first by category, then by name.
	runners map[string]map[string]*runner
	// count is the number of registered runners; it is incremented by Register
	// and decremented by Unregister so Total does not have to walk the maps.
	count int
}
  26. func NewDiagnosticService() DiagnosticService {
  27. return &OpencostDiagnosticService{
  28. runners: make(map[string]map[string]*runner),
  29. count: 0,
  30. }
  31. }
  32. // Register registers a new diagnostic runner implementation with the service that will run the next time diagnostics are requested.
  33. // An error is returned if a runner failed to register. Note that category _and_ name must be a unique combination.
  34. func (ocds *OpencostDiagnosticService) Register(name string, description string, category string, r DiagnosticRunner) error {
  35. ocds.lock.Lock()
  36. defer ocds.lock.Unlock()
  37. categoryRunners, exists := ocds.runners[category]
  38. if !exists {
  39. categoryRunners = make(map[string]*runner)
  40. ocds.runners[category] = categoryRunners
  41. }
  42. if _, exists := categoryRunners[name]; exists {
  43. return fmt.Errorf("runner with name %s already exists in category %s", name, category)
  44. }
  45. categoryRunners[name] = &runner{
  46. diagnostic: Diagnostic{
  47. Name: name,
  48. Description: description,
  49. Category: category,
  50. },
  51. run: r,
  52. }
  53. ocds.count += 1
  54. return nil
  55. }
  56. // Unregister unregisters a diagnostic runner implementation with the service. True is returned if the runner was unregistered successfully,
  57. // false otherwise.
  58. func (ocds *OpencostDiagnosticService) Unregister(name string, category string) bool {
  59. ocds.lock.Lock()
  60. defer ocds.lock.Unlock()
  61. categoryRunners, exists := ocds.runners[category]
  62. if !exists {
  63. return false
  64. }
  65. if _, exists := categoryRunners[name]; !exists {
  66. return false
  67. }
  68. delete(categoryRunners, name)
  69. if len(categoryRunners) == 0 {
  70. delete(ocds.runners, category)
  71. }
  72. ocds.count -= 1
  73. return true
  74. }
  75. // Run executes all registered diagnostics and returns the results.
  76. func (ocds *OpencostDiagnosticService) Run(ctx context.Context) []*DiagnosticResult {
  77. ocds.lock.RLock()
  78. defer ocds.lock.RUnlock()
  79. return runAll(ctx, maputil.Flatten(ocds.runners))
  80. }
  81. // RunCategory executes all registered diagnostics in the provided category.
  82. func (ocds *OpencostDiagnosticService) RunCategory(ctx context.Context, category string) []*DiagnosticResult {
  83. ocds.lock.RLock()
  84. defer ocds.lock.RUnlock()
  85. categoryRunners, exists := ocds.runners[category]
  86. if !exists {
  87. return nil
  88. }
  89. return runAll(ctx, maps.Values(categoryRunners))
  90. }
  91. // RunDiagnostic executes a specific diagnostic by category and name. If the diagnostic does not exist, nil is returned.
  92. func (ocds *OpencostDiagnosticService) RunDiagnostic(ctx context.Context, category, name string) *DiagnosticResult {
  93. ocds.lock.RLock()
  94. defer ocds.lock.RUnlock()
  95. categoryRunners, exists := ocds.runners[category]
  96. if !exists {
  97. return nil
  98. }
  99. r, exists := categoryRunners[name]
  100. if !exists {
  101. return nil
  102. }
  103. diagRunner := diagRunnerFor(ctx)
  104. return diagRunner(r)
  105. }
  106. // runAll executes all runners in the provided iterator with a specific worker pool size,
  107. // and returns the results when all diagnostic runners have completed.
  108. func runAll(ctx context.Context, runners iter.Seq[*runner]) []*DiagnosticResult {
  109. allContext, cancel := context.WithCancel(ctx)
  110. defer cancel()
  111. return worker.ConcurrentIterCollect(5, diagRunnerFor(allContext), runners)
  112. }
  113. // diagRunnerFor returns a diagnostic runner function that executes the diagnostic and creates the DiagnosticResult
  114. // leveraging the provided context as a parent.
  115. func diagRunnerFor(ctx context.Context) func(*runner) *DiagnosticResult {
  116. return func(r *runner) *DiagnosticResult {
  117. result := &DiagnosticResult{
  118. ID: uuid.Must(uuid.NewV7()).String(),
  119. Name: r.diagnostic.Name,
  120. Description: r.diagnostic.Description,
  121. Category: r.diagnostic.Category,
  122. }
  123. c, cancelDiag := context.WithTimeout(ctx, 5*time.Second)
  124. defer cancelDiag()
  125. details, err := r.run(c)
  126. if err != nil {
  127. result.Error = err.Error()
  128. } else {
  129. result.Details = details
  130. }
  131. result.Timestamp = time.Now().UTC()
  132. return result
  133. }
  134. }
  135. // Diagnostics returns a list of all registered diagnostics.
  136. func (ocds *OpencostDiagnosticService) Diagnostics() []Diagnostic {
  137. ocds.lock.RLock()
  138. defer ocds.lock.RUnlock()
  139. diagnostics := maputil.FlatMap(ocds.runners, func(r *runner) Diagnostic {
  140. return r.diagnostic
  141. })
  142. return slices.Collect(diagnostics)
  143. }
  144. // Total returns the total number of registered diagnostics.
  145. func (ocds *OpencostDiagnosticService) Total() int {
  146. ocds.lock.RLock()
  147. defer ocds.lock.RUnlock()
  148. return ocds.count
  149. }