| 12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394 |
- package scrape
- import (
- "io"
- "sync"
- "github.com/kubecost/events"
- "github.com/opencost/opencost/core/pkg/log"
- "github.com/opencost/opencost/modules/collector-source/pkg/event"
- "github.com/opencost/opencost/modules/collector-source/pkg/metric"
- "github.com/opencost/opencost/modules/collector-source/pkg/scrape/parser"
- "github.com/opencost/opencost/modules/collector-source/pkg/scrape/target"
- )
- type TargetScraper struct {
- name string // identifier for the scraper
- targetProvider target.TargetProvider
- metricNames map[string]struct{} // filter for which metrics will be processed
- includeMetrics bool // toggle to make metrics an include or exclude list
- }
- func newTargetScrapper(name string, provider target.TargetProvider, metricNames []string, includeMetrics bool) *TargetScraper {
- metricSet := make(map[string]struct{})
- for _, metricName := range metricNames {
- metricSet[metricName] = struct{}{}
- }
- return &TargetScraper{
- name: name,
- targetProvider: provider,
- metricNames: metricSet,
- includeMetrics: includeMetrics,
- }
- }
- func (s *TargetScraper) Scrape() []metric.Update {
- targets := s.targetProvider.GetTargets()
- var errLock sync.Mutex
- var errors []error
- var scrapeFuncs []ScrapeFunc
- for i := range targets {
- target := targets[i]
- fn := func() []metric.Update {
- var scrapeResults []metric.Update
- f, err := target.Load()
- if err != nil {
- errLock.Lock()
- errors = append(errors, err)
- errLock.Unlock()
- log.Errorf("failed to scrape target: %s", err.Error())
- return scrapeResults
- }
- if closer, ok := f.(io.ReadCloser); ok {
- defer closer.Close()
- }
- results, err := parser.Parse(f)
- if err != nil {
- errLock.Lock()
- errors = append(errors, err)
- errLock.Unlock()
- log.Errorf("failed to parse target: %s", err.Error())
- return scrapeResults
- }
- for _, result := range results {
- // filter metrics to be processed by name
- if _, ok := s.metricNames[result.Name]; ok != s.includeMetrics {
- continue
- }
- scrapeResults = append(scrapeResults, metric.Update{
- Name: result.Name,
- Labels: result.Labels,
- Value: result.Value,
- })
- }
- return scrapeResults
- }
- scrapeFuncs = append(scrapeFuncs, fn)
- }
- updates := concurrentScrape(scrapeFuncs...)
- // dispatch a scrape event for this specific scrape
- events.Dispatch(event.ScrapeEvent{
- ScraperName: s.name,
- Targets: len(targets),
- Errors: errors,
- })
- return updates
- }
|