diagnostics.go 19 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522
  1. package metric
  2. import (
  3. "fmt"
  4. "maps"
  5. "sync"
  6. "time"
  7. "github.com/kubecost/events"
  8. "github.com/opencost/opencost/core/pkg/collections"
  9. "github.com/opencost/opencost/core/pkg/util/sliceutil"
  10. "github.com/opencost/opencost/modules/collector-source/pkg/event"
  11. )
  12. // Collector Metric Diagnostic IDs
  13. const (
  14. // OpencostDiagnosticMetricID is the identifier for the metric used to determine if Opencost metrics are being updated
  15. OpencostDiagnosticMetricID = "opencostMetric"
  16. // NodesDiagnosticMetricID is the identifier for the query used to determine if the node CPU cores capacity is being updated
  17. NodesDiagnosticMetricID = "nodesCPUMetrics"
  18. // DcgmScraperDiagnosticID contains the identifier for the the DCGM scraper diagnostic.
  19. DcgmScraperDiagnosticID = event.DCGMScraperName
  20. // OpenCostScraperDiagnosticID contains the identifier for the the opencost metrics scraper diagnostic
  21. OpenCostScraperDiagnosticID = event.OpenCostScraperName
  22. // NodeStatsScraperDiagnosticID contains the identifier for the the node stats summary scraper diagnostic
  23. NodeStatsScraperDiagnosticID = event.NodeStatsScraperName
  24. // NetworkCostsScraperDiagnosticID contains the identifier for the the network-costs scraper diagnostic.
  25. NetworkCostsScraperDiagnosticID = event.NetworkCostsScraperName
  26. // Kubernetes scrapers contains the identifiers for all the specific KubernetesCluster scrapers.
  27. KubernetesNodesScraperDiagnosticID = event.KubernetesClusterScraperName + "-" + event.NodeScraperType
  28. KubernetesNamespacesScraperDiagnosticID = event.KubernetesClusterScraperName + "-" + event.NamespaceScraperType
  29. KubernetesReplicaSetsScraperDiagnosticID = event.KubernetesClusterScraperName + "-" + event.ReplicaSetScraperType
  30. KubernetesDeploymentsScraperDiagnosticID = event.KubernetesClusterScraperName + "-" + event.DeploymentScraperType
  31. KubernetesStatefulSetsScraperDiagnosticID = event.KubernetesClusterScraperName + "-" + event.StatefulSetScraperType
  32. KubernetesServicesScraperDiagnosticID = event.KubernetesClusterScraperName + "-" + event.ServiceScraperType
  33. KubernetesPodsScraperDiagnosticID = event.KubernetesClusterScraperName + "-" + event.PodScraperType
  34. KubernetesPvsScraperDiagnosticID = event.KubernetesClusterScraperName + "-" + event.PvScraperType
  35. KubernetesPvcsScraperDiagnosticID = event.KubernetesClusterScraperName + "-" + event.PvcScraperType
  36. )
  // DiagnosticType is set on a diagnostic definition to select which kind of
  // implementation represents that diagnostic.
  type DiagnosticType int

  const (
  	// DiagnosticTypeMetric (0) marks a metric diagnostic.
  	DiagnosticTypeMetric DiagnosticType = iota
  	// DiagnosticTypeScraper (1) marks a scraper diagnostic.
  	DiagnosticTypeScraper
  	// more diagnostic types?
  )
  45. // diagnostic defintion is the type used to define a deterministic list of specific diagnostics we _expect_ to collect
  46. type diagnosticDefinition struct {
  47. ID string
  48. MetricName string
  49. Label string
  50. Description string
  51. DocLink string
  52. DiagType DiagnosticType
  53. }
  54. // diagnostic definitions mapping holds all of the diagnostic definitions that can be used for collector metrics diagnostics
  55. var diagnosticDefinitions map[string]*diagnosticDefinition = map[string]*diagnosticDefinition{
  56. NodesDiagnosticMetricID: {
  57. ID: NodesDiagnosticMetricID,
  58. MetricName: KubeNodeStatusCapacityCPUCores,
  59. Label: "Node CPU cores capacity is being scraped",
  60. Description: "Determine if the node CPU cores capacity metrics are being updated",
  61. DiagType: DiagnosticTypeMetric,
  62. },
  63. OpencostDiagnosticMetricID: {
  64. ID: OpencostDiagnosticMetricID,
  65. MetricName: NodeTotalHourlyCost,
  66. Label: "Opencost metrics for a node are being scraped",
  67. Description: "Determine if opencost metrics for a node are being updated",
  68. DiagType: DiagnosticTypeMetric,
  69. },
  70. DcgmScraperDiagnosticID: {
  71. ID: DcgmScraperDiagnosticID,
  72. MetricName: event.DCGMScraperName,
  73. Label: "DCGM scraper is available and is being scraped.",
  74. Description: scraperDiagnosticDescriptionFor(event.DCGMScraperName, ""),
  75. DiagType: DiagnosticTypeScraper,
  76. },
  77. OpenCostScraperDiagnosticID: {
  78. ID: OpenCostScraperDiagnosticID,
  79. MetricName: event.OpenCostScraperName,
  80. Label: "Opencost metrics scraper is available and is being scraped.",
  81. Description: scraperDiagnosticDescriptionFor(event.OpenCostScraperName, ""),
  82. DiagType: DiagnosticTypeScraper,
  83. },
  84. NodeStatsScraperDiagnosticID: {
  85. ID: NodeStatsScraperDiagnosticID,
  86. MetricName: event.NodeStatsScraperName,
  87. Label: "Node stats summary scraper is available and is being scraped.",
  88. Description: scraperDiagnosticDescriptionFor(event.NodeStatsScraperName, ""),
  89. DiagType: DiagnosticTypeScraper,
  90. },
  91. NetworkCostsScraperDiagnosticID: {
  92. ID: NetworkCostsScraperDiagnosticID,
  93. MetricName: event.NetworkCostsScraperName,
  94. Label: "Network costs daemonset metrics scrapers are available and being scraped.",
  95. Description: scraperDiagnosticDescriptionFor(event.NetworkCostsScraperName, ""),
  96. DiagType: DiagnosticTypeScraper,
  97. },
  98. KubernetesNodesScraperDiagnosticID: {
  99. ID: KubernetesNodesScraperDiagnosticID,
  100. MetricName: KubernetesNodesScraperDiagnosticID,
  101. Label: fmt.Sprintf("Kubernetes cluster resources: %s are available and being scraped", event.NodeScraperType),
  102. Description: scraperDiagnosticDescriptionFor(event.KubernetesClusterScraperName, event.NodeScraperType),
  103. DiagType: DiagnosticTypeScraper,
  104. },
  105. KubernetesNamespacesScraperDiagnosticID: {
  106. ID: KubernetesNamespacesScraperDiagnosticID,
  107. MetricName: KubernetesNamespacesScraperDiagnosticID,
  108. Label: fmt.Sprintf("Kubernetes cluster resources: %s are available and being scraped", event.NamespaceScraperType),
  109. Description: scraperDiagnosticDescriptionFor(event.KubernetesClusterScraperName, event.NamespaceScraperType),
  110. DiagType: DiagnosticTypeScraper,
  111. },
  112. KubernetesReplicaSetsScraperDiagnosticID: {
  113. ID: KubernetesReplicaSetsScraperDiagnosticID,
  114. MetricName: KubernetesReplicaSetsScraperDiagnosticID,
  115. Label: fmt.Sprintf("Kubernetes cluster resources: %s are available and being scraped", event.ReplicaSetScraperType),
  116. Description: scraperDiagnosticDescriptionFor(event.KubernetesClusterScraperName, event.ReplicaSetScraperType),
  117. DiagType: DiagnosticTypeScraper,
  118. },
  119. KubernetesDeploymentsScraperDiagnosticID: {
  120. ID: KubernetesDeploymentsScraperDiagnosticID,
  121. MetricName: KubernetesDeploymentsScraperDiagnosticID,
  122. Label: fmt.Sprintf("Kubernetes cluster resources: %s are available and being scraped", event.DeploymentScraperType),
  123. Description: scraperDiagnosticDescriptionFor(event.KubernetesClusterScraperName, event.DeploymentScraperType),
  124. DiagType: DiagnosticTypeScraper,
  125. },
  126. KubernetesStatefulSetsScraperDiagnosticID: {
  127. ID: KubernetesStatefulSetsScraperDiagnosticID,
  128. MetricName: KubernetesStatefulSetsScraperDiagnosticID,
  129. Label: fmt.Sprintf("Kubernetes cluster resources: %s are available and being scraped", event.StatefulSetScraperType),
  130. Description: scraperDiagnosticDescriptionFor(event.KubernetesClusterScraperName, event.StatefulSetScraperType),
  131. DiagType: DiagnosticTypeScraper,
  132. },
  133. KubernetesServicesScraperDiagnosticID: {
  134. ID: KubernetesServicesScraperDiagnosticID,
  135. MetricName: KubernetesServicesScraperDiagnosticID,
  136. Label: fmt.Sprintf("Kubernetes cluster resources: %s are available and being scraped", event.ServiceScraperType),
  137. Description: scraperDiagnosticDescriptionFor(event.KubernetesClusterScraperName, event.ServiceScraperType),
  138. DiagType: DiagnosticTypeScraper,
  139. },
  140. KubernetesPodsScraperDiagnosticID: {
  141. ID: KubernetesPodsScraperDiagnosticID,
  142. MetricName: KubernetesPodsScraperDiagnosticID,
  143. Label: fmt.Sprintf("Kubernetes cluster resources: %s are available and being scraped", event.PodScraperType),
  144. Description: scraperDiagnosticDescriptionFor(event.KubernetesClusterScraperName, event.PodScraperType),
  145. DiagType: DiagnosticTypeScraper,
  146. },
  147. KubernetesPvsScraperDiagnosticID: {
  148. ID: KubernetesPvsScraperDiagnosticID,
  149. MetricName: KubernetesPvsScraperDiagnosticID,
  150. Label: fmt.Sprintf("Kubernetes cluster resources: %s are available and being scraped", event.PvScraperType),
  151. Description: scraperDiagnosticDescriptionFor(event.KubernetesClusterScraperName, event.PvScraperType),
  152. DiagType: DiagnosticTypeScraper,
  153. },
  154. KubernetesPvcsScraperDiagnosticID: {
  155. ID: KubernetesPvcsScraperDiagnosticID,
  156. MetricName: KubernetesPvcsScraperDiagnosticID,
  157. Label: fmt.Sprintf("Kubernetes cluster resources: %s are available and being scraped", event.PvcScraperType),
  158. Description: scraperDiagnosticDescriptionFor(event.KubernetesClusterScraperName, event.PvcScraperType),
  159. DiagType: DiagnosticTypeScraper,
  160. },
  161. }
  // scraperIdFor builds the diagnostic identifier for a scraper. When a scrape type
  // is given the result is "<scraperName>-<scrapeType>", otherwise it is just the
  // scraper name. The result _must_ match the diagnostic ids defined above.
  func scraperIdFor(scraperName, scrapeType string) string {
  	if scrapeType != "" {
  		return fmt.Sprintf("%s-%s", scraperName, scrapeType)
  	}

  	return scraperName
  }
  // scraperDiagnosticDescriptionFor generates the description text for a scraper
  // events diagnostic.
  //
  // scraperName is the name of the scraper. scrapeType is the optional resource
  // type the scraper handles; when it is empty the description only mentions the
  // scraper, otherwise the type is appended.
  func scraperDiagnosticDescriptionFor(scraperName, scrapeType string) string {
  	if scrapeType == "" {
  		return fmt.Sprintf("Determine if the scraper for: %s is correctly reporting data", scraperName)
  	}

  	// same wording as the untyped description ("reporting"), with the type appended
  	return fmt.Sprintf("Determine if the scraper for: %s is correctly reporting data for type: %s", scraperName, scrapeType)
  }
  // CollectorDiagnostic is the basic interface that lets different kinds of
  // diagnostic data collection be stored and reported in the same way.
  type CollectorDiagnostic interface {
  	// Id returns the identifier for the diagnostic.
  	Id() string

  	// Name returns the name of the metric being run.
  	Name() string

  	// Details generates an exportable detail map for the specific diagnostic.
  	// Calling it also resets any internal state the diagnostic keeps for the
  	// current cycle.
  	Details() map[string]any
  }
  186. // metric diagnostic is checked on metrics update -- it maintains a historic record of all the instants
  187. // a specific metric was updated, and reports a diagnostic on the validity of that history.
  188. type metricDiagnostic struct {
  189. diagnostic *diagnosticDefinition
  190. updateTimestamps []time.Time
  191. result map[string]float64
  192. }
  193. // creates a new metric diagnostic
  194. func newMetricDiagnostic(diagnostic *diagnosticDefinition) *metricDiagnostic {
  195. return &metricDiagnostic{
  196. diagnostic: diagnostic,
  197. result: make(map[string]float64),
  198. }
  199. }
  200. // Id returns the identifier for the metric diagnostic type -- this just proxies from the diagnostic
  201. // definition.
  202. func (md *metricDiagnostic) Id() string {
  203. return md.diagnostic.ID
  204. }
  205. // Name returns the name of the metric being run for the metric diagnostic type -- this just proxies from
  206. // the diagnostic definition.
  207. func (md *metricDiagnostic) Name() string {
  208. return md.diagnostic.MetricName
  209. }
  210. // Details generates an exportable detail map for the specific diagnostic, and resets any of its internal
  211. // state for the current cycle.
  212. func (md *metricDiagnostic) Details() map[string]any {
  213. // for all timestamps that occurred during our update cycle,
  214. // if any timestamps for our metric do not exist, then we
  215. // say that the diagnostic failed. if there are no timestamps
  216. // marked in the result, then we also say the diagnostic failed.
  217. passed := true
  218. if len(md.result) == 0 {
  219. passed = false
  220. } else {
  221. for _, t := range md.updateTimestamps {
  222. key := t.Format(time.RFC3339)
  223. _, hasTimestamp := md.result[key]
  224. if !hasTimestamp {
  225. passed = false
  226. break
  227. }
  228. }
  229. }
  230. details := map[string]any{
  231. "query": md.Name(),
  232. "label": md.diagnostic.Label,
  233. "docLink": md.diagnostic.DocLink,
  234. "result": maps.Clone(md.result),
  235. "passed": passed,
  236. }
  237. // reset the update timestamps and results
  238. md.updateTimestamps = []time.Time{}
  239. for k := range md.result {
  240. delete(md.result, k)
  241. }
  242. return details
  243. }
  244. // scrapeDiagnostic maintains the latest state of each scrape event that occurs. scrape
  245. // events can be registered for any event, but only the specific scrapes with diagnostic
  246. // definitions defined will export as diagnostics.
  247. type scrapeDiagnostic struct {
  248. diagnostic *diagnosticDefinition
  249. scraper string
  250. scrapeType string
  251. targets int
  252. errors []error
  253. }
  254. // creates a new scrape diagnostic from the event data and diagnostics definition
  255. func newScrapeDiagnostic(
  256. scrapeEvent event.ScrapeEvent,
  257. definition *diagnosticDefinition,
  258. ) *scrapeDiagnostic {
  259. return &scrapeDiagnostic{
  260. diagnostic: definition,
  261. scraper: scrapeEvent.ScraperName,
  262. scrapeType: scrapeEvent.ScrapeType,
  263. targets: scrapeEvent.Targets,
  264. errors: scrapeEvent.Errors,
  265. }
  266. }
  267. // Id is a concatenation of scraper and scrapeType if a scrapeType exists.
  268. func (sd *scrapeDiagnostic) Id() string {
  269. if sd.diagnostic != nil {
  270. return sd.diagnostic.ID
  271. }
  272. return scraperIdFor(sd.scraper, sd.scrapeType)
  273. }
  274. // Name returns the name of the scraper the event fired from.
  275. func (sd *scrapeDiagnostic) Name() string {
  276. return sd.scraper
  277. }
  278. // Details generates an exportable detail map for the specific diagnostic, and resets any of its internal
  279. // state for the current cycle.
  280. func (sd *scrapeDiagnostic) Details() map[string]any {
  281. // passed if there are no errors
  282. passed := len(sd.errors) == 0
  283. // map errors to a string slice for easier propagation
  284. var errs []string
  285. if !passed {
  286. errs = sliceutil.Map(sd.errors, func(e error) string { return e.Error() })
  287. } else {
  288. errs = []string{}
  289. }
  290. // since a scrape event does not require a matching diagnostic definition,
  291. // we must generate properties normally extracted from the defintiion
  292. var label string
  293. if sd.diagnostic != nil {
  294. label = sd.diagnostic.Label
  295. } else {
  296. label = fmt.Sprintf("%s scraper is available and being scraped.", sd.scraper)
  297. }
  298. // same for doclink
  299. var docLink string
  300. if sd.diagnostic != nil {
  301. docLink = sd.diagnostic.DocLink
  302. } else {
  303. docLink = ""
  304. }
  305. details := map[string]any{
  306. // stats contains total entities to scrape, success (of the total), and failures (of the total)
  307. "stats": map[string]any{
  308. "total": sd.targets,
  309. "success": max(sd.targets-len(errs), 0),
  310. "fail": len(errs),
  311. },
  312. "label": label,
  313. "docLink": docLink,
  314. "errors": errs,
  315. "passed": passed,
  316. }
  317. // scraper diagnostics do not maintain any internal/historical state
  318. // to reset -- it just maintains the most recent data. if we decide
  319. // to track historical event data, would need to reset the state after
  320. // this call.
  321. return details
  322. }
  323. // DiagnosticsModule is a helper type for managing all of the internal diagnostics for the collector datasource.
  324. type DiagnosticsModule struct {
  325. lock sync.RWMutex
  326. diagnostics *collections.IdNameMap[CollectorDiagnostic]
  327. updater Updater
  328. scrapeHandlerId events.HandlerID // scrape event handler identifier for removal
  329. }
  330. // NewDiagnosticsModule creates a new `DiagnosticsModule` instance to be used with a collector data source
  331. func NewDiagnosticsModule(updater Updater) *DiagnosticsModule {
  332. // initialize all metric diagnostics IFF the diagnostic type is "metrics"
  333. // NOTE: scraper diagnostics are dynamically created as scrape results arrive
  334. diagnostics := collections.NewIdNameMap[CollectorDiagnostic]()
  335. for _, def := range diagnosticDefinitions {
  336. // only insert metric diagnostic types
  337. if def.DiagType == DiagnosticTypeMetric {
  338. diagnostics.Insert(newMetricDiagnostic(def))
  339. }
  340. }
  341. dm := &DiagnosticsModule{
  342. diagnostics: diagnostics,
  343. updater: updater,
  344. }
  345. scrapeEvents := events.GlobalDispatcherFor[event.ScrapeEvent]()
  346. dm.scrapeHandlerId = scrapeEvents.AddEventHandler(dm.onScrapeEvent)
  347. return dm
  348. }
  349. // handles a scrape event dispatched -- updates the record for the specific scrape
  350. // diagnostic.
  351. func (d *DiagnosticsModule) onScrapeEvent(event event.ScrapeEvent) {
  352. d.lock.Lock()
  353. defer d.lock.Unlock()
  354. id := scraperIdFor(event.ScraperName, event.ScrapeType)
  355. // scrape events can occur without a backing diagnostic definition -- just
  356. // ignore if this happens
  357. def, ok := diagnosticDefinitions[id]
  358. if !ok {
  359. return
  360. }
  361. d.diagnostics.Insert(newScrapeDiagnostic(event, def))
  362. }
  363. func (d *DiagnosticsModule) Update(updateSet *UpdateSet) {
  364. if updateSet == nil {
  365. return
  366. }
  367. // This is done so that the update func is marked complete when both the updater and diagnostics are done
  368. // Otherwise we might face a race condition when calling the diagnostics details func before the diagnostics are done
  369. var wg sync.WaitGroup
  370. wg.Add(2) // 1 for updater, 1 for diagnostics
  371. go func() {
  372. defer wg.Done()
  373. d.lock.Lock()
  374. defer d.lock.Unlock()
  375. // add the timestamp to all metric diagnostic instances (see notes on addUpdateTimestamp)
  376. ts := updateSet.Timestamp
  377. d.addUpdateTimestamp(ts)
  378. timestamp := ts.Format(time.RFC3339)
  379. for _, update := range updateSet.Updates {
  380. if metric, ok := d.diagnostics.ByName(update.Name); ok {
  381. // this is unfortunately necessary due to the way our diangostic collectors
  382. // differ in functionality -- it makes more sense to duck type here rather
  383. // than maintain a separate map of just the metric types, or add metric
  384. // specific implementation details to the CollectorDiagnostic interface.
  385. // generally, we _should_ be able to make this assertion -- but we'll check in case.
  386. if metricDiag, isType := metric.(*metricDiagnostic); isType {
  387. // mark the timestamp as "seen" with the value
  388. metricDiag.result[timestamp] = update.Value
  389. }
  390. }
  391. }
  392. }()
  393. // We are still maintaining the order in which the updates to the repo are called
  394. // as this function gets the new call only when both these go routines are done
  395. go func() {
  396. defer wg.Done()
  397. d.updater.Update(updateSet)
  398. }()
  399. wg.Wait()
  400. }
  401. // appends an update timestamp on each of the metric diagnostics -- we need to write
  402. // every timestamp that the update makes unfortunately. There isn't a way to determine
  403. // if a diagnostic service "cycle" is complete, so it's not really possible to maintain
  404. // a most recent timestamps on the DiagnosticsModule (the optimal solution). we're not
  405. // far from a solid design here, just might need some more support on the diagnostic
  406. // service side.
  407. func (d *DiagnosticsModule) addUpdateTimestamp(t time.Time) {
  408. for _, def := range diagnosticDefinitions {
  409. if def.DiagType != DiagnosticTypeMetric {
  410. continue
  411. }
  412. diag, ok := d.diagnostics.ById(def.ID)
  413. if !ok {
  414. continue
  415. }
  416. // More duck typing sadly -- there are some fundamental design incompatibilities
  417. // with the way DiagnosticService was written and this cached diagnostic approach
  418. // that make things like "cycle" resets a bit difficult
  419. if metricDiag, ok := diag.(*metricDiagnostic); ok {
  420. metricDiag.updateTimestamps = append(metricDiag.updateTimestamps, t)
  421. }
  422. }
  423. }
  424. // DiagnosticDefinitions returns a deterministic mapping of pre-defined diagnostics used with the collector.
  425. func (d *DiagnosticsModule) DiagnosticsDefinitions() map[string]*diagnosticDefinition {
  426. return diagnosticDefinitions
  427. }
  428. // DiagnosticDetails returns the latest details for the diagnostic type
  429. func (d *DiagnosticsModule) DiagnosticsDetails(diagnosticsId string) (map[string]any, error) {
  430. d.lock.RLock()
  431. defer d.lock.RUnlock()
  432. // If a bogus diagnostics id was passed, we can check the definitions first
  433. if _, exists := diagnosticDefinitions[diagnosticsId]; !exists {
  434. return nil, fmt.Errorf("invalid diagnostic id: %s not found", diagnosticsId)
  435. }
  436. // for some diagnostics, like the scraper variant, they may not have been registered
  437. // yet (no scrape events), so we should return an error indicating that the scrape
  438. // hasn't occurred yet
  439. diagnostic, exists := d.diagnostics.ById(diagnosticsId)
  440. if !exists {
  441. return nil, fmt.Errorf("diagnostic not available: %s", diagnosticsId)
  442. }
  443. return diagnostic.Details(), nil
  444. }