Просмотр исходного кода

Sth/kcm 3385 (#3101)

Signed-off-by: Sean Holcomb <seanholcomb@gmail.com>
Sean Holcomb 1 год назад
Родитель
Сommit
f29c7ccf13
22 измененных файлов с 2309 добавлено и 105 удалено
  1. 14 10
      modules/collector-source/pkg/collector/activeminutes.go
  2. 13 3
      modules/collector-source/pkg/collector/avgovertime.go
  3. 9 8
      modules/collector-source/pkg/collector/collector.go
  4. 2 2
      modules/collector-source/pkg/collector/collector_test.go
  5. 57 0
      modules/collector-source/pkg/collector/collectordriver.go
  6. 4 0
      modules/collector-source/pkg/collector/helper.go
  7. 13 3
      modules/collector-source/pkg/collector/increase.go
  8. 19 7
      modules/collector-source/pkg/collector/info.go
  9. 13 3
      modules/collector-source/pkg/collector/maxovertime.go
  10. 45 9
      modules/collector-source/pkg/collector/metric.go
  11. 193 59
      modules/collector-source/pkg/collector/metrics.go
  12. 400 0
      modules/collector-source/pkg/collector/metricsquerier.go
  13. 74 0
      modules/collector-source/pkg/collector/metricsquerier_test.go
  14. 87 0
      modules/collector-source/pkg/collector/mock.go
  15. 45 0
      modules/collector-source/pkg/collector/networktargetprovider.go
  16. 342 0
      modules/collector-source/pkg/collector/scraper.go
  17. 796 0
      modules/collector-source/pkg/collector/scraper_test.go
  18. 32 0
      modules/collector-source/pkg/collector/targetscraper.go
  19. 116 0
      modules/collector-source/pkg/collector/targetscraper_test.go
  20. 20 0
      modules/collector-source/pkg/metrics/target/stringtarget.go
  21. 4 0
      modules/collector-source/pkg/metrics/target/target.go
  22. 11 1
      pkg/env/costmodelenv.go

+ 14 - 10
modules/collector-source/pkg/collector/activeminutes.go

@@ -22,23 +22,27 @@ func (m *ActiveMinutesAggregator) Name() string {
 	return m.name
 }
 
+func (m *ActiveMinutesAggregator) AdditionInfo() map[string]string {
+	return nil
+}
+
 func (m *ActiveMinutesAggregator) LabelValues() []string {
 	return m.labelValues
 }
 
-func (m *ActiveMinutesAggregator) Update(value float64) {
-	now := time.Now().UTC()
+func (m *ActiveMinutesAggregator) Update(value float64, timestamp *time.Time, additionalInfo map[string]string) {
+	if timestamp == nil {
+		return
+	}
 	if m.start == nil {
-		m.start = &now
+		m.start = timestamp
 	}
-
-	m.end = &now
+	m.end = timestamp
 }
 
-func (m *ActiveMinutesAggregator) Value() float64 {
-	if m.start == nil || m.end == nil {
-		return 0.0
+func (m *ActiveMinutesAggregator) Value() []MetricValue {
+	return []MetricValue{
+		{Value: 1, Timestamp: m.start},
+		{Value: 1, Timestamp: m.end},
 	}
-
-	return m.end.Sub(*m.start).Minutes()
 }

+ 13 - 3
modules/collector-source/pkg/collector/avgovertime.go

@@ -1,5 +1,9 @@
 package collector
 
+import (
+	"time"
+)
+
 type AverageOverTimeAggregator struct {
 	name        string
 	labelValues []string
@@ -18,15 +22,21 @@ func (m *AverageOverTimeAggregator) Name() string {
 	return m.name
 }
 
+func (m *AverageOverTimeAggregator) AdditionInfo() map[string]string {
+	return nil
+}
+
 func (m *AverageOverTimeAggregator) LabelValues() []string {
 	return m.labelValues
 }
 
-func (m *AverageOverTimeAggregator) Update(value float64) {
+func (m *AverageOverTimeAggregator) Update(value float64, timestamp *time.Time, additionalInfo map[string]string) {
 	m.total += value
 	m.count++
 }
 
-func (m *AverageOverTimeAggregator) Value() float64 {
-	return m.total / float64(m.count)
+func (m *AverageOverTimeAggregator) Value() []MetricValue {
+	return []MetricValue{
+		{m.total / float64(m.count), nil},
+	}
 }

+ 9 - 8
modules/collector-source/pkg/collector/collector.go

@@ -154,7 +154,7 @@ type MetricsCollector interface {
 	// Update accepts the name of a metric, the label set and values to update the metric, the updated value, and a timestamp.
 	// This method does not accept a `MetricCollectorID` because it provides updates across many potential metric collector instances
 	// which utilize the same metric.
-	Update(metricName string, labels map[string]string, value float64, timestamp *time.Time)
+	Update(metricName string, labels map[string]string, value float64, timestamp *time.Time, additionalInformation map[string]string)
 }
 
 // InMemoryMetricsCollector is a thread-safe implementation of the `MetricsCollector` interface that stores metric instances
@@ -213,16 +213,17 @@ func (immc *InMemoryMetricsCollector) Query(collectorID MetricCollectorID) ([]*M
 	return immc.byCollectorID[collectorID].Get(), nil
 }
 
-func (immc *InMemoryMetricsCollector) Update(metricName string, labels map[string]string, value float64, timestamp *time.Time) {
+func (immc *InMemoryMetricsCollector) Update(
+	metricName string,
+	labels map[string]string,
+	value float64,
+	timestamp *time.Time,
+	additionalInformation map[string]string,
+) {
 	immc.lock.Lock()
 	defer immc.lock.Unlock()
 
 	for _, collector := range immc.byMetricName[metricName] {
-		labelValues := make([]string, 0, len(collector.labels))
-		for _, label := range collector.labels {
-			labelValues = append(labelValues, labels[label])
-		}
-
-		collector.Update(labelValues, value, timestamp)
+		collector.Update(labels, value, timestamp, additionalInformation)
 	}
 }

+ 2 - 2
modules/collector-source/pkg/collector/collector_test.go

@@ -29,8 +29,8 @@ func TestBasicCollectorFunctionality(t *testing.T) {
 	collector := NewOpenCostMetricCollector()
 
 	for i := 1; i <= 10; i++ {
-		collector.Update(ContainerMemoryWorkingSetBytes, labelsA, float64(i), nil)
-		collector.Update(ContainerMemoryWorkingSetBytes, labelsB, float64(i), nil)
+		collector.Update(ContainerMemoryWorkingSetBytes, labelsA, float64(i), nil, nil)
+		collector.Update(ContainerMemoryWorkingSetBytes, labelsB, float64(i), nil, nil)
 	}
 
 	results, err := collector.Query(RAMUsageAverageID)

+ 57 - 0
modules/collector-source/pkg/collector/collectordriver.go

@@ -0,0 +1,57 @@
+package collector
+
+import (
+	"time"
+
+	"github.com/opencost/opencost/core/pkg/log"
+	"github.com/opencost/opencost/core/pkg/util/atomic"
+)
+
+type Config struct {
+	ScrapeInterval time.Duration
+}
+type CollectorDriver struct {
+	config    Config
+	runState  atomic.AtomicRunState
+	stop      chan struct{}
+	collector MetricsCollector
+}
+
+func NewCollectorDriver(config Config) *CollectorDriver {
+	return &CollectorDriver{
+		collector: NewOpenCostMetricCollector(),
+	}
+}
+
+func (cd *CollectorDriver) Start() {
+	// Before we attempt to start, we must ensure we are not in a stopping state
+	cd.runState.WaitForReset()
+
+	// This will atomically check the current state to ensure we can run, then advances the state.
+	// If the state is already started, it will return false.
+	if !cd.runState.Start() {
+		log.Info("collector already running")
+		return
+	}
+	func() {
+		for {
+			select {
+			case <-cd.runState.OnStop():
+				cd.runState.Reset()
+				return // exit go routine
+			default:
+
+			}
+			time.Sleep(cd.config.ScrapeInterval)
+		}
+
+	}()
+}
+
+func (cd *CollectorDriver) Stop() {
+	cd.runState.Stop()
+}
+
+func (cd *CollectorDriver) scrape() {
+
+}

+ 4 - 0
modules/collector-source/pkg/collector/helper.go

@@ -43,3 +43,7 @@ func toMap(labels []string, values []string) map[string]string {
 	}
 	return m
 }
+
+func ptr[T any](v T) *T {
+	return &v
+}

+ 13 - 3
modules/collector-source/pkg/collector/increase.go

@@ -1,5 +1,9 @@
 package collector
 
+import (
+	"time"
+)
+
 type IncreaseAggregator struct {
 	name        string
 	labelValues []string
@@ -19,11 +23,15 @@ func (m *IncreaseAggregator) Name() string {
 	return m.name
 }
 
+func (m *IncreaseAggregator) AdditionInfo() map[string]string {
+	return nil
+}
+
 func (m *IncreaseAggregator) LabelValues() []string {
 	return m.labelValues
 }
 
-func (m *IncreaseAggregator) Update(value float64) {
+func (m *IncreaseAggregator) Update(value float64, timestamp *time.Time, additionalInfo map[string]string) {
 	if !m.initiated {
 		m.initiated = true
 		m.initial = value
@@ -31,6 +39,8 @@ func (m *IncreaseAggregator) Update(value float64) {
 	m.current = value
 }
 
-func (m *IncreaseAggregator) Value() float64 {
-	return m.current - m.initial
+func (m *IncreaseAggregator) Value() []MetricValue {
+	return []MetricValue{
+		{Value: m.current - m.initial},
+	}
 }

+ 19 - 7
modules/collector-source/pkg/collector/info.go

@@ -1,9 +1,15 @@
 package collector
 
-// InfoAggregator is metric aggregator meant to just record label values
+import (
+	"maps"
+	"time"
+)
+
+// InfoAggregator is metric aggregator meant to record label values and addition information
 type InfoAggregator struct {
-	name        string
-	labelValues []string
+	name           string
+	labelValues    []string
+	additionalInfo map[string]string
 }
 
 func Info(name string, labelValues []string) MetricAggregator {
@@ -17,14 +23,20 @@ func (m *InfoAggregator) Name() string {
 	return m.name
 }
 
+func (m *InfoAggregator) AdditionInfo() map[string]string {
+	return m.additionalInfo
+}
+
 func (m *InfoAggregator) LabelValues() []string {
 	return m.labelValues
 }
 
-func (m *InfoAggregator) Update(value float64) {
-
+func (m *InfoAggregator) Update(value float64, timestamp *time.Time, additionalInfo map[string]string) {
+	m.additionalInfo = maps.Clone(additionalInfo)
 }
 
-func (m *InfoAggregator) Value() float64 {
-	return 1
+func (m *InfoAggregator) Value() []MetricValue {
+	return []MetricValue{
+		{Value: 1},
+	}
 }

+ 13 - 3
modules/collector-source/pkg/collector/maxovertime.go

@@ -1,5 +1,9 @@
 package collector
 
+import (
+	"time"
+)
+
 type MaxOverTimeAggregator struct {
 	name        string
 	labelValues []string
@@ -17,16 +21,22 @@ func (m *MaxOverTimeAggregator) Name() string {
 	return m.name
 }
 
+func (m *MaxOverTimeAggregator) AdditionInfo() map[string]string {
+	return nil
+}
+
 func (m *MaxOverTimeAggregator) LabelValues() []string {
 	return m.labelValues
 }
 
-func (m *MaxOverTimeAggregator) Update(value float64) {
+func (m *MaxOverTimeAggregator) Update(value float64, timestamp *time.Time, additionalInfo map[string]string) {
 	if value > m.max {
 		m.max = value
 	}
 }
 
-func (m *MaxOverTimeAggregator) Value() float64 {
-	return m.max
+func (m *MaxOverTimeAggregator) Value() []MetricValue {
+	return []MetricValue{
+		{Value: m.max},
+	}
 }

+ 45 - 9
modules/collector-source/pkg/collector/metric.go

@@ -1,7 +1,11 @@
 package collector
 
 import (
+	"maps"
 	"time"
+
+	"github.com/opencost/opencost/core/pkg/source"
+	"github.com/opencost/opencost/core/pkg/util"
 )
 
 // MetricValue is a resulting data point value with an optional timestamp.
@@ -18,14 +22,36 @@ type MetricResult struct {
 	Values       []MetricValue
 }
 
+func (mr *MetricResult) ToQueryResult() *source.QueryResult {
+	metrics := map[string]any{}
+	for key, value := range mr.MetricLabels {
+		metrics[key] = value
+	}
+
+	values := make([]*util.Vector, len(mr.Values))
+	for i, value := range mr.Values {
+		timestamp := 0.0
+		if value.Timestamp != nil {
+			timestamp = float64(value.Timestamp.Unix())
+		}
+		values[i] = &util.Vector{
+			Timestamp: timestamp,
+			Value:     value.Value,
+		}
+	}
+
+	return source.NewQueryResult(metrics, values, nil)
+}
+
 // MetricAggregator is an interface that defines the methods for a metric collector aggregation.
 // For example, we have a metric `foo_metric`, and we wish to query and collect the average over time.
 // In this case, the `AverageOverTime` component is the MetricAggregator. It is the component responsible
 // for routing updates to metric values into their proper condensed form.
 type MetricAggregator interface {
 	Name() string
-	Update(value float64)
-	Value() float64
+	AdditionInfo() map[string]string
+	Update(value float64, timestamp *time.Time, additionalInfo map[string]string)
+	Value() []MetricValue
 	LabelValues() []string
 }
 
@@ -41,38 +67,48 @@ type MetricCollector struct {
 	labels            []string
 	aggregatorFactory MetricAggregatorFactory
 	metrics           map[uint64]MetricAggregator // map[hash(labelValues)] = aggregator
+	filter            func(map[string]string) bool
 }
 
 // NewMetricCollector creates a new MetricCollector instance with a unique identifier. The metric name is the specific
 // name of the collected metric that will be used to query the
-func NewMetricCollector(id MetricCollectorID, metricName string, labels []string, aggregatorFactory MetricAggregatorFactory) *MetricCollector {
+func NewMetricCollector(id MetricCollectorID, metricName string, labels []string, aggregatorFactory MetricAggregatorFactory, fn func(map[string]string) bool) *MetricCollector {
 	return &MetricCollector{
 		id:                id,
 		metricName:        metricName,
 		labels:            labels,
 		aggregatorFactory: aggregatorFactory,
 		metrics:           make(map[uint64]MetricAggregator),
+		filter:            fn,
 	}
 }
 
-func (mi *MetricCollector) Update(labelValues []string, value float64, timestamp *time.Time) {
+func (mi *MetricCollector) Update(labels map[string]string, value float64, timestamp *time.Time, additionalInfo map[string]string) {
+	if mi.filter != nil && !mi.filter(labels) {
+		return
+	}
+
+	labelValues := make([]string, len(mi.labels))
+	for i, key := range mi.labels {
+		labelValues[i] = labels[key]
+	}
 	key := hash(labelValues)
 	if mi.metrics[key] == nil {
 		mi.metrics[key] = mi.aggregatorFactory(metricNameFor(mi.metricName, mi.labels, labelValues), labelValues)
 	}
 
-	mi.metrics[key].Update(value)
+	mi.metrics[key].Update(value, timestamp, additionalInfo)
 }
 
 func (mi *MetricCollector) Get() []*MetricResult {
 	results := make([]*MetricResult, 0, len(mi.metrics))
 	for _, metric := range mi.metrics {
+		labels := toMap(mi.labels, metric.LabelValues())
+		maps.Copy(labels, metric.AdditionInfo())
 		mr := &MetricResult{
 			Name:         metric.Name(),
-			MetricLabels: toMap(mi.labels, metric.LabelValues()),
-			Values: []MetricValue{
-				{Value: metric.Value(), Timestamp: nil},
-			},
+			MetricLabels: labels,
+			Values:       metric.Value(),
 		}
 
 		results = append(results, mr)

+ 193 - 59
modules/collector-source/pkg/collector/metrics.go

@@ -12,8 +12,9 @@ func NewPVPricePerGiBHourMetricCollector() *MetricCollector {
 	return NewMetricCollector(
 		PVPricePerGiBHourID,
 		PVHourlyCost,
-		[]string{"cluster", "persistentvolume", "volumename", "provider_id"},
+		[]string{"persistentvolume", "volumename", "provider_id"},
 		AverageOverTime,
+		nil,
 	)
 }
 
@@ -29,8 +30,9 @@ func NewPVUsedAverageMetricCollector() *MetricCollector {
 	return NewMetricCollector(
 		PVUsedAverageID,
 		KubeletVolumeStatsUsedBytes,
-		[]string{"cluster", "persistentvolumeclaim", "namespace"},
+		[]string{"persistentvolumeclaim", "namespace"},
 		AverageOverTime,
+		nil,
 	)
 }
 
@@ -46,8 +48,9 @@ func NewPVUsedMaxMetricCollector() *MetricCollector {
 	return NewMetricCollector(
 		PVUsedMaxID,
 		KubeletVolumeStatsUsedBytes,
-		[]string{"cluster", "persistentvolumeclaim", "namespace"},
+		[]string{"persistentvolumeclaim", "namespace"},
 		MaxOverTime,
+		nil,
 	)
 }
 
@@ -62,8 +65,9 @@ func NewPVCInfoMetricCollector() *MetricCollector {
 	return NewMetricCollector(
 		PVCInfoID,
 		KubePersistenVolumeClaimInfo,
-		[]string{"persistentvolumeclaim", "storageclass", "volumename", "namespace", "cluster"},
+		[]string{"persistentvolumeclaim", "storageclass", "volumename", "namespace"},
 		Info,
+		nil,
 	)
 }
 
@@ -77,8 +81,9 @@ func NewPVActiveMinutesMetricCollector() *MetricCollector {
 	return NewMetricCollector(
 		PVActiveMinutesID,
 		KubePersistentVolumeCapacityBytes,
-		[]string{"cluster", "persistentvolume"},
+		[]string{"persistentvolume"},
 		ActiveMinutes,
+		nil,
 	)
 }
 
@@ -97,8 +102,12 @@ func NewLocalStorageCostMetricCollector() *MetricCollector {
 	return NewMetricCollector(
 		LocalStorageCostID,
 		ContainerFSLimitBytes,
-		[]string{"instance", "device", "cluster"},
+		[]string{"instance", "device"},
 		AverageOverTime,
+		func(labels map[string]string) bool {
+			// todo this filter needs a regex
+			return true
+		},
 	)
 }
 
@@ -117,8 +126,12 @@ func NewLocalStorageUsedCostMetricCollector() *MetricCollector {
 	return NewMetricCollector(
 		LocalStorageUsedCostID,
 		ContainerFSUsageBytes,
-		[]string{"instance", "device", "cluster"},
+		[]string{"instance", "device"},
 		AverageOverTime,
+		func(labels map[string]string) bool {
+			// todo this filter needs a regex
+			return true
+		},
 	)
 }
 
@@ -138,8 +151,12 @@ func NewLocalStorageUsedAverageMetricCollector() *MetricCollector {
 	return NewMetricCollector(
 		LocalStorageUsedAverageID,
 		ContainerFSUsageBytes,
-		[]string{"instance", "device", "cluster"},
+		[]string{"instance", "device"},
 		AverageOverTime,
+		func(labels map[string]string) bool {
+			// todo this filter needs a regex
+			return true
+		},
 	)
 }
 
@@ -160,8 +177,12 @@ func NewLocalStorageUsedMaxMetricCollector() *MetricCollector {
 	return NewMetricCollector(
 		LocalStorageUsedMaxID,
 		ContainerFSUsageBytes,
-		[]string{"instance", "device", "cluster"},
+		[]string{"instance", "device"},
 		MaxOverTime,
+		func(labels map[string]string) bool {
+			// todo this filter needs a regex
+			return true
+		},
 	)
 }
 
@@ -180,8 +201,12 @@ func NewLocalStorageBytesMetricCollector() *MetricCollector {
 	return NewMetricCollector(
 		LocalStorageBytesID,
 		ContainerFSLimitBytes,
-		[]string{"instance", "device", "cluster"},
+		[]string{"instance", "device"},
 		AverageOverTime,
+		func(labels map[string]string) bool {
+			// todo this filter needs a regex
+			return true
+		},
 	)
 }
 
@@ -196,8 +221,9 @@ func NewLocalStorageActiveMinutesMetricCollector() *MetricCollector {
 	return NewMetricCollector(
 		LocalStorageActiveMinutesID,
 		NodeTotalHourlyCost,
-		[]string{"cluster", "node", "instance", "provider_id"},
+		[]string{"node", "instance", "provider_id"},
 		ActiveMinutes,
+		nil,
 	)
 }
 
@@ -214,8 +240,9 @@ func NewNodeCPUCoresCapacityMetricCollector() *MetricCollector {
 	return NewMetricCollector(
 		NodeCPUCoresCapacityID,
 		KubeNodeStatusCapacityCPUCores,
-		[]string{"cluster", "node"},
+		[]string{"node"},
 		AverageOverTime,
+		nil,
 	)
 }
 
@@ -231,8 +258,9 @@ func NewNodeCPUCoresAllocatableMetricCollector() *MetricCollector {
 	return NewMetricCollector(
 		NodeCPUCoresAllocatableID,
 		KubeNodeStatusAllocatableCPUCores,
-		[]string{"cluster", "node"},
+		[]string{"node"},
 		AverageOverTime,
+		nil,
 	)
 }
 
@@ -248,8 +276,9 @@ func NewNodeRAMBytesCapacityMetricCollector() *MetricCollector {
 	return NewMetricCollector(
 		NodeRAMBytesCapacityID,
 		KubeNodeStatusCapacityMemoryBytes,
-		[]string{"cluster", "node"},
+		[]string{"node"},
 		AverageOverTime,
+		nil,
 	)
 }
 
@@ -265,8 +294,9 @@ func NewNodeRAMBytesAllocatableMetricCollector() *MetricCollector {
 	return NewMetricCollector(
 		NodeRAMBytesAllocatableID,
 		KubeNodeStatusAllocatableMemoryBytes,
-		[]string{"cluster", "node"},
+		[]string{"node"},
 		AverageOverTime,
+		nil,
 	)
 }
 
@@ -282,8 +312,9 @@ func NewNodeGPUCountMetricCollector() *MetricCollector {
 	return NewMetricCollector(
 		NodeGPUCountID,
 		NodeGPUCount,
-		[]string{"cluster", "node", "provider_id"},
+		[]string{"node", "provider_id"},
 		AverageOverTime,
+		nil,
 	)
 }
 
@@ -299,6 +330,7 @@ func NewNodeLabelsMetricCollector() *MetricCollector {
 		KubeNodeLabels,
 		[]string{},
 		Info,
+		nil,
 	)
 }
 
@@ -312,8 +344,9 @@ func NewNodeActiveMinutesMetricCollector() *MetricCollector {
 	return NewMetricCollector(
 		NodeActiveMinutesID,
 		NodeTotalHourlyCost,
-		[]string{"node", "cluster", "provider_id"},
+		[]string{"node", "provider_id"},
 		ActiveMinutes,
+		nil,
 	)
 }
 
@@ -329,8 +362,9 @@ func NewNodeCPUModeTotalMetricCollector() *MetricCollector {
 	return NewMetricCollector(
 		NodeCPUModeTotalID,
 		NodeCPUSecondsTotal,
-		[]string{"kubernetes_node", "cluster", "mode"},
+		[]string{"kubernetes_node", "mode"},
 		Increase,
+		nil,
 	)
 }
 
@@ -349,8 +383,11 @@ func NewNodeRAMSystemUsageAverageMetricCollector() *MetricCollector {
 	return NewMetricCollector(
 		NodeRAMSystemUsageAverageID,
 		ContainerMemoryWorkingSetBytes,
-		[]string{"instance", "cluster"},
+		[]string{"instance"},
 		AverageOverTime,
+		func(labels map[string]string) bool {
+			return labels["container_name"] != "POD" && labels["container_name"] != "" && labels["namespace"] == "kube-system"
+		},
 	)
 }
 
@@ -369,8 +406,11 @@ func NewNodeRAMUserUsageAverageMetricCollector() *MetricCollector {
 	return NewMetricCollector(
 		NodeRAMUserUsageAverageID,
 		ContainerMemoryWorkingSetBytes,
-		[]string{"instance", "cluster"},
+		[]string{"instance"},
 		AverageOverTime,
+		func(labels map[string]string) bool {
+			return labels["container_name"] != "POD" && labels["container_name"] != "" && labels["namespace"] != "kube-system"
+		},
 	)
 }
 
@@ -386,8 +426,9 @@ func NewLBPricePerHourMetricCollector() *MetricCollector {
 	return NewMetricCollector(
 		LBPricePerHourID,
 		KubecostLoadBalancerCost,
-		[]string{"namespace", "service_name", "ingress_ip", "cluster"},
+		[]string{"namespace", "service_name", "ingress_ip"},
 		AverageOverTime,
+		nil,
 	)
 }
 
@@ -401,8 +442,9 @@ func NewLBActiveMinutesMetricCollector() *MetricCollector {
 	return NewMetricCollector(
 		LBActiveMinutesID,
 		KubecostLoadBalancerCost,
-		[]string{"namespace", "service_name", "cluster", "ingress_ip"},
+		[]string{"namespace", "service_name", "ingress_ip"},
 		ActiveMinutes,
+		nil,
 	)
 }
 
@@ -416,8 +458,9 @@ func NewClusterManagementDurationMetricCollector() *MetricCollector {
 	return NewMetricCollector(
 		ClusterManagementDurationID,
 		KubecostClusterManagementCost,
-		[]string{"cluster", "provisioner_name"},
+		[]string{"provisioner_name"},
 		ActiveMinutes,
+		nil,
 	)
 }
 
@@ -433,8 +476,9 @@ func NewClusterManagementPricePerHourMetricCollector() *MetricCollector {
 	return NewMetricCollector(
 		ClusterManagementPricePerHourID,
 		KubecostClusterManagementCost,
-		[]string{"cluster", "provisioner_name"},
+		[]string{"provisioner_name"},
 		AverageOverTime,
+		nil,
 	)
 }
 
@@ -448,8 +492,9 @@ func NewPodActiveMinutesMetricCollector() *MetricCollector {
 	return NewMetricCollector(
 		PodActiveMinutesID,
 		KubePodContainerStatusRunning,
-		[]string{"pod", "namespace", "uid", "cluster"},
+		[]string{"pod", "namespace", "uid"},
 		ActiveMinutes,
+		nil,
 	)
 }
 
@@ -468,8 +513,11 @@ func NewRAMBytesAllocatedMetricCollector() *MetricCollector {
 	return NewMetricCollector(
 		RAMBytesAllocatedID,
 		ContainerMemoryAllocationBytes,
-		[]string{"container", "pod", "uid", "namespace", "node", "cluster", "provider_id"},
+		[]string{"container", "pod", "uid", "namespace", "node", "provider_id"},
 		AverageOverTime,
+		func(labels map[string]string) bool {
+			return labels["container"] != "POD" && labels["container"] != "" && labels["node"] != ""
+		},
 	)
 }
 
@@ -484,14 +532,17 @@ func NewRAMBytesAllocatedMetricCollector() *MetricCollector {
 //			<some_custom_filter>
 //		}[1h]
 //	)
-//) by (container, pod, namespace, node, %s)
+//) by (container, pod, namespace, node, cluster_id)
 
 func NewRAMRequestsMetricCollector() *MetricCollector {
 	return NewMetricCollector(
 		RAMRequestsID,
 		KubePodContainerResourceRequests,
-		[]string{"container", "pod", "uid", "namespace", "node", "cluster"},
+		[]string{"container", "pod", "uid", "namespace", "node"},
 		AverageOverTime,
+		func(labels map[string]string) bool {
+			return labels["resource"] == "memory" && labels["unit"] == "byte" && labels["container"] != "POD" && labels["container"] != "" && labels["node"] != ""
+		},
 	)
 }
 
@@ -509,8 +560,11 @@ func NewRAMUsageAverageMetricCollector() *MetricCollector {
 	return NewMetricCollector(
 		RAMUsageAverageID,
 		ContainerMemoryWorkingSetBytes,
-		[]string{"container", "uid", "pod", "namespace", "instance", "node", "cluster"},
+		[]string{"container", "uid", "pod", "namespace", "instance", "node"},
 		AverageOverTime,
+		func(labels map[string]string) bool {
+			return labels["container"] != "POD" && labels["container"] != ""
+		},
 	)
 }
 
@@ -529,8 +583,11 @@ func NewRAMUsageMaxMetricCollector() *MetricCollector {
 	return NewMetricCollector(
 		RAMUsageMaxID,
 		ContainerMemoryWorkingSetBytes,
-		[]string{"container", "uid", "pod", "namespace", "instance", "node", "cluster"},
+		[]string{"container_name", "container", "uid", "pod", "namespace", "instance", "node"},
 		MaxOverTime,
+		func(labels map[string]string) bool {
+			return labels["container"] != "" && labels["container_name"] != "POD" && labels["container"] != "POD" && labels["node"] != ""
+		},
 	)
 }
 
@@ -549,8 +606,11 @@ func NewCPUCoresAllocatedMetricCollector() *MetricCollector {
 	return NewMetricCollector(
 		CPUCoresAllocatedID,
 		ContainerCPUAllocation,
-		[]string{"container", "uid", "pod", "namespace", "node", "cluster"},
+		[]string{"container", "uid", "pod", "namespace", "node"},
 		AverageOverTime,
+		func(labels map[string]string) bool {
+			return labels["container"] != "POD" && labels["container"] != "" && labels["node"] != ""
+		},
 	)
 }
 
@@ -571,8 +631,11 @@ func NewCPURequestsMetricCollector() *MetricCollector {
 	return NewMetricCollector(
 		CPURequestsID,
 		KubePodContainerResourceRequests,
-		[]string{"container", "uid", "pod", "namespace", "node", "cluster"},
+		[]string{"container", "uid", "pod", "namespace", "node"},
 		AverageOverTime,
+		func(labels map[string]string) bool {
+			return labels["resource"] == "cpu" && labels["unit"] == "core" && labels["container"] != "POD" && labels["container"] != "" && labels["node"] != ""
+		},
 	)
 }
 
@@ -591,8 +654,11 @@ func NewCPUUsageAverageMetricCollector() *MetricCollector {
 	return NewMetricCollector(
 		CPUUsageAverageID,
 		ContainerCPUUsageSecondsTotal,
-		[]string{"container", "uid", "pod", "namespace", "node", "instance", "cluster"},
+		[]string{"container", "uid", "pod", "namespace", "node", "instance"},
 		Increase,
+		func(labels map[string]string) bool {
+			return labels["container"] != "" && labels["container_name"] != "POD" && labels["container"] != "POD"
+		},
 	)
 }
 
@@ -601,8 +667,9 @@ func NewCPUUsageMaxMetricCollector() *MetricCollector {
 	return NewMetricCollector(
 		CPUUsageMaxID,
 		ContainerCPUUsageSecondsTotal,
-		[]string{"container", "uid", "pod", "namespace", "node", "instance", "cluster"},
+		[]string{"container", "uid", "pod", "namespace", "node", "instance"},
 		MaxOverTime,
+		nil,
 	)
 }
 
@@ -622,8 +689,11 @@ func NewGPUsRequestedMetricCollector() *MetricCollector {
 	return NewMetricCollector(
 		GPUsRequestedID,
 		KubePodContainerResourceRequests,
-		[]string{"container", "uid", "pod", "namespace", "node", "cluster"},
+		[]string{"container", "uid", "pod", "namespace", "node"},
 		AverageOverTime,
+		func(labels map[string]string) bool {
+			return labels["resource"] == "nvidia_com_gpu" && labels["container"] != "POD" && labels["container"] != "" && labels["node"] != ""
+		},
 	)
 }
 
@@ -639,8 +709,11 @@ func NewGPUsUsageAverageMetricCollector() *MetricCollector {
 	return NewMetricCollector(
 		GPUsUsageAverageID,
 		DCGMFIPROFGRENGINEACTIVE,
-		[]string{"container", "uid", "pod", "namespace", "cluster"},
+		[]string{"container", "uid", "pod", "namespace"},
 		AverageOverTime,
+		func(labels map[string]string) bool {
+			return labels["container"] != ""
+		},
 	)
 }
 
@@ -656,8 +729,11 @@ func NewGPUsUsageMaxMetricCollector() *MetricCollector {
 	return NewMetricCollector(
 		GPUsUsageMaxID,
 		DCGMFIPROFGRENGINEACTIVE,
-		[]string{"container", "uid", "pod", "namespace", "cluster"},
+		[]string{"container", "uid", "pod", "namespace"},
 		MaxOverTime,
+		func(labels map[string]string) bool {
+			return labels["container"] != ""
+		},
 	)
 }
 
@@ -676,8 +752,11 @@ func NewGPUsAllocatedMetricCollector() *MetricCollector {
 	return NewMetricCollector(
 		GPUsAllocatedID,
 		ContainerGPUAllocation,
-		[]string{"container", "uid", "pod", "namespace", "node", "cluster"},
+		[]string{"container", "uid", "pod", "namespace", "node"},
 		AverageOverTime,
+		func(labels map[string]string) bool {
+			return labels["container"] != "" && labels["container"] != "POD" && labels["node"] != ""
+		},
 	)
 }
 
@@ -698,8 +777,11 @@ func NewIsGPUSharedMetricCollector() *MetricCollector {
 	return NewMetricCollector(
 		IsGPUSharedID,
 		KubePodContainerResourceRequests,
-		[]string{"container", "uid", "pod", "namespace", "node", "cluster"},
+		[]string{"container", "uid", "pod", "namespace", "node"},
 		AverageOverTime,
+		func(labels map[string]string) bool {
+			return labels["container"] != "" && labels["node"] != "" && labels["pod"] != "" && labels["unit"] == "integer"
+		},
 	)
 }
 
@@ -716,8 +798,11 @@ func NewGPUInfoMetricCollector() *MetricCollector {
 	return NewMetricCollector(
 		GPUInfoID,
 		DCGMFIDEVDECUTIL,
-		[]string{"container", "uid", "pod", "namespace", "device", "modelName", "uuid", "cluster"},
+		[]string{"container", "uid", "pod", "namespace", "device", "modelName", "uuid"},
 		AverageOverTime,
+		func(labels map[string]string) bool {
+			return labels["container"] != ""
+		},
 	)
 }
 
@@ -733,8 +818,9 @@ func NewNodeCPUPricePerHourMetricCollector() *MetricCollector {
 	return NewMetricCollector(
 		NodeCPUPricePerHourID,
 		NodeCPUHourlyCost,
-		[]string{"node", "cluster", "instance_type", "provider_id"},
+		[]string{"node", "instance_type", "provider_id"},
 		AverageOverTime,
+		nil,
 	)
 }
 
@@ -750,8 +836,9 @@ func NewNodeRAMPricePerGiBHourMetricCollector() *MetricCollector {
 	return NewMetricCollector(
 		NodeRAMPricePerGiBHourID,
 		NodeRAMHourlyCost,
-		[]string{"node", "cluster", "instance_type", "provider_id"},
+		[]string{"node", "instance_type", "provider_id"},
 		AverageOverTime,
+		nil,
 	)
 }
 
@@ -767,8 +854,9 @@ func NewNodeGPUPricePerHourMetricCollector() *MetricCollector {
 	return NewMetricCollector(
 		NodeGPUPricePerHourID,
 		NodeGPUHourlyCost,
-		[]string{"node", "cluster", "instance_type", "provider_id"},
+		[]string{"node", "instance_type", "provider_id"},
 		AverageOverTime,
+		nil,
 	)
 }
 
@@ -782,8 +870,9 @@ func NewNodeIsSpotMetricCollector() *MetricCollector {
 	return NewMetricCollector(
 		NodeIsSpotID,
 		KubecostNodeIsSpot,
-		[]string{"node", "cluster"}, // Todo are these the correct labels
+		[]string{"node"}, // Todo are these the correct labels
 		AverageOverTime,
+		nil,
 	)
 }
 
@@ -799,8 +888,9 @@ func NewPodPVCAllocationMetricCollector() *MetricCollector {
 	return NewMetricCollector(
 		PodPVCAllocationID,
 		PodPVCAllocation,
-		[]string{"persistentvolume", "persistentvolumeclaim", "pod", "namespace", "cluster"},
+		[]string{"persistentvolume", "persistentvolumeclaim", "pod", "namespace"},
 		AverageOverTime,
+		nil,
 	)
 }
 
@@ -816,8 +906,9 @@ func NewPVCBytesRequestedMetricCollector() *MetricCollector {
 	return NewMetricCollector(
 		PVCBytesRequestedID,
 		KubePersistentVolumeClaimResourceRequestsStorageBytes,
-		[]string{"persistentvolumeclaim", "namespace", "cluster"},
+		[]string{"persistentvolumeclaim", "namespace"},
 		AverageOverTime,
+		nil,
 	)
 }
 
@@ -833,8 +924,9 @@ func NewPVBytesMetricCollector() *MetricCollector {
 	return NewMetricCollector(
 		PVBytesID,
 		KubePersistentVolumeCapacityBytes,
-		[]string{"persistentvolume", "cluster"},
+		[]string{"persistentvolume"},
 		AverageOverTime,
+		nil,
 	)
 }
 
@@ -850,8 +942,9 @@ func NewPVCostPerGiBHourMetricCollector() *MetricCollector {
 	return NewMetricCollector(
 		PVCostPerGiBHourID,
 		PVHourlyCost,
-		[]string{"volumename", "cluster"},
+		[]string{"volumename"},
 		AverageOverTime,
+		nil,
 	)
 }
 
@@ -867,8 +960,9 @@ func NewPVInfoMetricCollector() *MetricCollector {
 	return NewMetricCollector(
 		PVInfoID,
 		KubecostPVInfo,
-		[]string{"cluster", "storageclass", "persistentvolume", "provider_id"},
+		[]string{"storageclass", "persistentvolume", "provider_id"},
 		AverageOverTime,
+		nil,
 	)
 }
 
@@ -887,8 +981,11 @@ func NewNetZoneGiBMetricCollector() *MetricCollector {
 	return NewMetricCollector(
 		NetZoneGiBID,
 		KubecostPodNetworkEgressBytesTotal,
-		[]string{"pod", "namespace", "cluster"},
+		[]string{"pod_name", "namespace"},
 		Increase,
+		func(labels map[string]string) bool {
+			return labels["internet"] == "false" && labels["same_zone"] == "false" && labels["same_region"] == "true"
+		},
 	)
 }
 
@@ -906,6 +1003,7 @@ func NewNetZonePricePerGiBMetricCollector() *MetricCollector {
 		KubecostNetworkZoneEgressCost,
 		[]string{"cluster"},
 		AverageOverTime,
+		nil,
 	)
 }
 
@@ -924,8 +1022,11 @@ func NewNetRegionGiBMetricCollector() *MetricCollector {
 	return NewMetricCollector(
 		NetRegionGiBID,
 		KubecostPodNetworkEgressBytesTotal,
-		[]string{"pod", "namespace", "cluster"},
+		[]string{"pod_name", "namespace"},
 		Increase,
+		func(labels map[string]string) bool {
+			return labels["internet"] == "false" && labels["same_zone"] == "false" && labels["same_region"] == "false"
+		},
 	)
 }
 
@@ -943,6 +1044,7 @@ func NewNetRegionPricePerGiBMetricCollector() *MetricCollector {
 		KubecostNetworkRegionEgressCost,
 		[]string{"cluster"},
 		AverageOverTime,
+		nil,
 	)
 }
 
@@ -959,8 +1061,11 @@ func NewNetInternetGiBMetricCollector() *MetricCollector {
 	return NewMetricCollector(
 		NetInternetGiBID,
 		KubecostPodNetworkEgressBytesTotal,
-		[]string{"pod", "namespace", "cluster"},
+		[]string{"pod_name", "namespace"},
 		Increase,
+		func(labels map[string]string) bool {
+			return labels["internet"] == "true"
+		},
 	)
 }
 
@@ -978,6 +1083,7 @@ func NewNetInternetPricePerGiBMetricCollector() *MetricCollector {
 		KubecostNetworkInternetEgressCost,
 		[]string{"cluster"},
 		AverageOverTime,
+		nil,
 	)
 }
 
@@ -994,8 +1100,11 @@ func NewNetReceiveBytesMetricCollector() *MetricCollector {
 	return NewMetricCollector(
 		NetReceiveBytesID,
 		ContainerNetworkReceiveBytesTotal,
-		[]string{"pod", "namespace", "cluster"},
+		[]string{"pod_name", "pod", "namespace"},
 		Increase,
+		func(labels map[string]string) bool {
+			return labels["pod"] != ""
+		},
 	)
 }
 
@@ -1012,8 +1121,11 @@ func NewNetTransferBytesMetricCollector() *MetricCollector {
 	return NewMetricCollector(
 		NetTransferBytesID,
 		ContainerNetworkTransmitBytesTotal,
-		[]string{"pod", "namespace", "cluster"},
+		[]string{"pod_name", "pod", "namespace"},
 		Increase,
+		func(labels map[string]string) bool {
+			return labels["pod"] != ""
+		},
 	)
 }
 
@@ -1029,6 +1141,7 @@ func NewNamespaceLabelsMetricCollector() *MetricCollector {
 		KubeNamespaceLabels,
 		[]string{},
 		Info,
+		nil,
 	)
 }
 
@@ -1044,6 +1157,7 @@ func NewNamespaceAnnotationsMetricCollector() *MetricCollector {
 		KubeNamespaceAnnotations,
 		[]string{},
 		Info,
+		nil,
 	)
 }
 
@@ -1059,6 +1173,7 @@ func NewPodLabelsMetricCollector() *MetricCollector {
 		KubePodLabels,
 		[]string{},
 		Info,
+		nil,
 	)
 }
 
@@ -1074,6 +1189,7 @@ func NewPodAnnotationsMetricCollector() *MetricCollector {
 		KubePodAnnotations,
 		[]string{},
 		Info,
+		nil,
 	)
 }
 
@@ -1089,6 +1205,7 @@ func NewServiceLabelsMetricCollector() *MetricCollector {
 		ServiceSelectorLabels,
 		[]string{},
 		Info,
+		nil,
 	)
 }
 
@@ -1104,6 +1221,7 @@ func NewDeploymentLabelsMetricCollector() *MetricCollector {
 		DeploymentMatchLabels,
 		[]string{},
 		Info,
+		nil,
 	)
 }
 
@@ -1119,6 +1237,7 @@ func NewStatefulSetLabelsMetricCollector() *MetricCollector {
 		StatefulSetMatchLabels,
 		[]string{},
 		Info,
+		nil,
 	)
 }
 
@@ -1135,8 +1254,11 @@ func NewDaemonSetLabelsMetricCollector() *MetricCollector {
 	return NewMetricCollector(
 		DaemonSetLabelsID,
 		KubePodOwner,
-		[]string{"pod", "owner_name", "namespace", "cluster_id"},
+		[]string{"pod", "owner_name", "namespace"},
 		Info,
+		func(labels map[string]string) bool {
+			return labels["owner_kind"] == "DaemonSet"
+		},
 	)
 }
 
@@ -1153,8 +1275,11 @@ func NewJobLabelsMetricCollector() *MetricCollector {
 	return NewMetricCollector(
 		JobLabelsID,
 		KubePodOwner,
-		[]string{"pod", "owner_name", "namespace", "cluster_id"},
+		[]string{"pod", "owner_name", "namespace"},
 		Info,
+		func(labels map[string]string) bool {
+			return labels["owner_kind"] == "Job"
+		},
 	)
 }
 
@@ -1171,8 +1296,11 @@ func NewPodsWithReplicaSetOwnerMetricCollector() *MetricCollector {
 	return NewMetricCollector(
 		PodsWithReplicaSetOwnerID,
 		KubePodOwner,
-		[]string{"pod", "owner_name", "namespace", "cluster_id"},
+		[]string{"pod", "owner_name", "namespace"},
 		Info,
+		func(labels map[string]string) bool {
+			return labels["owner_kind"] == "ReplicaSet"
+		},
 	)
 }
 
@@ -1190,8 +1318,11 @@ func NewReplicaSetsWithoutOwnersMetricCollector() *MetricCollector {
 	return NewMetricCollector(
 		ReplicaSetsWithoutOwnersID,
 		KubeReplicasetOwner,
-		[]string{"replicaset", "namespace", "cluster_id"},
+		[]string{"replicaset", "namespace"},
 		Info,
+		func(labels map[string]string) bool {
+			return labels["owner_kind"] == "<none>" && labels["owner_name"] == "<none>"
+		},
 	)
 }
 
@@ -1208,7 +1339,10 @@ func NewReplicaSetsWithRolloutMetricCollector() *MetricCollector {
 	return NewMetricCollector(
 		ReplicaSetsWithRolloutID,
 		KubeReplicasetOwner,
-		[]string{"replicaset", "namespace", "owner_kind", "owner_name", "cluster_id"},
+		[]string{"replicaset", "namespace", "owner_kind", "owner_name"},
 		Info,
+		func(labels map[string]string) bool {
+			return labels["owner_kind"] == "Rollout"
+		},
 	)
 }

+ 400 - 0
modules/collector-source/pkg/collector/metricsquerier.go

@@ -0,0 +1,400 @@
+package collector
+
+import (
+	"time"
+
+	"github.com/opencost/opencost/core/pkg/source"
+)
+
+type CollectorProvider interface {
+	GetCollector(start, end time.Time) MetricsCollector
+}
+type CollectorMetricsQuerier struct {
+	collectorProvider CollectorProvider
+}
+
+func (c CollectorMetricsQuerier) QueryPVActiveMinutes(start, end time.Time) *source.Future[source.PVActiveMinutesResult] {
+	//TODO implement me
+	panic("implement me")
+}
+
+func (c CollectorMetricsQuerier) QueryPVUsedAverage(start, end time.Time) *source.Future[source.PVUsedAvgResult] {
+	//TODO implement me
+	panic("implement me")
+}
+
+func (c CollectorMetricsQuerier) QueryPVUsedMax(start, end time.Time) *source.Future[source.PVUsedMaxResult] {
+	//TODO implement me
+	panic("implement me")
+}
+
+func (c CollectorMetricsQuerier) QueryLocalStorageActiveMinutes(start, end time.Time) *source.Future[source.LocalStorageActiveMinutesResult] {
+	//TODO implement me
+	panic("implement me")
+}
+
+func (c CollectorMetricsQuerier) QueryLocalStorageCost(start, end time.Time) *source.Future[source.LocalStorageCostResult] {
+	//TODO implement me
+	panic("implement me")
+}
+
+func (c CollectorMetricsQuerier) QueryLocalStorageUsedCost(start, end time.Time) *source.Future[source.LocalStorageUsedCostResult] {
+	//TODO implement me
+	panic("implement me")
+}
+
+func (c CollectorMetricsQuerier) QueryLocalStorageUsedAvg(start, end time.Time) *source.Future[source.LocalStorageUsedAvgResult] {
+	//TODO implement me
+	panic("implement me")
+}
+
+func (c CollectorMetricsQuerier) QueryLocalStorageUsedMax(start, end time.Time) *source.Future[source.LocalStorageUsedMaxResult] {
+	//TODO implement me
+	panic("implement me")
+}
+
+func (c CollectorMetricsQuerier) QueryLocalStorageBytes(start, end time.Time) *source.Future[source.LocalStorageBytesResult] {
+	//TODO implement me
+	panic("implement me")
+}
+
+func (c CollectorMetricsQuerier) QueryNodeActiveMinutes(start, end time.Time) *source.Future[source.NodeActiveMinutesResult] {
+	collector := c.collectorProvider.GetCollector(start, end)
+	results, err := collector.Query(NodeActiveMinutesID)
+	queryResults := source.NewQueryResults(string(NodeActiveMinutesID))
+	queryResults.Error = err
+	for _, result := range results {
+		queryResults.Results = append(queryResults.Results, result.ToQueryResult())
+	}
+
+	ch := make(source.QueryResultsChan)
+	go func() {
+		ch <- queryResults
+	}()
+	return source.NewFuture[source.NodeActiveMinutesResult](source.DecodeNodeActiveMinutesResult, ch)
+}
+
+func (c CollectorMetricsQuerier) QueryNodeCPUCoresCapacity(start, end time.Time) *source.Future[source.NodeCPUCoresCapacityResult] {
+	//TODO implement me
+	panic("implement me")
+}
+
+func (c CollectorMetricsQuerier) QueryNodeCPUCoresAllocatable(start, end time.Time) *source.Future[source.NodeCPUCoresAllocatableResult] {
+	//TODO implement me
+	panic("implement me")
+}
+
+func (c CollectorMetricsQuerier) QueryNodeRAMBytesCapacity(start, end time.Time) *source.Future[source.NodeRAMBytesCapacityResult] {
+	//TODO implement me
+	panic("implement me")
+}
+
+func (c CollectorMetricsQuerier) QueryNodeRAMBytesAllocatable(start, end time.Time) *source.Future[source.NodeRAMBytesAllocatableResult] {
+	//TODO implement me
+	panic("implement me")
+}
+
+func (c CollectorMetricsQuerier) QueryNodeGPUCount(start, end time.Time) *source.Future[source.NodeGPUCountResult] {
+	//TODO implement me
+	panic("implement me")
+}
+
+func (c CollectorMetricsQuerier) QueryNodeCPUModeTotal(start, end time.Time) *source.Future[source.NodeCPUModeTotalResult] {
+	//TODO implement me
+	panic("implement me")
+}
+
+func (c CollectorMetricsQuerier) QueryNodeIsSpot(start, end time.Time) *source.Future[source.NodeIsSpotResult] {
+	//TODO implement me
+	panic("implement me")
+}
+
+func (c CollectorMetricsQuerier) QueryNodeRAMSystemPercent(start, end time.Time) *source.Future[source.NodeRAMSystemPercentResult] {
+	//TODO implement me
+	panic("implement me")
+}
+
+func (c CollectorMetricsQuerier) QueryNodeRAMUserPercent(start, end time.Time) *source.Future[source.NodeRAMUserPercentResult] {
+	//TODO implement me
+	panic("implement me")
+}
+
+func (c CollectorMetricsQuerier) QueryLBActiveMinutes(start, end time.Time) *source.Future[source.LBActiveMinutesResult] {
+	//TODO implement me
+	panic("implement me")
+}
+
+func (c CollectorMetricsQuerier) QueryLBPricePerHr(start, end time.Time) *source.Future[source.LBPricePerHrResult] {
+	//TODO implement me
+	panic("implement me")
+}
+
+func (c CollectorMetricsQuerier) QueryClusterManagementDuration(start, end time.Time) *source.Future[source.ClusterManagementDurationResult] {
+	//TODO implement me
+	panic("implement me")
+}
+
+func (c CollectorMetricsQuerier) QueryClusterManagementPricePerHr(start, end time.Time) *source.Future[source.ClusterManagementPricePerHrResult] {
+	//TODO implement me
+	panic("implement me")
+}
+
+func (c CollectorMetricsQuerier) QueryPods(start, end time.Time) *source.Future[source.PodsResult] {
+	//TODO implement me
+	panic("implement me")
+}
+
+func (c CollectorMetricsQuerier) QueryPodsUID(start, end time.Time) *source.Future[source.PodsResult] {
+	//TODO implement me
+	panic("implement me")
+}
+
+func (c CollectorMetricsQuerier) QueryRAMBytesAllocated(start, end time.Time) *source.Future[source.RAMBytesAllocatedResult] {
+	//TODO implement me
+	panic("implement me")
+}
+
+func (c CollectorMetricsQuerier) QueryRAMRequests(start, end time.Time) *source.Future[source.RAMRequestsResult] {
+	//TODO implement me
+	panic("implement me")
+}
+
+func (c CollectorMetricsQuerier) QueryRAMUsageAvg(start, end time.Time) *source.Future[source.RAMUsageAvgResult] {
+	//TODO implement me
+	panic("implement me")
+}
+
+func (c CollectorMetricsQuerier) QueryRAMUsageMax(start, end time.Time) *source.Future[source.RAMUsageMaxResult] {
+	//TODO implement me
+	panic("implement me")
+}
+
+func (c CollectorMetricsQuerier) QueryNodeRAMPricePerGiBHr(start, end time.Time) *source.Future[source.NodeRAMPricePerGiBHrResult] {
+	//TODO implement me
+	panic("implement me")
+}
+
+func (c CollectorMetricsQuerier) QueryCPUCoresAllocated(start, end time.Time) *source.Future[source.CPUCoresAllocatedResult] {
+	//TODO implement me
+	panic("implement me")
+}
+
+func (c CollectorMetricsQuerier) QueryCPURequests(start, end time.Time) *source.Future[source.CPURequestsResult] {
+	//TODO implement me
+	panic("implement me")
+}
+
+func (c CollectorMetricsQuerier) QueryCPUUsageAvg(start, end time.Time) *source.Future[source.CPUUsageAvgResult] {
+	//TODO implement me
+	panic("implement me")
+}
+
+func (c CollectorMetricsQuerier) QueryCPUUsageMax(start, end time.Time) *source.Future[source.CPUUsageMaxResult] {
+	//TODO implement me
+	panic("implement me")
+}
+
+func (c CollectorMetricsQuerier) QueryNodeCPUPricePerHr(start, end time.Time) *source.Future[source.NodeCPUPricePerHrResult] {
+	//TODO implement me
+	panic("implement me")
+}
+
+func (c CollectorMetricsQuerier) QueryGPUsAllocated(start, end time.Time) *source.Future[source.GPUsAllocatedResult] {
+	//TODO implement me
+	panic("implement me")
+}
+
+func (c CollectorMetricsQuerier) QueryGPUsRequested(start, end time.Time) *source.Future[source.GPUsRequestedResult] {
+	//TODO implement me
+	panic("implement me")
+}
+
+func (c CollectorMetricsQuerier) QueryGPUsUsageAvg(start, end time.Time) *source.Future[source.GPUsUsageAvgResult] {
+	//TODO implement me
+	panic("implement me")
+}
+
+func (c CollectorMetricsQuerier) QueryGPUsUsageMax(start, end time.Time) *source.Future[source.GPUsUsageMaxResult] {
+	//TODO implement me
+	panic("implement me")
+}
+
+func (c CollectorMetricsQuerier) QueryNodeGPUPricePerHr(start, end time.Time) *source.Future[source.NodeGPUPricePerHrResult] {
+	//TODO implement me
+	panic("implement me")
+}
+
+func (c CollectorMetricsQuerier) QueryGPUInfo(start, end time.Time) *source.Future[source.GPUInfoResult] {
+	//TODO implement me
+	panic("implement me")
+}
+
+func (c CollectorMetricsQuerier) QueryIsGPUShared(start, end time.Time) *source.Future[source.IsGPUSharedResult] {
+	//TODO implement me
+	panic("implement me")
+}
+
+func (c CollectorMetricsQuerier) QueryPodPVCAllocation(start, end time.Time) *source.Future[source.PodPVCAllocationResult] {
+	//TODO implement me
+	panic("implement me")
+}
+
+func (c CollectorMetricsQuerier) QueryPVCBytesRequested(start, end time.Time) *source.Future[source.PVCBytesRequestedResult] {
+	//TODO implement me
+	panic("implement me")
+}
+
+func (c CollectorMetricsQuerier) QueryPVCInfo(start, end time.Time) *source.Future[source.PVCInfoResult] {
+	//TODO implement me
+	panic("implement me")
+}
+
+func (c CollectorMetricsQuerier) QueryPVBytes(start, end time.Time) *source.Future[source.PVBytesResult] {
+	//TODO implement me
+	panic("implement me")
+}
+
+func (c CollectorMetricsQuerier) QueryPVPricePerGiBHour(start, end time.Time) *source.Future[source.PVPricePerGiBHourResult] {
+	//TODO implement me
+	panic("implement me")
+}
+
+func (c CollectorMetricsQuerier) QueryPVInfo(start, end time.Time) *source.Future[source.PVInfoResult] {
+	//TODO implement me
+	panic("implement me")
+}
+
+func (c CollectorMetricsQuerier) QueryNetZoneGiB(start, end time.Time) *source.Future[source.NetZoneGiBResult] {
+	//TODO implement me
+	panic("implement me")
+}
+
+func (c CollectorMetricsQuerier) QueryNetZonePricePerGiB(start, end time.Time) *source.Future[source.NetZonePricePerGiBResult] {
+	//TODO implement me
+	panic("implement me")
+}
+
+func (c CollectorMetricsQuerier) QueryNetRegionGiB(start, end time.Time) *source.Future[source.NetRegionGiBResult] {
+	//TODO implement me
+	panic("implement me")
+}
+
+func (c CollectorMetricsQuerier) QueryNetRegionPricePerGiB(start, end time.Time) *source.Future[source.NetRegionPricePerGiBResult] {
+	//TODO implement me
+	panic("implement me")
+}
+
+func (c CollectorMetricsQuerier) QueryNetInternetGiB(start, end time.Time) *source.Future[source.NetInternetGiBResult] {
+	//TODO implement me
+	panic("implement me")
+}
+
+func (c CollectorMetricsQuerier) QueryNetInternetPricePerGiB(start, end time.Time) *source.Future[source.NetInternetPricePerGiBResult] {
+	//TODO implement me
+	panic("implement me")
+}
+
+func (c CollectorMetricsQuerier) QueryNetInternetServiceGiB(start, end time.Time) *source.Future[source.NetInternetServiceGiBResult] {
+	//TODO implement me
+	panic("implement me")
+}
+
+func (c CollectorMetricsQuerier) QueryNetTransferBytes(start, end time.Time) *source.Future[source.NetTransferBytesResult] {
+	//TODO implement me
+	panic("implement me")
+}
+
+func (c CollectorMetricsQuerier) QueryNetZoneIngressGiB(start, end time.Time) *source.Future[source.NetZoneIngressGiBResult] {
+	//TODO implement me
+	panic("implement me")
+}
+
+func (c CollectorMetricsQuerier) QueryNetRegionIngressGiB(start, end time.Time) *source.Future[source.NetRegionIngressGiBResult] {
+	//TODO implement me
+	panic("implement me")
+}
+
+func (c CollectorMetricsQuerier) QueryNetInternetIngressGiB(start, end time.Time) *source.Future[source.NetInternetIngressGiBResult] {
+	//TODO implement me
+	panic("implement me")
+}
+
+func (c CollectorMetricsQuerier) QueryNetInternetServiceIngressGiB(start, end time.Time) *source.Future[source.NetInternetServiceIngressGiBResult] {
+	//TODO implement me
+	panic("implement me")
+}
+
+func (c CollectorMetricsQuerier) QueryNetReceiveBytes(start, end time.Time) *source.Future[source.NetReceiveBytesResult] {
+	//TODO implement me
+	panic("implement me")
+}
+
+func (c CollectorMetricsQuerier) QueryNamespaceAnnotations(start, end time.Time) *source.Future[source.NamespaceAnnotationsResult] {
+	//TODO implement me
+	panic("implement me")
+}
+
+func (c CollectorMetricsQuerier) QueryPodAnnotations(start, end time.Time) *source.Future[source.PodAnnotationsResult] {
+	//TODO implement me
+	panic("implement me")
+}
+
+func (c CollectorMetricsQuerier) QueryNodeLabels(start, end time.Time) *source.Future[source.NodeLabelsResult] {
+	//TODO implement me
+	panic("implement me")
+}
+
+func (c CollectorMetricsQuerier) QueryNamespaceLabels(start, end time.Time) *source.Future[source.NamespaceLabelsResult] {
+	//TODO implement me
+	panic("implement me")
+}
+
+func (c CollectorMetricsQuerier) QueryPodLabels(start, end time.Time) *source.Future[source.PodLabelsResult] {
+	//TODO implement me
+	panic("implement me")
+}
+
+func (c CollectorMetricsQuerier) QueryServiceLabels(start, end time.Time) *source.Future[source.ServiceLabelsResult] {
+	//TODO implement me
+	panic("implement me")
+}
+
+func (c CollectorMetricsQuerier) QueryDeploymentLabels(start, end time.Time) *source.Future[source.DeploymentLabelsResult] {
+	//TODO implement me
+	panic("implement me")
+}
+
+func (c CollectorMetricsQuerier) QueryStatefulSetLabels(start, end time.Time) *source.Future[source.StatefulSetLabelsResult] {
+	//TODO implement me
+	panic("implement me")
+}
+
+func (c CollectorMetricsQuerier) QueryDaemonSetLabels(start, end time.Time) *source.Future[source.DaemonSetLabelsResult] {
+	//TODO implement me
+	panic("implement me")
+}
+
+func (c CollectorMetricsQuerier) QueryJobLabels(start, end time.Time) *source.Future[source.JobLabelsResult] {
+	//TODO implement me
+	panic("implement me")
+}
+
+func (c CollectorMetricsQuerier) QueryPodsWithReplicaSetOwner(start, end time.Time) *source.Future[source.PodsWithReplicaSetOwnerResult] {
+	//TODO implement me
+	panic("implement me")
+}
+
+func (c CollectorMetricsQuerier) QueryReplicaSetsWithoutOwners(start, end time.Time) *source.Future[source.ReplicaSetsWithoutOwnersResult] {
+	//TODO implement me
+	panic("implement me")
+}
+
+func (c CollectorMetricsQuerier) QueryReplicaSetsWithRollout(start, end time.Time) *source.Future[source.ReplicaSetsWithRolloutResult] {
+	//TODO implement me
+	panic("implement me")
+}
+
+func (c CollectorMetricsQuerier) QueryDataCoverage(limitDays int) (time.Time, time.Time, error) {
+	//TODO implement me
+	panic("implement me")
+}

+ 74 - 0
modules/collector-source/pkg/collector/metricsquerier_test.go

@@ -0,0 +1,74 @@
+package collector
+
+import (
+	"reflect"
+	"testing"
+	"time"
+
+	"github.com/opencost/opencost/core/pkg/source"
+	"github.com/opencost/opencost/core/pkg/util"
+)
+
+var start1Str = "2025-01-01T00:00:00Z00:00"
+var end1Str = "2025-01-01T00:01:00Z00:00"
+
+type MockCollectorProvider struct {
+	metricsCollector MetricsCollector
+}
+
+func (m *MockCollectorProvider) GetCollector(start, end time.Time) MetricsCollector {
+	return m.metricsCollector
+}
+
+func GetMockCollectorProvider() CollectorProvider {
+	collector := NewOpenCostMetricCollector()
+
+	start1, _ := time.Parse(time.RFC3339, start1Str)
+	end1, _ := time.Parse(time.RFC3339, end1Str)
+
+	node1Info := map[string]string{
+		"node":        "node1",
+		"provider_id": "node1",
+	}
+
+	collector.Update(NodeTotalHourlyCost, node1Info, 0, &start1, nil)
+	collector.Update(NodeTotalHourlyCost, node1Info, 0, &end1, nil)
+
+	return &MockCollectorProvider{
+		metricsCollector: collector,
+	}
+}
+
+func TestCollectorMetricsQuerier_QueryNodeActiveMinutes(t *testing.T) {
+	start1, _ := time.Parse(time.RFC3339, start1Str)
+	end1, _ := time.Parse(time.RFC3339, end1Str)
+
+	c := CollectorMetricsQuerier{
+		collectorProvider: GetMockCollectorProvider(),
+	}
+	resChActiveMins := c.QueryNodeActiveMinutes(time.Now(), time.Now())
+	resActiveMins, err := resChActiveMins.Await()
+	if err != nil {
+		t.Errorf("unexpected error: %v", err.Error())
+	}
+	expected := []*source.NodeActiveMinutesResult{
+		{
+			Cluster:    "",
+			Node:       "node1",
+			ProviderID: "node1",
+			Data: []*util.Vector{
+				{
+					Timestamp: float64(start1.Unix()),
+					Value:     1,
+				},
+				{
+					Timestamp: float64(end1.Unix()),
+					Value:     1,
+				},
+			},
+		},
+	}
+	if !reflect.DeepEqual(resActiveMins, expected) {
+		t.Errorf("QueryNodeActiveMinutes() = %v, want %v", resActiveMins, expected)
+	}
+}

+ 87 - 0
modules/collector-source/pkg/collector/mock.go

@@ -0,0 +1,87 @@
+package collector
+
+import (
+	"fmt"
+	"time"
+
+	"github.com/opencost/opencost/modules/collector-source/pkg/metrics/target"
+	"golang.org/x/exp/maps"
+)
+
+// UpdateRecorderCollector is a mock MetricsCollector which records the arguments passed to the update function in an array
+type UpdateRecorderCollector struct {
+	updateArgs []UpdateArgs
+}
+
+func (u *UpdateRecorderCollector) Register(collector *MetricCollector) error {
+	panic("this mock does not support this action")
+}
+
+func (u *UpdateRecorderCollector) Unregister(collectorID MetricCollectorID) bool {
+	panic("this mock does not support this action")
+}
+
+func (u *UpdateRecorderCollector) Query(collectorID MetricCollectorID) ([]*MetricResult, error) {
+	panic("this mock does not support this action")
+}
+
+func (u *UpdateRecorderCollector) Update(metricName string, labels map[string]string, value float64, timestamp *time.Time, additionalInformation map[string]string) {
+	u.updateArgs = append(u.updateArgs, UpdateArgs{
+		metricName:            metricName,
+		labels:                labels,
+		value:                 value,
+		timestamp:             timestamp,
+		additionalInformation: additionalInformation,
+	})
+}
+
+type UpdateArgs struct {
+	metricName            string
+	labels                map[string]string
+	value                 float64
+	timestamp             *time.Time
+	additionalInformation map[string]string
+}
+
+func (u UpdateArgs) equals(that UpdateArgs) error {
+	if u.metricName != that.metricName {
+		return fmt.Errorf("expected metric name %s, got %s", u.metricName, that.metricName)
+	}
+
+	if !maps.Equal(u.labels, that.labels) {
+		return fmt.Errorf("expected labels %s, got %s", u.labels, that.labels)
+	}
+
+	if u.value != that.value {
+		return fmt.Errorf("expected value %f, got %f", u.value, that.value)
+	}
+
+	if that.timestamp != nil {
+		if u.timestamp == nil {
+			return fmt.Errorf("expected timestamp nil, got %v", that.timestamp)
+		}
+		if !u.timestamp.Equal(*that.timestamp) {
+			return fmt.Errorf("expected timestamp %s, got %s", u.timestamp, that.timestamp)
+		}
+	} else if u.timestamp != nil {
+		return fmt.Errorf("expected timestamp %v, got nil", u.timestamp)
+	}
+
+	if !maps.Equal(u.additionalInformation, that.additionalInformation) {
+		return fmt.Errorf("expected additionalInformation %v, got %v", u.additionalInformation, that.additionalInformation)
+	}
+
+	return nil
+}
+
+type MockTargetProvider struct {
+	targets []target.ScrapeTarget
+}
+
+func NewMockTargetProvider(targets ...target.ScrapeTarget) *MockTargetProvider {
+	return &MockTargetProvider{targets: targets}
+}
+
+func (m *MockTargetProvider) GetTargets() []target.ScrapeTarget {
+	return m.targets
+}

+ 45 - 0
modules/collector-source/pkg/collector/networktargetprovider.go

@@ -0,0 +1,45 @@
+package collector
+
+import (
+	"context"
+	"fmt"
+
+	"github.com/opencost/opencost/core/pkg/log"
+	"github.com/opencost/opencost/modules/collector-source/pkg/metrics/target"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/client-go/kubernetes"
+)
+
+type NetworkTargetProvider struct {
+	releaseName   string
+	port          int
+	kubeClientSet kubernetes.Interface
+}
+
+func NewNetworkTargetProvider(releaseName string, port int, k8s kubernetes.Interface) *NetworkTargetProvider {
+	return &NetworkTargetProvider{
+		releaseName:   releaseName,
+		port:          port,
+		kubeClientSet: k8s,
+	}
+}
+
+func (n NetworkTargetProvider) GetTargets() []target.ScrapeTarget {
+	k8s := n.kubeClientSet
+
+	pods, err := k8s.CoreV1().Pods("").List(context.Background(), metav1.ListOptions{
+		LabelSelector: fmt.Sprintf("app=%s-network-costs", n.releaseName),
+	})
+	if err != nil {
+		log.Errorf("NetworkTargetProvider: failed to retieve nodes from kubernetes client: %s", err.Error())
+		return nil
+	}
+
+	var targets []target.ScrapeTarget
+	for _, pod := range pods.Items {
+		t := target.NewUrlTarget(fmt.Sprintf("http://%s:%d/metrics", pod.Status.PodIP, n.port))
+		targets = append(targets, t)
+	}
+
+	return targets
+}

+ 342 - 0
modules/collector-source/pkg/collector/scraper.go

@@ -0,0 +1,342 @@
+package collector
+
+import (
+	"fmt"
+	"slices"
+	"strings"
+	"time"
+
+	"github.com/opencost/opencost/core/pkg/log"
+	"github.com/opencost/opencost/core/pkg/util/promutil"
+	"github.com/opencost/opencost/pkg/clustercache"
+	"golang.org/x/exp/maps"
+	v1 "k8s.io/api/core/v1"
+	"k8s.io/apimachinery/pkg/api/resource"
+	"k8s.io/apimachinery/pkg/util/validation"
+)
+
+type kubernetesScraper struct {
+	clusterCache clustercache.ClusterCache
+	collector    MetricsCollector
+}
+
+func (ks *kubernetesScraper) Scrape() {
+	timestamp := time.Now().UTC()
+	nodes := ks.clusterCache.GetAllNodes()
+	deployments := ks.clusterCache.GetAllDeployments()
+	namespaces := ks.clusterCache.GetAllNamespaces()
+	pods := ks.clusterCache.GetAllPods()
+	pvcs := ks.clusterCache.GetAllPersistentVolumeClaims()
+	pvs := ks.clusterCache.GetAllPersistentVolumes()
+	services := ks.clusterCache.GetAllServices()
+	statefulSets := ks.clusterCache.GetAllStatefulSets()
+
+	ks.scrapeNodes(nodes, timestamp)
+	ks.scrapeDeployments(deployments, timestamp)
+	ks.scrapeNamespaces(namespaces, timestamp)
+	ks.scrapePods(pods, timestamp)
+	ks.scrapePVCs(pvcs, timestamp)
+	ks.scrapePVs(pvs, timestamp)
+	ks.scrapeServices(services, timestamp)
+	ks.scrapeStatefulSets(statefulSets, timestamp)
+}
+
+func (ks *kubernetesScraper) scrapeNodes(nodes []*clustercache.Node, timestamp time.Time) {
+	for _, node := range nodes {
+		nodeInfo := map[string]string{
+			"node":        node.Name,
+			"provider_id": node.SpecProviderID,
+		}
+
+		// Node Capacity
+		if node.Status.Capacity != nil {
+			if quantity, ok := node.Status.Capacity[v1.ResourceCPU]; ok {
+				_, _, value := toResourceUnitValue(v1.ResourceCPU, quantity)
+				ks.collector.Update(KubeNodeStatusCapacityCPUCores, nodeInfo, value, &timestamp, nil)
+			}
+
+			if quantity, ok := node.Status.Capacity[v1.ResourceMemory]; ok {
+				_, _, value := toResourceUnitValue(v1.ResourceMemory, quantity)
+				ks.collector.Update(KubeNodeStatusCapacityMemoryBytes, nodeInfo, value, &timestamp, nil)
+			}
+		}
+
+		// Node Allocatable Resources
+		if node.Status.Allocatable != nil {
+			if quantity, ok := node.Status.Allocatable[v1.ResourceCPU]; ok {
+				_, _, value := toResourceUnitValue(v1.ResourceCPU, quantity)
+				ks.collector.Update(KubeNodeStatusAllocatableCPUCores, nodeInfo, value, &timestamp, nil)
+			}
+
+			if quantity, ok := node.Status.Allocatable[v1.ResourceMemory]; ok {
+				_, _, value := toResourceUnitValue(v1.ResourceMemory, quantity)
+				ks.collector.Update(KubeNodeStatusAllocatableMemoryBytes, nodeInfo, value, &timestamp, nil)
+			}
+		}
+
+		// node labels
+		labelNames, labelValues := promutil.KubeLabelsToLabels(node.Labels)
+		nodeLabels := toMap(labelNames, labelValues)
+
+		ks.collector.Update(KubeNodeLabels, nodeInfo, 0, &timestamp, nodeLabels)
+
+	}
+}
+
+func (ks *kubernetesScraper) scrapeDeployments(deployments []*clustercache.Deployment, timestamp time.Time) {
+	for _, deployment := range deployments {
+		deploymentInfo := map[string]string{
+			"deployment": deployment.Name,
+			"namespace":  deployment.Namespace,
+		}
+
+		// deployment labels
+		labelNames, labelValues := promutil.KubeLabelsToLabels(deployment.MatchLabels)
+		deploymentLabels := toMap(labelNames, labelValues)
+
+		ks.collector.Update(DeploymentMatchLabels, deploymentInfo, 0, &timestamp, deploymentLabels)
+
+	}
+}
+
+func (ks *kubernetesScraper) scrapeNamespaces(namespaces []*clustercache.Namespace, timestamp time.Time) {
+	for _, namespace := range namespaces {
+		namespaceInfo := map[string]string{
+			"namespace": namespace.Name,
+		}
+
+		// namespace labels
+		labelNames, labelValues := promutil.KubeLabelsToLabels(namespace.Labels)
+		namespaceLabels := toMap(labelNames, labelValues)
+		ks.collector.Update(KubeNamespaceLabels, namespaceInfo, 0, &timestamp, namespaceLabels)
+
+		// namespace annotations
+		annotationNames, annotationValues := promutil.KubeAnnotationsToLabels(namespace.Annotations)
+		namespaceAnnotations := toMap(annotationNames, annotationValues)
+		ks.collector.Update(KubeNamespaceAnnotations, namespaceInfo, 0, &timestamp, namespaceAnnotations)
+	}
+}
+
+func (ks *kubernetesScraper) scrapePods(pods []*clustercache.Pod, timestamp time.Time) {
+	for _, pod := range pods {
+		podInfo := map[string]string{
+			"name":      pod.Name,
+			"namespace": pod.Namespace,
+			"uid":       string(pod.UID),
+			"node":      pod.Spec.NodeName,
+		}
+
+		// pod labels
+		labelNames, labelValues := promutil.KubeLabelsToLabels(pod.Labels)
+		podLabels := toMap(labelNames, labelValues)
+		ks.collector.Update(KubePodLabels, podInfo, 0, &timestamp, podLabels)
+
+		// pod annotations
+		annotationNames, annotationValues := promutil.KubeAnnotationsToLabels(pod.Annotations)
+		podAnnotations := toMap(annotationNames, annotationValues)
+		ks.collector.Update(KubePodAnnotations, podInfo, 0, &timestamp, podAnnotations)
+
+		// Pod owner metric
+		for _, owner := range pod.OwnerReferences {
+			ownerInfo := maps.Clone(podInfo)
+			ownerInfo["owner_kind"] = owner.Kind
+			ownerInfo["owner_name"] = owner.Name
+			ownerInfo["owner_is_controller"] = fmt.Sprintf("%t", owner.Controller != nil)
+			ks.collector.Update(KubePodOwner, ownerInfo, 0, &timestamp, nil)
+		}
+
+		// Container Status
+		for _, status := range pod.Status.ContainerStatuses {
+			if status.State.Running != nil {
+				containerInfo := maps.Clone(podInfo)
+				containerInfo["container"] = status.Name
+				ks.collector.Update(KubePodContainerStatusRunning, containerInfo, 0, &timestamp, nil)
+			}
+		}
+
+		for _, container := range pod.Spec.Containers {
+			containerInfo := maps.Clone(podInfo)
+			containerInfo["container"] = container.Name
+			// Requests
+			if container.Resources.Requests != nil {
+				// sorting keys here for testing purposes
+				keys := maps.Keys(container.Resources.Requests)
+				slices.Sort(keys)
+				for _, resourceName := range keys {
+					quantity := container.Resources.Requests[resourceName]
+					resource, unit, value := toResourceUnitValue(resourceName, quantity)
+
+					// failed to parse the resource type
+					if resource == "" {
+						log.DedupedWarningf(5, "Failed to parse resource units and quantity for resource: %s", resourceName)
+						continue
+					}
+
+					resourceRequestInfo := maps.Clone(containerInfo)
+					resourceRequestInfo["resource"] = resource
+					resourceRequestInfo["unit"] = unit
+					ks.collector.Update(KubePodContainerResourceRequests, resourceRequestInfo, value, &timestamp, nil)
+				}
+			}
+		}
+	}
+}
+
+func (ks *kubernetesScraper) scrapePVCs(pvcs []*clustercache.PersistentVolumeClaim, timestamp time.Time) {
+	for _, pvc := range pvcs {
+		pvcInfo := map[string]string{
+			"name":         pvc.Name,
+			"namespace":    pvc.Namespace,
+			"volumename":   pvc.Spec.VolumeName,
+			"storageclass": getPersistentVolumeClaimClass(pvc),
+		}
+
+		ks.collector.Update(KubePersistenVolumeClaimInfo, pvcInfo, 0, &timestamp, nil)
+
+		if storage, ok := pvc.Spec.Resources.Requests[v1.ResourceStorage]; ok {
+			ks.collector.Update(KubePersistentVolumeClaimResourceRequestsStorageBytes, pvcInfo, float64(storage.Value()), &timestamp, nil)
+		}
+	}
+}
+
+func (ks *kubernetesScraper) scrapePVs(pvs []*clustercache.PersistentVolume, timestamp time.Time) {
+	for _, pv := range pvs {
+		providerID := pv.Name
+		// if a more accurate provider ID is available, use that
+		if pv.Spec.CSI != nil && pv.Spec.CSI.VolumeHandle != "" {
+			providerID = pv.Spec.CSI.VolumeHandle
+		}
+		pvInfo := map[string]string{
+			"name":         pv.Name,
+			"storageClass": pv.Spec.StorageClassName,
+			"providerID":   providerID,
+		}
+
+		ks.collector.Update(KubecostPVInfo, pvInfo, 0, &timestamp, nil)
+
+		if storage, ok := pv.Spec.Capacity[v1.ResourceStorage]; ok {
+			ks.collector.Update(KubePersistentVolumeCapacityBytes, pvInfo, float64(storage.Value()), &timestamp, nil)
+		}
+	}
+}
+
+func (ks *kubernetesScraper) scrapeServices(services []*clustercache.Service, timestamp time.Time) {
+	for _, service := range services {
+		serviceInfo := map[string]string{
+			"service":   service.Name,
+			"namespace": service.Namespace,
+		}
+
+		// service labels
+		labelNames, labelValues := promutil.KubeLabelsToLabels(service.SpecSelector)
+		serviceLabels := toMap(labelNames, labelValues)
+		ks.collector.Update(ServiceSelectorLabels, serviceInfo, 0, &timestamp, serviceLabels)
+
+	}
+}
+
+func (ks *kubernetesScraper) scrapeStatefulSets(statefulSets []*clustercache.StatefulSet, timestamp time.Time) {
+	for _, statefulSet := range statefulSets {
+		statefulSetInfo := map[string]string{
+			"name":      statefulSet.Name,
+			"namespace": statefulSet.Namespace,
+		}
+
+		// statefulSet labels
+		labelNames, labelValues := promutil.KubeLabelsToLabels(statefulSet.SpecSelector.MatchLabels)
+		statefulSetLabels := toMap(labelNames, labelValues)
+		ks.collector.Update(StatefulSetMatchLabels, statefulSetInfo, 0, &timestamp, statefulSetLabels)
+
+	}
+}
+
+// getPersistentVolumeClaimClass returns StorageClassName. If no storage class was
+// requested, it returns "".
+func getPersistentVolumeClaimClass(claim *clustercache.PersistentVolumeClaim) string {
+	// Use beta annotation first
+	if class, found := claim.Annotations[v1.BetaStorageClassAnnotation]; found {
+		return class
+	}
+
+	if claim.Spec.StorageClassName != nil {
+		return *claim.Spec.StorageClassName
+	}
+
+	// Special non-empty string to indicate absence of storage class.
+	return ""
+}
+
+// toResourceUnitValue accepts a resource name and quantity and returns the sanitized resource, the unit, and the value in the units.
+// Returns an empty string for resource and unit if there was a failure.
+func toResourceUnitValue(resourceName v1.ResourceName, quantity resource.Quantity) (resource string, unit string, value float64) {
+	resource = promutil.SanitizeLabelName(string(resourceName))
+
+	switch resourceName {
+	case v1.ResourceCPU:
+		unit = "core"
+		value = float64(quantity.MilliValue()) / 1000
+		return
+
+	case v1.ResourceStorage:
+		fallthrough
+	case v1.ResourceEphemeralStorage:
+		fallthrough
+	case v1.ResourceMemory:
+		unit = "byte"
+		value = float64(quantity.Value())
+		return
+	case v1.ResourcePods:
+		unit = "integer"
+		value = float64(quantity.Value())
+		return
+	default:
+		if isHugePageResourceName(resourceName) || isAttachableVolumeResourceName(resourceName) {
+			unit = "byte"
+			value = float64(quantity.Value())
+			return
+		}
+
+		if isExtendedResourceName(resourceName) {
+			unit = "integer"
+			value = float64(quantity.Value())
+			return
+		}
+	}
+
+	resource = ""
+	unit = ""
+	value = 0.0
+	return
+}
+
+// isHugePageResourceName checks for a huge page container resource name
+func isHugePageResourceName(name v1.ResourceName) bool {
+	return strings.HasPrefix(string(name), v1.ResourceHugePagesPrefix)
+}
+
+// isAttachableVolumeResourceName checks for attached volume container resource name
+func isAttachableVolumeResourceName(name v1.ResourceName) bool {
+	return strings.HasPrefix(string(name), v1.ResourceAttachableVolumesPrefix)
+}
+
+// isExtendedResourceName checks for extended container resource name
+func isExtendedResourceName(name v1.ResourceName) bool {
+	if isNativeResource(name) || strings.HasPrefix(string(name), v1.DefaultResourceRequestsPrefix) {
+		return false
+	}
+	// Ensure it satisfies the rules in IsQualifiedName() after converted into quota resource name
+	nameForQuota := fmt.Sprintf("%s%s", v1.DefaultResourceRequestsPrefix, string(name))
+	if errs := validation.IsQualifiedName(nameForQuota); len(errs) != 0 {
+		return false
+	}
+	return true
+}
+
+// isNativeResource checks for a kubernetes.io/ prefixed resource name
+func isNativeResource(name v1.ResourceName) bool {
+	return !strings.Contains(string(name), "/") || isPrefixedNativeResource(name)
+}
+
+func isPrefixedNativeResource(name v1.ResourceName) bool {
+	return strings.Contains(string(name), v1.ResourceDefaultNamespacePrefix)
+}

+ 796 - 0
modules/collector-source/pkg/collector/scraper_test.go

@@ -0,0 +1,796 @@
+package collector
+
+import (
+	"testing"
+	"time"
+
+	"github.com/opencost/opencost/pkg/clustercache"
+	v1 "k8s.io/api/core/v1"
+	"k8s.io/apimachinery/pkg/api/resource"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+)
+
+func Test_kubernetesScraper_scrapeNodes(t *testing.T) {
+
+	start1, _ := time.Parse(time.RFC3339, start1Str)
+
+	type scrape struct {
+		nodes     []*clustercache.Node
+		timestamp time.Time
+	}
+	tests := []struct {
+		name     string
+		scrapes  []scrape
+		expected []UpdateArgs
+	}{
+		{
+			name: "simple",
+			scrapes: []scrape{
+				{
+					nodes: []*clustercache.Node{
+						{
+							Name:           "node1",
+							SpecProviderID: "i-1",
+							Status: v1.NodeStatus{
+								Capacity: v1.ResourceList{
+									v1.ResourceCPU:    resource.MustParse("2"),
+									v1.ResourceMemory: resource.MustParse("2048"),
+								},
+								Allocatable: v1.ResourceList{
+									v1.ResourceCPU:    resource.MustParse("1"),
+									v1.ResourceMemory: resource.MustParse("1024"),
+								},
+							},
+							Labels: map[string]string{
+								"test1": "blah",
+								"test2": "blah2",
+							},
+						},
+					},
+					timestamp: start1,
+				},
+			},
+			expected: []UpdateArgs{
+				{
+					metricName: KubeNodeStatusCapacityCPUCores,
+					labels: map[string]string{
+						"node":        "node1",
+						"provider_id": "i-1",
+					},
+					value:                 2.0,
+					timestamp:             &start1,
+					additionalInformation: nil,
+				},
+				{
+					metricName: KubeNodeStatusCapacityMemoryBytes,
+					labels: map[string]string{
+						"node":        "node1",
+						"provider_id": "i-1",
+					},
+					value:                 2048.0,
+					timestamp:             &start1,
+					additionalInformation: nil,
+				},
+				{
+					metricName: KubeNodeStatusAllocatableCPUCores,
+					labels: map[string]string{
+						"node":        "node1",
+						"provider_id": "i-1",
+					},
+					value:                 1.0,
+					timestamp:             &start1,
+					additionalInformation: nil,
+				},
+				{
+					metricName: KubeNodeStatusAllocatableMemoryBytes,
+					labels: map[string]string{
+						"node":        "node1",
+						"provider_id": "i-1",
+					},
+					value:                 1024.0,
+					timestamp:             &start1,
+					additionalInformation: nil,
+				},
+				{
+					metricName: KubeNodeLabels,
+					labels: map[string]string{
+						"node":        "node1",
+						"provider_id": "i-1",
+					},
+					value:     0,
+					timestamp: &start1,
+					additionalInformation: map[string]string{
+						"label_test1": "blah",
+						"label_test2": "blah2",
+					},
+				},
+			},
+		},
+	}
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			updateRecorder := UpdateRecorderCollector{}
+			ks := &kubernetesScraper{
+				collector: &updateRecorder,
+			}
+			for _, s := range tt.scrapes {
+				ks.scrapeNodes(s.nodes, s.timestamp)
+			}
+
+			if len(updateRecorder.updateArgs) != len(tt.expected) {
+				t.Errorf("Expected result length of %d, got %d", len(tt.expected), len(updateRecorder.updateArgs))
+			}
+
+			for i, expected := range tt.expected {
+				updateArg := updateRecorder.updateArgs[i]
+				err := expected.equals(updateArg)
+				if err != nil {
+					t.Errorf("Result did not match expected at index %d: %s", i, err.Error())
+				}
+			}
+		})
+	}
+}
+
+func Test_kubernetesScraper_scrapeDeployments(t *testing.T) {
+
+	start1, _ := time.Parse(time.RFC3339, start1Str)
+
+	type scrape struct {
+		deployments []*clustercache.Deployment
+		timestamp   time.Time
+	}
+	tests := []struct {
+		name     string
+		scrapes  []scrape
+		expected []UpdateArgs
+	}{
+		{
+			name: "simple",
+			scrapes: []scrape{
+				{
+					deployments: []*clustercache.Deployment{
+						{
+							Name:      "deployment1",
+							Namespace: "namespace1",
+							MatchLabels: map[string]string{
+								"test1": "blah",
+								"test2": "blah2",
+							},
+						},
+					},
+					timestamp: start1,
+				},
+			},
+			expected: []UpdateArgs{
+
+				{
+					metricName: DeploymentMatchLabels,
+					labels: map[string]string{
+						"deployment": "deployment1",
+						"namespace":  "namespace1",
+					},
+					value:     0,
+					timestamp: &start1,
+					additionalInformation: map[string]string{
+						"label_test1": "blah",
+						"label_test2": "blah2",
+					},
+				},
+			},
+		},
+	}
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			updateRecorder := UpdateRecorderCollector{}
+			ks := &kubernetesScraper{
+				collector: &updateRecorder,
+			}
+			for _, s := range tt.scrapes {
+				ks.scrapeDeployments(s.deployments, s.timestamp)
+			}
+
+			if len(updateRecorder.updateArgs) != len(tt.expected) {
+				t.Errorf("Expected result length of %d, got %d", len(tt.expected), len(updateRecorder.updateArgs))
+			}
+
+			for i, expected := range tt.expected {
+				updateArg := updateRecorder.updateArgs[i]
+				err := expected.equals(updateArg)
+				if err != nil {
+					t.Errorf("Result did not match expected at index %d: %s", i, err.Error())
+				}
+			}
+		})
+	}
+}
+
+func Test_kubernetesScraper_scrapeNamespaces(t *testing.T) {
+
+	start1, _ := time.Parse(time.RFC3339, start1Str)
+
+	type scrape struct {
+		namespaces []*clustercache.Namespace
+		timestamp  time.Time
+	}
+	tests := []struct {
+		name     string
+		scrapes  []scrape
+		expected []UpdateArgs
+	}{
+		{
+			name: "simple",
+			scrapes: []scrape{
+				{
+					namespaces: []*clustercache.Namespace{
+						{
+							Name: "namespace1",
+							Labels: map[string]string{
+								"test1": "blah",
+								"test2": "blah2",
+							},
+							Annotations: map[string]string{
+								"test3": "blah3",
+								"test4": "blah4",
+							},
+						},
+					},
+					timestamp: start1,
+				},
+			},
+			expected: []UpdateArgs{
+				{
+					metricName: KubeNamespaceLabels,
+					labels: map[string]string{
+						"namespace": "namespace1",
+					},
+					value:     0,
+					timestamp: &start1,
+					additionalInformation: map[string]string{
+						"label_test1": "blah",
+						"label_test2": "blah2",
+					},
+				},
+				{
+					metricName: KubeNamespaceAnnotations,
+					labels: map[string]string{
+						"namespace": "namespace1",
+					},
+					value:     0,
+					timestamp: &start1,
+					additionalInformation: map[string]string{
+						"annotation_test3": "blah3",
+						"annotation_test4": "blah4",
+					},
+				},
+			},
+		},
+	}
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			updateRecorder := UpdateRecorderCollector{}
+			ks := &kubernetesScraper{
+				collector: &updateRecorder,
+			}
+			for _, s := range tt.scrapes {
+				ks.scrapeNamespaces(s.namespaces, s.timestamp)
+			}
+
+			if len(updateRecorder.updateArgs) != len(tt.expected) {
+				t.Errorf("Expected result length of %d, got %d", len(tt.expected), len(updateRecorder.updateArgs))
+			}
+
+			for i, expected := range tt.expected {
+				updateArg := updateRecorder.updateArgs[i]
+				err := expected.equals(updateArg)
+				if err != nil {
+					t.Errorf("Result did not match expected at index %d: %s", i, err.Error())
+				}
+			}
+		})
+	}
+}
+
+func Test_kubernetesScraper_scrapePods(t *testing.T) {
+
+	start1, _ := time.Parse(time.RFC3339, start1Str)
+
+	type scrape struct {
+		pods      []*clustercache.Pod
+		timestamp time.Time
+	}
+	tests := []struct {
+		name     string
+		scrapes  []scrape
+		expected []UpdateArgs
+	}{
+		{
+			name: "simple",
+			scrapes: []scrape{
+				{
+					pods: []*clustercache.Pod{
+						{
+							Name:      "pod1",
+							Namespace: "namespace1",
+							UID:       "uuid1",
+							Spec: clustercache.PodSpec{
+								NodeName: "node1",
+								Containers: []clustercache.Container{
+									{
+										Name: "container1",
+										Resources: v1.ResourceRequirements{
+											Requests: map[v1.ResourceName]resource.Quantity{
+												v1.ResourceCPU:    resource.MustParse("500m"),
+												v1.ResourceMemory: resource.MustParse("512"),
+											},
+										},
+									},
+								},
+							},
+							Labels: map[string]string{
+								"test1": "blah",
+								"test2": "blah2",
+							},
+							Annotations: map[string]string{
+								"test3": "blah3",
+								"test4": "blah4",
+							},
+							OwnerReferences: []metav1.OwnerReference{
+								{
+									Kind:       "deployment",
+									Name:       "deployment1",
+									Controller: nil,
+								},
+							},
+							Status: clustercache.PodStatus{
+								ContainerStatuses: []v1.ContainerStatus{
+									{
+										Name: "container1",
+										State: v1.ContainerState{
+											Running: &v1.ContainerStateRunning{},
+										},
+									},
+								},
+							},
+						},
+					},
+					timestamp: start1,
+				},
+			},
+			expected: []UpdateArgs{
+				{
+					metricName: KubePodLabels,
+					labels: map[string]string{
+						"name":      "pod1",
+						"namespace": "namespace1",
+						"uid":       "uuid1",
+						"node":      "node1",
+					},
+					value:     0,
+					timestamp: &start1,
+					additionalInformation: map[string]string{
+						"label_test1": "blah",
+						"label_test2": "blah2",
+					},
+				},
+				{
+					metricName: KubePodAnnotations,
+					labels: map[string]string{
+						"name":      "pod1",
+						"namespace": "namespace1",
+						"uid":       "uuid1",
+						"node":      "node1",
+					},
+					value:     0,
+					timestamp: &start1,
+					additionalInformation: map[string]string{
+						"annotation_test3": "blah3",
+						"annotation_test4": "blah4",
+					},
+				},
+				{
+					metricName: KubePodOwner,
+					labels: map[string]string{
+						"name":                "pod1",
+						"namespace":           "namespace1",
+						"uid":                 "uuid1",
+						"node":                "node1",
+						"owner_kind":          "deployment",
+						"owner_name":          "deployment1",
+						"owner_is_controller": "false",
+					},
+					value:                 0,
+					timestamp:             &start1,
+					additionalInformation: nil,
+				},
+				{
+					metricName: KubePodContainerStatusRunning,
+					labels: map[string]string{
+						"name":      "pod1",
+						"namespace": "namespace1",
+						"uid":       "uuid1",
+						"node":      "node1",
+						"container": "container1",
+					},
+					value:                 0,
+					timestamp:             &start1,
+					additionalInformation: nil,
+				},
+				{
+					metricName: KubePodContainerResourceRequests,
+					labels: map[string]string{
+						"name":      "pod1",
+						"namespace": "namespace1",
+						"uid":       "uuid1",
+						"node":      "node1",
+						"container": "container1",
+						"resource":  "cpu",
+						"unit":      "core",
+					},
+					value:                 0.5,
+					timestamp:             &start1,
+					additionalInformation: nil,
+				},
+				{
+					metricName: KubePodContainerResourceRequests,
+					labels: map[string]string{
+						"name":      "pod1",
+						"namespace": "namespace1",
+						"uid":       "uuid1",
+						"node":      "node1",
+						"container": "container1",
+						"resource":  "memory",
+						"unit":      "byte",
+					},
+					value:                 512,
+					timestamp:             &start1,
+					additionalInformation: nil,
+				},
+			},
+		},
+	}
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			updateRecorder := UpdateRecorderCollector{}
+			ks := &kubernetesScraper{
+				collector: &updateRecorder,
+			}
+			for _, s := range tt.scrapes {
+				ks.scrapePods(s.pods, s.timestamp)
+			}
+
+			if len(updateRecorder.updateArgs) != len(tt.expected) {
+				t.Errorf("Expected result length of %d, got %d", len(tt.expected), len(updateRecorder.updateArgs))
+			}
+
+			for i, expected := range tt.expected {
+				updateArg := updateRecorder.updateArgs[i]
+				err := expected.equals(updateArg)
+				if err != nil {
+					t.Errorf("Result did not match expected at index %d: %s", i, err.Error())
+				}
+			}
+		})
+	}
+}
+
+func Test_kubernetesScraper_scrapePVCs(t *testing.T) {
+
+	start1, _ := time.Parse(time.RFC3339, start1Str)
+
+	type scrape struct {
+		pvcs      []*clustercache.PersistentVolumeClaim
+		timestamp time.Time
+	}
+	tests := []struct {
+		name     string
+		scrapes  []scrape
+		expected []UpdateArgs
+	}{
+		{
+			name: "simple",
+			scrapes: []scrape{
+				{
+					pvcs: []*clustercache.PersistentVolumeClaim{
+						{
+							Name:      "pvc1",
+							Namespace: "namespace1",
+							Spec: v1.PersistentVolumeClaimSpec{
+								VolumeName:       "vol1",
+								StorageClassName: ptr("storageClass1"),
+								Resources: v1.VolumeResourceRequirements{
+									Requests: v1.ResourceList{
+										v1.ResourceStorage: resource.MustParse("4096"),
+									},
+								},
+							},
+						},
+					},
+					timestamp: start1,
+				},
+			},
+			expected: []UpdateArgs{
+				{
+					metricName: KubePersistenVolumeClaimInfo,
+					labels: map[string]string{
+						"name":         "pvc1",
+						"namespace":    "namespace1",
+						"volumename":   "vol1",
+						"storageclass": "storageClass1",
+					},
+					value:                 0,
+					timestamp:             &start1,
+					additionalInformation: nil,
+				},
+				{
+					metricName: KubePersistentVolumeClaimResourceRequestsStorageBytes,
+					labels: map[string]string{
+						"name":         "pvc1",
+						"namespace":    "namespace1",
+						"volumename":   "vol1",
+						"storageclass": "storageClass1",
+					},
+					value:                 4096,
+					timestamp:             &start1,
+					additionalInformation: nil,
+				},
+			},
+		},
+	}
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			updateRecorder := UpdateRecorderCollector{}
+			ks := &kubernetesScraper{
+				collector: &updateRecorder,
+			}
+			for _, s := range tt.scrapes {
+				ks.scrapePVCs(s.pvcs, s.timestamp)
+			}
+
+			if len(updateRecorder.updateArgs) != len(tt.expected) {
+				t.Errorf("Expected result length of %d, got %d", len(tt.expected), len(updateRecorder.updateArgs))
+			}
+
+			for i, expected := range tt.expected {
+				updateArg := updateRecorder.updateArgs[i]
+				err := expected.equals(updateArg)
+				if err != nil {
+					t.Errorf("Result did not match expected at index %d: %s", i, err.Error())
+				}
+			}
+		})
+	}
+}
+
+func Test_kubernetesScraper_scrapePVs(t *testing.T) {
+
+	start1, _ := time.Parse(time.RFC3339, start1Str)
+
+	type scrape struct {
+		pvs       []*clustercache.PersistentVolume
+		timestamp time.Time
+	}
+	tests := []struct {
+		name     string
+		scrapes  []scrape
+		expected []UpdateArgs
+	}{
+		{
+			name: "simple",
+			scrapes: []scrape{
+				{
+					pvs: []*clustercache.PersistentVolume{
+						{
+							Name: "pv1",
+							Spec: v1.PersistentVolumeSpec{
+								StorageClassName: "storageClass1",
+								PersistentVolumeSource: v1.PersistentVolumeSource{
+									CSI: &v1.CSIPersistentVolumeSource{
+										VolumeHandle: "vol-1",
+									},
+								},
+								Capacity: v1.ResourceList{
+									v1.ResourceStorage: resource.MustParse("4096"),
+								},
+							},
+						},
+					},
+					timestamp: start1,
+				},
+			},
+			expected: []UpdateArgs{
+				{
+					metricName: KubecostPVInfo,
+					labels: map[string]string{
+						"name":         "pv1",
+						"providerID":   "vol-1",
+						"storageClass": "storageClass1",
+					},
+					value:                 0,
+					timestamp:             &start1,
+					additionalInformation: nil,
+				},
+				{
+					metricName: KubePersistentVolumeCapacityBytes,
+					labels: map[string]string{
+						"name":         "pv1",
+						"providerID":   "vol-1",
+						"storageClass": "storageClass1",
+					},
+					value:                 4096,
+					timestamp:             &start1,
+					additionalInformation: nil,
+				},
+			},
+		},
+	}
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			updateRecorder := UpdateRecorderCollector{}
+			ks := &kubernetesScraper{
+				collector: &updateRecorder,
+			}
+			for _, s := range tt.scrapes {
+				ks.scrapePVs(s.pvs, s.timestamp)
+			}
+
+			if len(updateRecorder.updateArgs) != len(tt.expected) {
+				t.Errorf("Expected result length of %d, got %d", len(tt.expected), len(updateRecorder.updateArgs))
+			}
+
+			for i, expected := range tt.expected {
+				updateArg := updateRecorder.updateArgs[i]
+				err := expected.equals(updateArg)
+				if err != nil {
+					t.Errorf("Result did not match expected at index %d: %s", i, err.Error())
+				}
+			}
+		})
+	}
+}
+
+func Test_kubernetesScraper_scrapeServices(t *testing.T) {
+
+	start1, _ := time.Parse(time.RFC3339, start1Str)
+
+	type scrape struct {
+		services  []*clustercache.Service
+		timestamp time.Time
+	}
+	tests := []struct {
+		name     string
+		scrapes  []scrape
+		expected []UpdateArgs
+	}{
+		{
+			name: "simple",
+			scrapes: []scrape{
+				{
+					services: []*clustercache.Service{
+						{
+							Name:      "service1",
+							Namespace: "namespace1",
+							SpecSelector: map[string]string{
+								"test1": "blah",
+								"test2": "blah2",
+							},
+						},
+					},
+					timestamp: start1,
+				},
+			},
+			expected: []UpdateArgs{
+				{
+					metricName: ServiceSelectorLabels,
+					labels: map[string]string{
+						"service":   "service1",
+						"namespace": "namespace1",
+					},
+					value:     0,
+					timestamp: &start1,
+					additionalInformation: map[string]string{
+						"label_test1": "blah",
+						"label_test2": "blah2",
+					},
+				},
+			},
+		},
+	}
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			updateRecorder := UpdateRecorderCollector{}
+			ks := &kubernetesScraper{
+				collector: &updateRecorder,
+			}
+			for _, s := range tt.scrapes {
+				ks.scrapeServices(s.services, s.timestamp)
+			}
+
+			if len(updateRecorder.updateArgs) != len(tt.expected) {
+				t.Errorf("Expected result length of %d, got %d", len(tt.expected), len(updateRecorder.updateArgs))
+			}
+
+			for i, expected := range tt.expected {
+				updateArg := updateRecorder.updateArgs[i]
+				err := expected.equals(updateArg)
+				if err != nil {
+					t.Errorf("Result did not match expected at index %d: %s", i, err.Error())
+				}
+			}
+		})
+	}
+}
+
+func Test_kubernetesScraper_scrapeStatefulSets(t *testing.T) {
+
+	start1, _ := time.Parse(time.RFC3339, start1Str)
+
+	type scrape struct {
+		statefulSets []*clustercache.StatefulSet
+		timestamp    time.Time
+	}
+	tests := []struct {
+		name     string
+		scrapes  []scrape
+		expected []UpdateArgs
+	}{
+		{
+			name: "simple",
+			scrapes: []scrape{
+				{
+					statefulSets: []*clustercache.StatefulSet{
+						{
+							Name:      "statefulSet1",
+							Namespace: "namespace1",
+							SpecSelector: &metav1.LabelSelector{
+								MatchLabels: map[string]string{
+									"test1": "blah",
+									"test2": "blah2",
+								},
+							},
+						},
+					},
+					timestamp: start1,
+				},
+			},
+			expected: []UpdateArgs{
+				{
+					metricName: StatefulSetMatchLabels,
+					labels: map[string]string{
+						"name":      "statefulSet1",
+						"namespace": "namespace1",
+					},
+					value:     0,
+					timestamp: &start1,
+					additionalInformation: map[string]string{
+						"label_test1": "blah",
+						"label_test2": "blah2",
+					},
+				},
+			},
+		},
+	}
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			updateRecorder := UpdateRecorderCollector{}
+			ks := &kubernetesScraper{
+				collector: &updateRecorder,
+			}
+			for _, s := range tt.scrapes {
+				ks.scrapeStatefulSets(s.statefulSets, s.timestamp)
+			}
+
+			if len(updateRecorder.updateArgs) != len(tt.expected) {
+				t.Errorf("Expected result length of %d, got %d", len(tt.expected), len(updateRecorder.updateArgs))
+			}
+
+			for i, expected := range tt.expected {
+				updateArg := updateRecorder.updateArgs[i]
+				err := expected.equals(updateArg)
+				if err != nil {
+					t.Errorf("Result did not match expected at index %d: %s", i, err.Error())
+				}
+			}
+		})
+	}
+}

