targetscraper.go 2.4 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889
  1. package scrape
  2. import (
  3. "sync"
  4. "github.com/kubecost/events"
  5. "github.com/opencost/opencost/core/pkg/log"
  6. "github.com/opencost/opencost/modules/collector-source/pkg/event"
  7. "github.com/opencost/opencost/modules/collector-source/pkg/metric"
  8. "github.com/opencost/opencost/modules/collector-source/pkg/scrape/parser"
  9. "github.com/opencost/opencost/modules/collector-source/pkg/scrape/target"
  10. )
  11. type TargetScraper struct {
  12. name string // identifier for the scraper
  13. targetProvider target.TargetProvider
  14. metricNames map[string]struct{} // filter for which metrics will be processed
  15. includeMetrics bool // toggle to make metrics an include or exclude list
  16. }
  17. func newTargetScrapper(name string, provider target.TargetProvider, metricNames []string, includeMetrics bool) *TargetScraper {
  18. metricSet := make(map[string]struct{})
  19. for _, metricName := range metricNames {
  20. metricSet[metricName] = struct{}{}
  21. }
  22. return &TargetScraper{
  23. name: name,
  24. targetProvider: provider,
  25. metricNames: metricSet,
  26. includeMetrics: includeMetrics,
  27. }
  28. }
  29. func (s *TargetScraper) Scrape() []metric.Update {
  30. targets := s.targetProvider.GetTargets()
  31. var errLock sync.Mutex
  32. var errors []error
  33. var scrapeFuncs []ScrapeFunc
  34. for i := range targets {
  35. target := targets[i]
  36. fn := func() []metric.Update {
  37. var scrapeResults []metric.Update
  38. f, err := target.Load()
  39. if err != nil {
  40. errLock.Lock()
  41. errors = append(errors, err)
  42. errLock.Unlock()
  43. log.Errorf("failed to scrape target: %s", err.Error())
  44. return scrapeResults
  45. }
  46. results, err := parser.Parse(f)
  47. if err != nil {
  48. errLock.Lock()
  49. errors = append(errors, err)
  50. errLock.Unlock()
  51. log.Errorf("failed to parse target: %s", err.Error())
  52. return scrapeResults
  53. }
  54. for _, result := range results {
  55. // filter metrics to be processed by name
  56. if _, ok := s.metricNames[result.Name]; ok != s.includeMetrics {
  57. continue
  58. }
  59. scrapeResults = append(scrapeResults, metric.Update{
  60. Name: result.Name,
  61. Labels: result.Labels,
  62. Value: result.Value,
  63. })
  64. }
  65. return scrapeResults
  66. }
  67. scrapeFuncs = append(scrapeFuncs, fn)
  68. }
  69. updates := concurrentScrape(scrapeFuncs...)
  70. // dispatch a scrape event for this specific scrape
  71. events.Dispatch(event.ScrapeEvent{
  72. ScraperName: s.name,
  73. Targets: len(targets),
  74. Errors: errors,
  75. })
  76. return updates
  77. }