// targetscraper.go (2.4 KB)
  1. package scrape
  2. import (
  3. "io"
  4. "sync"
  5. "github.com/kubecost/events"
  6. "github.com/opencost/opencost/core/pkg/log"
  7. "github.com/opencost/opencost/modules/collector-source/pkg/event"
  8. "github.com/opencost/opencost/modules/collector-source/pkg/metric"
  9. "github.com/opencost/opencost/modules/collector-source/pkg/scrape/parser"
  10. "github.com/opencost/opencost/modules/collector-source/pkg/scrape/target"
  11. )
// TargetScraper scrapes metrics from the targets supplied by a
// TargetProvider, filtering the parsed metrics by name before
// converting them into metric updates.
type TargetScraper struct {
	name           string                // identifier for the scraper
	targetProvider target.TargetProvider // supplies the set of targets to scrape
	metricNames    map[string]struct{}   // filter for which metrics will be processed
	includeMetrics bool                  // toggle to make metrics an include or exclude list
}
  18. func newTargetScrapper(name string, provider target.TargetProvider, metricNames []string, includeMetrics bool) *TargetScraper {
  19. metricSet := make(map[string]struct{})
  20. for _, metricName := range metricNames {
  21. metricSet[metricName] = struct{}{}
  22. }
  23. return &TargetScraper{
  24. name: name,
  25. targetProvider: provider,
  26. metricNames: metricSet,
  27. includeMetrics: includeMetrics,
  28. }
  29. }
  30. func (s *TargetScraper) Scrape() []metric.Update {
  31. targets := s.targetProvider.GetTargets()
  32. var errLock sync.Mutex
  33. var errors []error
  34. var scrapeFuncs []ScrapeFunc
  35. for i := range targets {
  36. target := targets[i]
  37. fn := func() []metric.Update {
  38. var scrapeResults []metric.Update
  39. f, err := target.Load()
  40. if err != nil {
  41. errLock.Lock()
  42. errors = append(errors, err)
  43. errLock.Unlock()
  44. log.Errorf("failed to scrape target: %s", err.Error())
  45. return scrapeResults
  46. }
  47. if closer, ok := f.(io.ReadCloser); ok {
  48. defer closer.Close()
  49. }
  50. results, err := parser.Parse(f)
  51. if err != nil {
  52. errLock.Lock()
  53. errors = append(errors, err)
  54. errLock.Unlock()
  55. log.Errorf("failed to parse target: %s", err.Error())
  56. return scrapeResults
  57. }
  58. for _, result := range results {
  59. // filter metrics to be processed by name
  60. if _, ok := s.metricNames[result.Name]; ok != s.includeMetrics {
  61. continue
  62. }
  63. scrapeResults = append(scrapeResults, metric.Update{
  64. Name: result.Name,
  65. Labels: result.Labels,
  66. Value: result.Value,
  67. })
  68. }
  69. return scrapeResults
  70. }
  71. scrapeFuncs = append(scrapeFuncs, fn)
  72. }
  73. updates := concurrentScrape(scrapeFuncs...)
  74. // dispatch a scrape event for this specific scrape
  75. events.Dispatch(event.ScrapeEvent{
  76. ScraperName: s.name,
  77. Targets: len(targets),
  78. Errors: errors,
  79. })
  80. return updates
  81. }