+ 32 - 0
modules/collector-source/pkg/collector/targetscraper.go

@@ -0,0 +1,32 @@
+package collector
+
+import (
+	"github.com/opencost/opencost/core/pkg/log"
+	"github.com/opencost/opencost/modules/collector-source/pkg/metrics/parser"
+	"github.com/opencost/opencost/modules/collector-source/pkg/metrics/target"
+)
+
+type TargetScraper struct {
+	targetProvider target.TargetProvider
+	collector      MetricsCollector
+}
+
+func (s *TargetScraper) Scrape() {
+	targets := s.targetProvider.GetTargets()
+	for _, target := range targets {
+		f, err := target.Load()
+		if err != nil {
+			log.Errorf("failed to scrape target: %s", err.Error())
+			continue
+		}
+		results, err := parser.Parse(f)
+		if err != nil {
+			log.Errorf("failed to parse target: %s", err.Error())
+			continue
+		}
+
+		for _, result := range results {
+			s.collector.Update(result.Name, result.Labels, result.Value, result.Timestamp, nil)
+		}
+	}
+}

+ 116 - 0
modules/collector-source/pkg/collector/targetscraper_test.go

@@ -0,0 +1,116 @@
+package collector
+
+import (
+	"testing"
+
+	"github.com/opencost/opencost/modules/collector-source/pkg/metrics/target"
+)
+
+const networkScape = `
+# HELP kubecost_pod_network_egress_bytes kubecost_pod_network_egress_bytes_total egressed byte counts by pod.
+# TYPE kubecost_pod_network_egress_bytes counter
+kubecost_pod_network_egress_bytes_total{pod_name="pod1",namespace="namespace1",internet="false",same_region="true",same_zone="true",service="service1"} 3127969647
+kubecost_pod_network_egress_bytes_total{pod_name="pod2",namespace="namespace1",internet="true",same_region="false",same_zone="false",service=""} 335188219
+# HELP kubecost_pod_network_ingress_bytes kubecost_pod_network_ingress_bytes_total ingressed byte counts by pod.
+# TYPE kubecost_pod_network_ingress_bytes counter
+kubecost_pod_network_ingress_bytes_total{pod_name="pod1",namespace="namespace1",internet="true",same_region="false",same_zone="false",service="service1"} 17941460
+kubecost_pod_network_ingress_bytes_total{pod_name="pod2",namespace="namespace1",internet="false",same_region="true",same_zone="false",service=""} 13948766
+# HELP kubecost_network_costs_parsed_entries kubecost_network_costs_parsed_entries total parsed conntrack entries.
+# TYPE kubecost_network_costs_parsed_entries gauge
+# HELP kubecost_network_costs_parse_time kubecost_network_costs_parse_time total time in milliseconds it took to parse conntrack entries.
+# TYPE kubecost_network_costs_parse_time gauge
+# EOF
+`
+
+func TestTargetScraper_Scrape(t *testing.T) {
+	tests := []struct {
+		name     string
+		target   target.ScrapeTarget
+		expected []UpdateArgs
+	}{
+		{
+			name:   "Network Scrape",
+			target: target.NewStringTarget(networkScape),
+			expected: []UpdateArgs{
+				{
+					metricName: KubecostPodNetworkEgressBytesTotal,
+					labels: map[string]string{
+						"pod_name":    "pod1",
+						"namespace":   "namespace1",
+						"internet":    "false",
+						"same_region": "true",
+						"same_zone":   "true",
+						"service":     "service1",
+					},
+					value:                 3127969647,
+					timestamp:             nil,
+					additionalInformation: nil,
+				},
+				{
+					metricName: KubecostPodNetworkEgressBytesTotal,
+					labels: map[string]string{
+						"pod_name":    "pod2",
+						"namespace":   "namespace1",
+						"internet":    "true",
+						"same_region": "false",
+						"same_zone":   "false",
+						"service":     "",
+					},
+					value:                 335188219,
+					timestamp:             nil,
+					additionalInformation: nil,
+				},
+				{
+					metricName: "kubecost_pod_network_ingress_bytes_total",
+					labels: map[string]string{
+						"pod_name":    "pod1",
+						"namespace":   "namespace1",
+						"internet":    "true",
+						"same_region": "false",
+						"same_zone":   "false",
+						"service":     "service1",
+					},
+					value:                 17941460,
+					timestamp:             nil,
+					additionalInformation: nil,
+				},
+				{
+					metricName: "kubecost_pod_network_ingress_bytes_total",
+					labels: map[string]string{
+						"pod_name":    "pod2",
+						"namespace":   "namespace1",
+						"internet":    "false",
+						"same_region": "true",
+						"same_zone":   "false",
+						"service":     "",
+					},
+					value:                 13948766,
+					timestamp:             nil,
+					additionalInformation: nil,
+				},
+			},
+		},
+	}
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			updateRecorder := UpdateRecorderCollector{}
+			scrapper := &TargetScraper{
+				targetProvider: NewMockTargetProvider(tt.target),
+				collector:      &updateRecorder,
+			}
+			scrapper.Scrape()
+
+			if len(updateRecorder.updateArgs) != len(tt.expected) {
+				t.Errorf("Expected result length of %d, got %d", len(tt.expected), len(updateRecorder.updateArgs))
+			}
+
+			for i, expected := range tt.expected {
+				updateArg := updateRecorder.updateArgs[i]
+				err := expected.equals(updateArg)
+				if err != nil {
+					t.Errorf("Result did not match expected at index %d: %s", i, err.Error())
+				}
+			}
+		})
+	}
+}

+ 20 - 0
modules/collector-source/pkg/metrics/target/stringtarget.go

@@ -0,0 +1,20 @@
+package target
+
+import (
+	"io"
+	"strings"
+)
+
+type StringTarget struct {
+	raw string
+}
+
+func NewStringTarget(raw string) *StringTarget {
+	return &StringTarget{
+		raw: raw,
+	}
+}
+
+func (t *StringTarget) Load() (io.Reader, error) {
+	return strings.NewReader(t.raw), nil
+}

+ 4 - 0
modules/collector-source/pkg/metrics/target/target.go

@@ -7,3 +7,7 @@ import "io"
 type ScrapeTarget interface {
 	Load() (io.Reader, error)
 }
+
+type TargetProvider interface {
+	GetTargets() []ScrapeTarget
+}

+ 11 - 1
pkg/env/costmodelenv.go

@@ -9,7 +9,8 @@ import (
 )
 
 const (
-	APIPortEnvVar = "API_PORT"
+	APIPortEnvVar          = "API_PORT"
+	NetworkCostsPortEnvVar = "NETWORK_COSTS_PORT"
 
 	AWSAccessKeyIDEnvVar     = "AWS_ACCESS_KEY_ID"
 	AWSAccessKeySecretEnvVar = "AWS_SECRET_ACCESS_KEY"
@@ -23,6 +24,7 @@ const (
 	AzureBillingAccountEnvVar            = "AZURE_BILLING_ACCOUNT"
 	AzureDownloadBillingDataToDiskEnvVar = "AZURE_DOWNLOAD_BILLING_DATA_TO_DISK"
 
+	ReleaseNameEnvVar              = "RELEASE_NAME"
 	KubecostNamespaceEnvVar        = "KUBECOST_NAMESPACE"
 	PodNameEnvVar                  = "POD_NAME"
 	ClusterIDEnvVar                = "CLUSTER_ID"
@@ -531,3 +533,11 @@ func GetExportBucketConfigFile() string {
 func GetUseCacheV1() bool {
 	return env.GetBool(UseCacheV1, false)
 }
+
+func GetReleaseName() string {
+	return env.Get(ReleaseNameEnvVar, "kubecost")
+}
+
+func GetNetworkCostsPort() int {
+	return env.GetInt(NetworkCostsPortEnvVar, 3001)
+}