Просмотр исходного кода

Sth/kcm 5348 (#3744)

Signed-off-by: Sean Holcomb <seanholcomb@gmail.com>
Sean Holcomb 1 неделя назад
Родитель
Сommit
84a5cc8f36

+ 1 - 1
core/pkg/exporter/controller.go

@@ -277,7 +277,7 @@ func (g *ComputeExportControllerGroup[T]) Name() string {
 
 func (g *ComputeExportControllerGroup[T]) Start(interval time.Duration) bool {
 	if len(g.controllers) == 0 {
-		log.Warnf("ComputeExportControllerGroup[%s] has no controllers to start", typeutil.TypeOf[T]())
+		log.Debugf("ComputeExportControllerGroup[%s] has no controllers to start", typeutil.TypeOf[T]())
 		return false
 	}
 

+ 2 - 10
core/pkg/exporter/pathing/bingenpath.go

@@ -7,14 +7,12 @@ import (
 
 	"github.com/opencost/opencost/core/pkg/exporter/pathing/pathutils"
 	"github.com/opencost/opencost/core/pkg/opencost"
-	"github.com/opencost/opencost/core/pkg/pipelines"
 	"github.com/opencost/opencost/core/pkg/util/timeutil"
 )
 
 const (
-	DefaultRootDir   string = "federated"
-	BaseStorageDir   string = "etl/bingen"
-	FinOpsAgentAppID string = "finops-agent"
+	DefaultRootDir string = "federated"
+	BaseStorageDir string = "etl/bingen"
 )
 
 // BingenStoragePathFormatter is an implementation of the StoragePathFormatter interface for
@@ -34,12 +32,6 @@ func NewDefaultStoragePathFormatter(clusterId, pipeline string, resolution *time
 		res = timeutil.FormatStoreResolution(*resolution)
 	}
 
-	// KubeModel uses a distinct pathing pattern which breaks with the original
-	// Allocations and Assets bingen pathing.
-	if pipeline == pipelines.KubeModelPipelineName {
-		return NewKubeModelStoragePathFormatter(FinOpsAgentAppID, clusterId, res)
-	}
-
 	return NewBingenStoragePathFormatter(DefaultRootDir, clusterId, pipeline, res)
 }
 

+ 6 - 2
core/pkg/opencost/exporter/controllers.go

@@ -31,6 +31,7 @@ type ComputePipelineSource interface {
 // PipelinesExportConfig is a configuration struct that contains the export resolutions for
 // allocation, assets, and network insights pipelines.
 type PipelinesExportConfig struct {
+	AppName                           string
 	ClusterUID                        string
 	ClusterName                       string
 	AllocationPiplineResolutions      []time.Duration
@@ -50,8 +51,9 @@ func defaultPipelineExportResolutions() []time.Duration {
 
 // NewPipelinesExportConfig returns the default export configuration for all pipelines
 // which is set to export hourly and daily for allocations, assets, and network insights.
-func NewPipelinesExportConfig(clusterUID, clusterName string) PipelinesExportConfig {
+func NewPipelinesExportConfig(appName, clusterUID, clusterName string) PipelinesExportConfig {
 	return PipelinesExportConfig{
+		AppName:                           appName,
 		ClusterUID:                        clusterUID,
 		ClusterName:                       clusterName,
 		AllocationPiplineResolutions:      defaultPipelineExportResolutions(),
@@ -150,7 +152,7 @@ func NewPipelineExportControllers(store storage.Storage, cm ComputePipelineSourc
 			continue
 		}
 
-		kubeModelController, err := NewComputePipelineExportController(config.ClusterUID, store, kubeModelSource, res)
+		kubeModelController, err := NewKubeModelComputePipelineExportController(config.AppName, config.ClusterUID, store, kubeModelSource, res)
 		if err != nil {
 			log.Errorf("Failed to create KubeModel export controller for resolution: %s - %v", timeutil.DurationString(res), err)
 			continue
@@ -171,10 +173,12 @@ func (pec *PipelineExportControllers) Start(interval time.Duration) {
 	pec.AllocationExportController.Start(interval)
 	pec.AssetExportController.Start(interval)
 	pec.NetworkInsightExportController.Start(interval)
+	pec.KubeModelExportController.Start(interval)
 }
 
 func (pec *PipelineExportControllers) Stop() {
 	pec.AllocationExportController.Stop()
 	pec.AssetExportController.Stop()
 	pec.NetworkInsightExportController.Stop()
+	pec.KubeModelExportController.Stop()
 }

+ 30 - 24
core/pkg/opencost/exporter/exporter_test.go

@@ -14,11 +14,14 @@ import (
 	"github.com/opencost/opencost/core/pkg/pipelines"
 	"github.com/opencost/opencost/core/pkg/source"
 	"github.com/opencost/opencost/core/pkg/storage"
+	"github.com/opencost/opencost/core/pkg/util/timeutil"
 )
 
 const (
-	TestClusterId  = "test-cluster"
-	TestResolution = 24 * time.Hour
+	TestAppName     = "test-app"
+	TestClusterID   = "test-cluster-id"
+	TestClusterName = "test-cluster"
+	TestResolution  = 24 * time.Hour
 )
 
 type GenerateMockSet[T any] func(start, end time.Time) *T
@@ -159,12 +162,12 @@ func TestExporters(t *testing.T) {
 	t.Run("allocation exporter", func(t *testing.T) {
 		allocSource := NewMockAllocationSource()
 		memStore := storage.NewMemoryStorage()
-		p, err := pathing.NewDefaultStoragePathFormatter(TestClusterId, pipelines.AllocationPipelineName, ptr(TestResolution))
+		p, err := pathing.NewDefaultStoragePathFormatter(TestClusterName, pipelines.AllocationPipelineName, ptr(TestResolution))
 		if err != nil {
 			t.Fatalf("failed to create path formatter: %v", err)
 		}
 
-		allocExporter, err := NewComputePipelineExporter[opencost.AllocationSet](TestClusterId, TestResolution, memStore)
+		allocExporter, err := NewComputePipelineExporter[opencost.AllocationSet](TestClusterName, TestResolution, memStore)
 		if err != nil {
 			t.Fatalf("failed to create allocation exporter: %v", err)
 		}
@@ -188,12 +191,12 @@ func TestExporters(t *testing.T) {
 	t.Run("asset exporter", func(t *testing.T) {
 		assetSource := NewMockAssetSource()
 		memStore := storage.NewMemoryStorage()
-		p, err := pathing.NewDefaultStoragePathFormatter(TestClusterId, pipelines.AssetsPipelineName, ptr(TestResolution))
+		p, err := pathing.NewDefaultStoragePathFormatter(TestClusterName, pipelines.AssetsPipelineName, ptr(TestResolution))
 		if err != nil {
 			t.Fatalf("failed to create path formatter: %v", err)
 		}
 
-		assetExporter, err := NewComputePipelineExporter[opencost.AssetSet](TestClusterId, TestResolution, memStore)
+		assetExporter, err := NewComputePipelineExporter[opencost.AssetSet](TestClusterName, TestResolution, memStore)
 		if err != nil {
 			t.Fatalf("failed to create allocation exporter: %v", err)
 		}
@@ -217,12 +220,12 @@ func TestExporters(t *testing.T) {
 	t.Run("network insight exporter", func(t *testing.T) {
 		netInsightSource := NewMockNetworkInsightSource()
 		memStore := storage.NewMemoryStorage()
-		p, err := pathing.NewDefaultStoragePathFormatter(TestClusterId, pipelines.NetworkInsightPipelineName, ptr(TestResolution))
+		p, err := pathing.NewDefaultStoragePathFormatter(TestClusterName, pipelines.NetworkInsightPipelineName, ptr(TestResolution))
 		if err != nil {
 			t.Fatalf("failed to create path formatter: %v", err)
 		}
 
-		netInsightExporter, err := NewComputePipelineExporter[opencost.NetworkInsightSet](TestClusterId, TestResolution, memStore)
+		netInsightExporter, err := NewComputePipelineExporter[opencost.NetworkInsightSet](TestClusterName, TestResolution, memStore)
 		if err != nil {
 			t.Fatalf("failed to create net insights exporter: %v", err)
 		}
@@ -246,12 +249,13 @@ func TestExporters(t *testing.T) {
 	t.Run("KubeModel exporter", func(t *testing.T) {
 		kubeModelSource := NewMockKubeModelSource()
 		memStore := storage.NewMemoryStorage()
-		p, err := pathing.NewDefaultStoragePathFormatter(TestClusterId, pipelines.KubeModelPipelineName, ptr(TestResolution))
+		res := timeutil.FormatStoreResolution(TestResolution)
+		p, err := pathing.NewKubeModelStoragePathFormatter(TestAppName, TestClusterID, res)
 		if err != nil {
 			t.Fatalf("failed to create path formatter: %v", err)
 		}
 
-		kubeModelExporter, err := NewComputePipelineExporter[kubemodel.KubeModelSet](TestClusterId, TestResolution, memStore)
+		kubeModelExporter, err := NewKubeModelComputePipelineExporter[kubemodel.KubeModelSet](TestAppName, TestClusterID, TestResolution, memStore)
 		if err != nil {
 			t.Fatalf("failed to create KubeModel exporter: %v", err)
 		}
@@ -276,7 +280,7 @@ func TestExporters(t *testing.T) {
 		memStore := storage.NewMemoryStorage()
 
 		// Invalid pipeline
-		_, err := NewComputePipelineExporter[UnknownSet](TestClusterId, TestResolution, memStore)
+		_, err := NewComputePipelineExporter[UnknownSet](TestClusterName, TestResolution, memStore)
 		if err == nil {
 			t.Fatalf("expected error creating unknown pipeline exporter, got nil")
 		}
@@ -295,8 +299,9 @@ func TestPipelineExportControllers(t *testing.T) {
 		memStore := storage.NewMemoryStorage()
 
 		exportControllers := NewPipelineExportControllers(memStore, pipelineComputeSource, PipelinesExportConfig{
-			ClusterUID:                        TestClusterId,
-			ClusterName:                       TestClusterId,
+			AppName:                           TestAppName,
+			ClusterUID:                        TestClusterID,
+			ClusterName:                       TestClusterName,
 			AllocationPiplineResolutions:      []time.Duration{TestResolution},
 			AssetPipelineResolutons:           []time.Duration{TestResolution},
 			NetworkInsightPipelineResolutions: []time.Duration{TestResolution},
@@ -310,15 +315,15 @@ func TestPipelineExportControllers(t *testing.T) {
 		time.Sleep(time.Second + (750 * time.Millisecond))
 		exportControllers.Stop()
 
-		allocPath, err := pathing.NewDefaultStoragePathFormatter(TestClusterId, pipelines.AllocationPipelineName, ptr(TestResolution))
+		allocPath, err := pathing.NewDefaultStoragePathFormatter(TestClusterName, pipelines.AllocationPipelineName, ptr(TestResolution))
 		if err != nil {
 			t.Fatalf("failed to create allocations path formatter: %v", err)
 		}
-		assetPath, err := pathing.NewDefaultStoragePathFormatter(TestClusterId, pipelines.AssetsPipelineName, ptr(TestResolution))
+		assetPath, err := pathing.NewDefaultStoragePathFormatter(TestClusterName, pipelines.AssetsPipelineName, ptr(TestResolution))
 		if err != nil {
 			t.Fatalf("failed to create assets path formatter: %v", err)
 		}
-		netPath, err := pathing.NewDefaultStoragePathFormatter(TestClusterId, pipelines.NetworkInsightPipelineName, ptr(TestResolution))
+		netPath, err := pathing.NewDefaultStoragePathFormatter(TestClusterName, pipelines.NetworkInsightPipelineName, ptr(TestResolution))
 		if err != nil {
 			t.Fatalf("failed to create net insights path formatter: %v", err)
 		}
@@ -333,8 +338,9 @@ func TestPipelineExportControllers(t *testing.T) {
 		memStore := storage.NewMemoryStorage()
 
 		exportControllers := NewPipelineExportControllers(memStore, pipelineComputeSource, PipelinesExportConfig{
-			ClusterUID:                        TestClusterId,
-			ClusterName:                       TestClusterId,
+			AppName:                           TestAppName,
+			ClusterUID:                        TestClusterID,
+			ClusterName:                       TestClusterName,
 			AllocationPiplineResolutions:      []time.Duration{TestResolution},
 			AssetPipelineResolutons:           []time.Duration{TestResolution},
 			NetworkInsightPipelineResolutions: []time.Duration{TestResolution},
@@ -348,15 +354,15 @@ func TestPipelineExportControllers(t *testing.T) {
 		time.Sleep(time.Second + (750 * time.Millisecond))
 		exportControllers.Stop()
 
-		allocPath, err := pathing.NewDefaultStoragePathFormatter(TestClusterId, pipelines.AllocationPipelineName, ptr(TestResolution))
+		allocPath, err := pathing.NewDefaultStoragePathFormatter(TestClusterName, pipelines.AllocationPipelineName, ptr(TestResolution))
 		if err != nil {
 			t.Fatalf("failed to create allocations path formatter: %v", err)
 		}
-		assetPath, err := pathing.NewDefaultStoragePathFormatter(TestClusterId, pipelines.AssetsPipelineName, ptr(TestResolution))
+		assetPath, err := pathing.NewDefaultStoragePathFormatter(TestClusterName, pipelines.AssetsPipelineName, ptr(TestResolution))
 		if err != nil {
 			t.Fatalf("failed to create assets path formatter: %v", err)
 		}
-		netPath, err := pathing.NewDefaultStoragePathFormatter(TestClusterId, pipelines.NetworkInsightPipelineName, ptr(TestResolution))
+		netPath, err := pathing.NewDefaultStoragePathFormatter(TestClusterName, pipelines.NetworkInsightPipelineName, ptr(TestResolution))
 		if err != nil {
 			t.Fatalf("failed to create net insights path formatter: %v", err)
 		}
@@ -370,7 +376,7 @@ func TestPipelineExportControllers(t *testing.T) {
 		pipelineComputeSource := NewMockPipelineComputeSource()
 		memStore := storage.NewMemoryStorage()
 
-		exportControllers := NewPipelineExportControllers(memStore, pipelineComputeSource, NewPipelinesExportConfig(TestClusterId, TestClusterId))
+		exportControllers := NewPipelineExportControllers(memStore, pipelineComputeSource, NewPipelinesExportConfig(TestAppName, TestClusterID, TestClusterName))
 
 		if len(exportControllers.AllocationExportController.Resolutions()) != 2 {
 			t.Fatalf("expected 2 allocation resolutions, got %d", len(exportControllers.AllocationExportController.Resolutions()))
@@ -388,7 +394,7 @@ func TestPipelineExportControllers(t *testing.T) {
 		pipelineComputeSource := NewMockPipelineComputeSourceWith(48 * time.Hour)
 		memStore := storage.NewMemoryStorage()
 
-		exportControllers := NewPipelineExportControllers(memStore, pipelineComputeSource, NewPipelinesExportConfig(TestClusterId, TestClusterId))
+		exportControllers := NewPipelineExportControllers(memStore, pipelineComputeSource, NewPipelinesExportConfig(TestAppName, TestClusterID, TestClusterName))
 
 		if len(exportControllers.AllocationExportController.Resolutions()) != 0 {
 			t.Fatalf("expected 0 allocation resolutions, got %d", len(exportControllers.AllocationExportController.Resolutions()))
@@ -405,7 +411,7 @@ func TestPipelineExportControllers(t *testing.T) {
 		pipelineComputeSource := NewMockPipelineComputeSource()
 		memStore := storage.NewMemoryStorage()
 
-		exportControllers := NewPipelineExportControllers(memStore, pipelineComputeSource, NewPipelinesExportConfig("", ""))
+		exportControllers := NewPipelineExportControllers(memStore, pipelineComputeSource, NewPipelinesExportConfig("", "", ""))
 
 		if len(exportControllers.AllocationExportController.Resolutions()) != 0 {
 			t.Fatalf("expected 0 allocation resolutions, got %d", len(exportControllers.AllocationExportController.Resolutions()))

+ 50 - 8
core/pkg/opencost/exporter/exporters.go

@@ -9,13 +9,14 @@ import (
 	"github.com/opencost/opencost/core/pkg/exporter/validator"
 	"github.com/opencost/opencost/core/pkg/pipelines"
 	"github.com/opencost/opencost/core/pkg/storage"
+	"github.com/opencost/opencost/core/pkg/util/timeutil"
 	"github.com/opencost/opencost/core/pkg/util/typeutil"
 )
 
 // NewComputePipelineExporter creates a new `ComputeExporter[T]` instance which is used to export computed data
 // by window for a specific pipeline.
 func NewComputePipelineExporter[T any, U export.BinaryMarshalerPtr[T], S validator.SetConstraint[T]](
-	clusterId string,
+	clusterName string,
 	resolution time.Duration,
 	store storage.Storage,
 ) (export.ComputeExporter[T], error) {
@@ -24,17 +25,13 @@ func NewComputePipelineExporter[T any, U export.BinaryMarshalerPtr[T], S validat
 		return nil, fmt.Errorf("failed to extract pipeline name for type: %s", typeutil.TypeOf[T]())
 	}
 
-	pathing, err := pathing.NewDefaultStoragePathFormatter(clusterId, pipelineName, &resolution)
+	pathing, err := pathing.NewDefaultStoragePathFormatter(clusterName, pipelineName, &resolution)
 	if err != nil {
 		return nil, fmt.Errorf("failed to create path formatter: %w", err)
 	}
 
 	var encoder export.Encoder[T]
-	if pipelineName == pipelines.KubeModelPipelineName {
-		encoder = export.NewBingenFileEncoder[T, U]()
-	} else {
-		encoder = export.NewBingenEncoder[T, U]()
-	}
+	encoder = export.NewBingenEncoder[T, U]()
 
 	return export.NewComputeStorageExporter(
 		pathing,
@@ -47,12 +44,57 @@ func NewComputePipelineExporter[T any, U export.BinaryMarshalerPtr[T], S validat
 // NewComputePipelineExportController creates a new `ComputeExportController[T]` instance which is used to export computed data
 // using the provided source, storage, resolution, and source resolution.
 func NewComputePipelineExportController[T any, U export.BinaryMarshalerPtr[T], S validator.SetConstraint[T]](
+	clusterName string,
+	store storage.Storage,
+	source export.ComputeSource[T],
+	resolution time.Duration,
+) (*export.ComputeExportController[T], error) {
+	exporter, err := NewComputePipelineExporter[T, U, S](clusterName, resolution, store)
+	if err != nil {
+		return nil, fmt.Errorf("failed to create compute exporter: %w", err)
+	}
+
+	return export.NewComputeExportController(source, exporter, resolution), nil
+}
+
+// NewKubeModelComputePipelineExporter creates a new `ComputeExporter[T]` instance which is used to export computed data
+// by window for a specific pipeline.
+func NewKubeModelComputePipelineExporter[T any, U export.BinaryMarshalerPtr[T], S validator.SetConstraint[T]](
+	appName string,
+	clusterId string,
+	resolution time.Duration,
+	store storage.Storage,
+) (export.ComputeExporter[T], error) {
+	pipelineName := pipelines.NameFor[T]()
+	if pipelineName == "" {
+		return nil, fmt.Errorf("failed to extract pipeline name for type: %s", typeutil.TypeOf[T]())
+	}
+	res := timeutil.FormatStoreResolution(resolution)
+	pathing, err := pathing.NewKubeModelStoragePathFormatter(appName, clusterId, res)
+	if err != nil {
+		return nil, fmt.Errorf("failed to create path formatter: %w", err)
+	}
+
+	encoder := export.NewBingenFileEncoder[T, U]()
+
+	return export.NewComputeStorageExporter(
+		pathing,
+		encoder,
+		store,
+		validator.NewSetValidator[T, S](resolution),
+	), nil
+}
+
+// NewKubeModelComputePipelineExportController creates a new `ComputeExportController[T]` instance which is used to export computed data
+// using the provided source, storage, resolution, and source resolution.
+func NewKubeModelComputePipelineExportController[T any, U export.BinaryMarshalerPtr[T], S validator.SetConstraint[T]](
+	appName string,
 	clusterId string,
 	store storage.Storage,
 	source export.ComputeSource[T],
 	resolution time.Duration,
 ) (*export.ComputeExportController[T], error) {
-	exporter, err := NewComputePipelineExporter[T, U, S](clusterId, resolution, store)
+	exporter, err := NewKubeModelComputePipelineExporter[T, U, S](appName, clusterId, resolution, store)
 	if err != nil {
 		return nil, fmt.Errorf("failed to create compute exporter: %w", err)
 	}

+ 33 - 0
core/pkg/storage/storefactory.go

@@ -5,6 +5,7 @@ import (
 	"os"
 
 	"github.com/opencost/opencost/core/pkg/env"
+	"github.com/opencost/opencost/core/pkg/log"
 )
 
 // GetDefaultStorage initializes the default shared storage which is required for kubecost. Panics
@@ -17,6 +18,38 @@ func GetDefaultStorage() Storage {
 	return store
 }
 
+// GetConfiguredStorage retrieves the default shared storage which is required for running an opencost.
+func GetConfiguredStorage() Storage {
+	const warningMessage = `Failed to create local directory '%s' - %s.
+		Did you mean to enable the collector? For persistent storage, it's recommended to use Prometheus, 
+		or set a storage bucket configuration at %s. 
+
+		%s`
+
+	// Try bucket storage if it exists
+	store, err := TryGetDefaultStorage()
+	if err == nil {
+		return store
+	}
+
+	// Fallback to a local storage bucket
+	dir := env.GetConfigPath()
+	err = os.MkdirAll(dir, os.ModePerm)
+	if err != nil {
+		log.Warnf(
+			warningMessage,
+			dir,
+			err.Error(),
+			env.GetDefaultStorageConfigFilePath(),
+			"Falling back to an in-memory file system for collector, which will lose any persistent storage upon restart.",
+		)
+
+		return NewMemoryStorage()
+	}
+
+	return NewFileStorage(dir)
+}
+
 // TryGetDefaultStorage will attempt to load the default bucket configuration, but will not panic
 // if the config file does not exist.
 func TryGetDefaultStorage() (Storage, error) {

+ 5 - 0
pkg/cmd/costmodel/costmodel.go

@@ -57,6 +57,7 @@ func Execute(conf *Config) error {
 		if conf.CarbonEstimatesEnabled {
 			router.GET("/assets/carbon", a.ComputeAssetsCarbonHandler)
 		}
+		router.GET("/kubemodel", a.KubeModelHandler)
 
 	}
 
@@ -123,6 +124,10 @@ func Execute(conf *Config) error {
 	case <-ctx.Done():
 		log.Infof("Shutdown signal received, starting graceful shutdown...")
 
+		if a.KubeModelPipeline != nil {
+			a.KubeModelPipeline.Stop()
+		}
+
 		if customCostPipelineService != nil {
 			customCostPipelineService.Stop()
 		}

+ 30 - 0
pkg/costmodel/handlers.go

@@ -105,3 +105,33 @@ func (a *Accesses) ComputeAssetsFromCostmodel(window opencost.Window, filterStri
 
 	return assetSet, nil
 }
+
+// KubeModelHandler returns KubeModelSets from pipeline storage for a given window and resolution.
+//
+// Query params:
+//
+//	window     (required) — same format as allocation/asset endpoints, e.g. "today", "7d", "2024-04-01T00:00:00Z,2024-04-02T00:00:00Z"
+func (a *Accesses) KubeModelHandler(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
+	w.Header().Set("Content-Type", "application/json")
+
+	if a.KubeModelQuerier == nil {
+		http.Error(w, "KubeModel pipeline not initialized", http.StatusServiceUnavailable)
+		return
+	}
+
+	qp := httputil.NewQueryParams(r.URL.Query())
+
+	window, err := opencost.ParseWindowWithOffset(qp.Get("window", ""), env.GetParsedUTCOffset())
+	if err != nil {
+		http.Error(w, fmt.Sprintf("Invalid 'window' parameter: %s", err), http.StatusBadRequest)
+		return
+	}
+
+	sets, err := a.KubeModelQuerier.Query(window)
+	if err != nil {
+		http.Error(w, fmt.Sprintf("Error querying KubeModel: %s", err), http.StatusInternalServerError)
+		return
+	}
+
+	WriteData(w, sets, nil)
+}

+ 104 - 0
pkg/costmodel/handlers_test.go

@@ -0,0 +1,104 @@
+package costmodel
+
+import (
+	"encoding/json"
+	"fmt"
+	"net/http"
+	"net/http/httptest"
+	"testing"
+	"time"
+
+	"github.com/julienschmidt/httprouter"
+	coremodel "github.com/opencost/opencost/core/pkg/model/kubemodel"
+	"github.com/opencost/opencost/core/pkg/opencost"
+	"github.com/stretchr/testify/require"
+)
+
+// mockKubeModelQuerier implements kubeModelQuerier for handler tests.
+type mockKubeModelQuerier struct {
+	results []*coremodel.KubeModelSet
+	err     error
+}
+
+func (m *mockKubeModelQuerier) Query(_ opencost.Window) ([]*coremodel.KubeModelSet, error) {
+	return m.results, m.err
+}
+
+// newKubeModelRequest builds a GET request to /kubemodel with the given window query param.
+func newKubeModelRequest(window string) *http.Request {
+	url := "/kubemodel"
+	if window != "" {
+		url += "?window=" + window
+	}
+	r, _ := http.NewRequest(http.MethodGet, url, nil)
+	return r
+}
+
+func TestKubeModelHandler_NilQuerier_Returns503(t *testing.T) {
+	a := &Accesses{KubeModelQuerier: nil}
+
+	w := httptest.NewRecorder()
+	a.KubeModelHandler(w, newKubeModelRequest("1d"), httprouter.Params{})
+
+	require.Equal(t, http.StatusServiceUnavailable, w.Code)
+}
+
+func TestKubeModelHandler_MissingWindow_Returns400(t *testing.T) {
+	a := &Accesses{KubeModelQuerier: &mockKubeModelQuerier{}}
+
+	w := httptest.NewRecorder()
+	a.KubeModelHandler(w, newKubeModelRequest(""), httprouter.Params{})
+
+	require.Equal(t, http.StatusBadRequest, w.Code)
+}
+
+func TestKubeModelHandler_InvalidWindow_Returns400(t *testing.T) {
+	a := &Accesses{KubeModelQuerier: &mockKubeModelQuerier{}}
+
+	w := httptest.NewRecorder()
+	a.KubeModelHandler(w, newKubeModelRequest("notawindow"), httprouter.Params{})
+
+	require.Equal(t, http.StatusBadRequest, w.Code)
+}
+
+func TestKubeModelHandler_QuerierError_Returns500(t *testing.T) {
+	a := &Accesses{
+		KubeModelQuerier: &mockKubeModelQuerier{err: fmt.Errorf("storage unavailable")},
+	}
+
+	w := httptest.NewRecorder()
+	a.KubeModelHandler(w, newKubeModelRequest("1d"), httprouter.Params{})
+
+	require.Equal(t, http.StatusInternalServerError, w.Code)
+}
+
+func TestKubeModelHandler_Success_Returns200WithData(t *testing.T) {
+	now := time.Now().UTC().Truncate(time.Hour)
+	kms := coremodel.NewMockKubeModelSet(now.Add(-time.Hour), now)
+	a := &Accesses{
+		KubeModelQuerier: &mockKubeModelQuerier{results: []*coremodel.KubeModelSet{kms}},
+	}
+
+	w := httptest.NewRecorder()
+	a.KubeModelHandler(w, newKubeModelRequest("1d"), httprouter.Params{})
+
+	require.Equal(t, http.StatusOK, w.Code)
+	require.Equal(t, "application/json", w.Header().Get("Content-Type"))
+
+	// Response must be valid JSON and contain a non-empty data payload.
+	var body map[string]json.RawMessage
+	require.NoError(t, json.Unmarshal(w.Body.Bytes(), &body))
+	_, hasData := body["data"]
+	require.True(t, hasData, "response body should contain a 'data' field")
+}
+
+func TestKubeModelHandler_EmptyResults_Returns200(t *testing.T) {
+	a := &Accesses{
+		KubeModelQuerier: &mockKubeModelQuerier{results: []*coremodel.KubeModelSet{}},
+	}
+
+	w := httptest.NewRecorder()
+	a.KubeModelHandler(w, newKubeModelRequest("1d"), httprouter.Params{})
+
+	require.Equal(t, http.StatusOK, w.Code)
+}

+ 17 - 33
pkg/costmodel/router.go

@@ -41,6 +41,7 @@ import (
 	"github.com/opencost/opencost/pkg/cloud/models"
 	clusterc "github.com/opencost/opencost/pkg/clustercache"
 	"github.com/opencost/opencost/pkg/env"
+	km "github.com/opencost/opencost/pkg/kubemodel"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 
 	"github.com/patrickmn/go-cache"
@@ -73,6 +74,8 @@ type Accesses struct {
 	ClusterInfoProvider clusters.ClusterInfoProvider
 	Model               *CostModel
 	MetricsEmitter      *CostModelMetricsEmitter
+	KubeModelPipeline   *km.Pipeline
+	KubeModelQuerier    km.Querier
 	// SettingsCache stores current state of app settings
 	SettingsCache *cache.Cache
 	// settingsSubscribers tracks channels through which changes to different
@@ -445,6 +448,8 @@ func Initialize(router *httprouter.Router, additionalConfigWatchers ...*watcher.
 	// Create ConfigFileManager for synchronization of shared configuration
 	confManager := config.NewConfigFileManager(nil)
 
+	store := storage.GetConfiguredStorage()
+
 	cloudProviderKey := env.GetCloudProviderAPIKey()
 	cloudProvider, err := provider.NewProvider(k8sCache, cloudProviderKey, confManager)
 	if err != nil {
@@ -480,7 +485,6 @@ func Initialize(router *httprouter.Router, additionalConfigWatchers ...*watcher.
 	}
 	if env.IsCollectorDataSourceEnabled() {
 		fn = func() (source.OpenCostDataSource, error) {
-			store := GetDefaultCollectorStorage()
 			nodeStatConf, err := NewNodeClientConfigFromEnv()
 			if err != nil {
 				return nil, fmt.Errorf("failed to get node client config: %w", err)
@@ -527,6 +531,16 @@ func Initialize(router *httprouter.Router, additionalConfigWatchers ...*watcher.
 	costModel := NewCostModel(clusterUID, dataSource, cloudProvider, k8sCache, clusterMap, dataSource.BatchDuration())
 	metricsEmitter := NewCostModelMetricsEmitter(k8sCache, cloudProvider, clusterInfoProvider, costModel)
 
+	appName := sysenv.GetAppName()
+	var kubeModelPipeline *km.Pipeline
+	if p, err := km.NewPipeline(appName, clusterUID, store, costModel); err != nil {
+		log.Errorf("Failed to initialize KubeModel pipeline: %v", err)
+	} else {
+		p.Start()
+		kubeModelPipeline = p
+	}
+	kubeModelQuerier := km.NewQuerier(appName, clusterUID, store)
+
 	a := &Accesses{
 		DataSource:          dataSource,
 		KubeClientSet:       kubeClientset,
@@ -537,6 +551,8 @@ func Initialize(router *httprouter.Router, additionalConfigWatchers ...*watcher.
 		ClusterInfoProvider: clusterInfoProvider,
 		Model:               costModel,
 		MetricsEmitter:      metricsEmitter,
+		KubeModelPipeline:   kubeModelPipeline,
+		KubeModelQuerier:    kubeModelQuerier,
 		SettingsCache:       settingsCache,
 	}
 
@@ -574,38 +590,6 @@ func Initialize(router *httprouter.Router, additionalConfigWatchers ...*watcher.
 	return a
 }
 
-// GetDefaultStorage retrieves the default shared storage which is required for running an opencost collector.
-func GetDefaultCollectorStorage() storage.Storage {
-	const warningMessage = `Failed to create local collector directory '%s' - %s.
-		Did you mean to enable to collector? For persistent storage, it's recommended to use Prometheus, 
-		or set a storage bucket configuration at %s. 
-
-		%s`
-
-	// Try bucket storage if it exists
-	store, err := storage.TryGetDefaultStorage()
-	if err == nil {
-		return store
-	}
-
-	// Fallback to a local storage bucket
-	dir := env.GetLocalCollectorDirectory()
-	err = os.MkdirAll(dir, os.ModePerm)
-	if err != nil {
-		log.Warnf(
-			warningMessage,
-			dir,
-			err.Error(),
-			sysenv.GetDefaultStorageConfigFilePath(),
-			"Falling back to an in-memory file system for collector, which will lose any persistent storage upon restart.",
-		)
-
-		return storage.NewMemoryStorage()
-	}
-
-	return storage.NewFileStorage(dir)
-}
-
 // InitializeCloudCost Initializes Cloud Cost pipeline and querier and registers endpoints
 func InitializeCloudCost(router *httprouter.Router) *cloudcost.PipelineService {
 	log.Debugf("Cloud Cost config path: %s", env.GetCloudCostConfigPath())

+ 2 - 8
pkg/env/costmodel.go

@@ -11,9 +11,8 @@ import (
 const (
 	ClusterInfoFile = "cluster-info.json"
 	ClusterCacheFile
-	GCPAuthSecretFile        = "key.json"
-	MetricConfigFile         = "metrics.json"
-	DefaultLocalCollectorDir = "collector"
+	GCPAuthSecretFile = "key.json"
+	MetricConfigFile  = "metrics.json"
 )
 
 // Env Variables
@@ -403,11 +402,6 @@ func GetMetricConfigFile() string {
 	return env.GetPathFromConfig(MetricConfigFile)
 }
 
-func GetLocalCollectorDirectory() string {
-	dir := env.Get(LocalCollectorDirectoryEnvVar, DefaultLocalCollectorDir)
-	return env.GetPathFromConfig(dir)
-}
-
 func GetDOKSPricingURL() string {
 	return env.Get(ProviderPricingURL, "https://api.digitalocean.com/v2/sizes")
 }

+ 174 - 0
pkg/kubemodel/janitor.go

@@ -0,0 +1,174 @@
+package kubemodel
+
+import (
+	"path"
+	"strings"
+	"sync/atomic"
+	"time"
+
+	coreenv "github.com/opencost/opencost/core/pkg/env"
+	"github.com/opencost/opencost/core/pkg/exporter/pathing"
+	"github.com/opencost/opencost/core/pkg/log"
+	"github.com/opencost/opencost/core/pkg/pipelines"
+	"github.com/opencost/opencost/core/pkg/storage"
+	"github.com/opencost/opencost/core/pkg/util/timeutil"
+)
+
+const (
+	janitorDefault1dRetention  = 30 // days
+	janitorDefault1hRetention  = 49 // hours
+	janitorDefault10mRetention = 36 // 10-minute segments (6 hours)
+)
+
+// Janitor removes KubeModelSet files from storage that exceed the retention period for each resolution.
+// Retention durations are read from the standard RESOLUTION_*_RETENTION env vars.
+type Janitor struct {
+	store       storage.Storage
+	appName     string
+	clusterId   string
+	resolutions []time.Duration
+	isRunning   atomic.Bool
+	isStopping  atomic.Bool
+	exitCh      chan struct{}
+}
+
+// NewJanitor creates a Janitor for the given storage backend, cluster, and active resolutions.
+func NewJanitor(store storage.Storage, appName, clusterId string, resolutions []time.Duration) *Janitor {
+	return &Janitor{
+		store:       store,
+		appName:     appName,
+		clusterId:   clusterId,
+		resolutions: resolutions,
+	}
+}
+
+// Start launches the background retention cleanup loop. No-op if already running.
+func (j *Janitor) Start(interval time.Duration) {
+	if !j.isRunning.CompareAndSwap(false, true) {
+		return
+	}
+	j.exitCh = make(chan struct{})
+	go func() {
+		ticker := time.NewTicker(interval)
+		defer ticker.Stop()
+		for {
+			select {
+			case <-j.exitCh:
+				j.isRunning.Store(false)
+				j.isStopping.Store(false)
+				return
+			case <-ticker.C:
+				j.cleanup()
+			}
+		}
+	}()
+}
+
+// Stop halts the cleanup loop.
+func (j *Janitor) Stop() {
+	if !j.isStopping.CompareAndSwap(false, true) {
+		return
+	}
+	close(j.exitCh)
+}
+
+func (j *Janitor) cleanup() {
+	for _, res := range j.resolutions {
+		retention := retentionFor(res)
+		if retention == 0 {
+			continue
+		}
+		resStr := timeutil.FormatStoreResolution(res)
+		baseDir := path.Join(j.appName, j.clusterId, pipelines.KubeModelPipelineName, resStr)
+		cutoff := time.Now().UTC().Truncate(res).Add(-retention)
+		j.pruneResolution(baseDir, cutoff)
+	}
+}
+
+// retentionFor returns the total retention duration for a given export resolution, using the
+// standard RESOLUTION_*_RETENTION env vars (without a prefix, so the same values apply across pipelines).
+func retentionFor(res time.Duration) time.Duration {
+	switch {
+	case res >= timeutil.Day:
+		n := coreenv.GetInt(coreenv.Resolution1dRetentionEnvVar, janitorDefault1dRetention)
+		return timeutil.Day * time.Duration(n)
+	case res >= time.Hour:
+		n := coreenv.GetInt(coreenv.Resolution1hRetentionEnvVar, janitorDefault1hRetention)
+		return time.Hour * time.Duration(n)
+	default:
+		n := coreenv.GetInt(coreenv.Resolution10mRetentionEnvVar, janitorDefault10mRetention)
+		return 10 * time.Minute * time.Duration(n)
+	}
+}
+
+func (j *Janitor) pruneResolution(baseDir string, cutoff time.Time) {
+	cutoffDay := cutoff.Truncate(timeutil.Day)
+	years, err := j.store.ListDirectories(baseDir)
+	if err != nil {
+		if !storage.IsNotExist(err) {
+			log.Errorf("KubeModel janitor: listing %s: %v", baseDir, err)
+		}
+		return
+	}
+	for _, yearInfo := range years {
+		// ListDirectories returns the full path in Name with a trailing slash.
+		yearDir := strings.TrimSuffix(yearInfo.Name, "/")
+		months, err := j.store.ListDirectories(yearDir)
+		if err != nil {
+			log.Warnf("KubeModel janitor: listing %s: %v", yearDir, err)
+			continue
+		}
+		for _, monthInfo := range months {
+			monthDir := strings.TrimSuffix(monthInfo.Name, "/")
+			days, err := j.store.ListDirectories(monthDir)
+			if err != nil {
+				log.Warnf("KubeModel janitor: listing %s: %v", monthDir, err)
+				continue
+			}
+			for _, dayInfo := range days {
+				dayDir := strings.TrimSuffix(dayInfo.Name, "/")
+				dateStr := path.Base(yearDir) + "/" + path.Base(monthDir) + "/" + path.Base(dayDir)
+				date, err := time.Parse("2006/01/02", dateStr)
+				if err != nil {
+					log.Warnf("KubeModel janitor: cannot parse date from %s: %v", dateStr, err)
+					continue
+				}
+				if !date.After(cutoffDay) {
+					j.cleanDay(dayDir, cutoff)
+				}
+			}
+		}
+	}
+}
+
+// cleanDay removes files in dayDir whose embedded timestamp is before cutoff.
+func (j *Janitor) cleanDay(dayDir string, cutoff time.Time) {
+	files, err := j.store.List(dayDir)
+	if err != nil {
+		log.Warnf("KubeModel janitor: listing files in %s: %v", dayDir, err)
+		return
+	}
+	for _, f := range files {
+		ts, err := parseKubeModelFileTimestamp(f.Name)
+		if err != nil {
+			log.Warnf("KubeModel janitor: cannot parse timestamp from %s: %v", f.Name, err)
+			continue
+		}
+		if ts.Before(cutoff) {
+			filePath := path.Join(dayDir, f.Name)
+			if err := j.store.Remove(filePath); err != nil {
+				log.Warnf("KubeModel janitor: removing %s: %v", filePath, err)
+			} else {
+				log.Infof("KubeModel janitor: removed expired file %s", filePath)
+			}
+		}
+	}
+}
+
+// parseKubeModelFileTimestamp extracts the start time from a KubeModel file name.
+// File names have the format <YYYYMMDDHHmmSS>.<ext> (or <prefix>.<YYYYMMDDHHmmSS>.<ext>
+// when a prefix is present, but the pipeline writes files without a prefix).
+func parseKubeModelFileTimestamp(name string) (time.Time, error) {
+	fileParts := strings.Split(name, ".")
+	return time.ParseInLocation(pathing.KubeModelStorageTimeFormat, fileParts[0], time.UTC)
+}

+ 200 - 0
pkg/kubemodel/janitor_test.go

@@ -0,0 +1,200 @@
+package kubemodel
+
+import (
+	"fmt"
+	"testing"
+	"time"
+
+	"github.com/opencost/opencost/core/pkg/exporter/pathing"
+	"github.com/opencost/opencost/core/pkg/opencost"
+	"github.com/opencost/opencost/core/pkg/storage"
+	"github.com/opencost/opencost/core/pkg/util/timeutil"
+	"github.com/stretchr/testify/require"
+)
+
+// storeFile writes a single zero-byte placeholder at the path the janitor would
+// find for a given resolution and window start time.
+func storeFile(t *testing.T, store storage.Storage, appName, clusterID string, res time.Duration, start time.Time) string {
+	t.Helper()
+	resStr := timeutil.FormatStoreResolution(res)
+	f, err := pathing.NewKubeModelStoragePathFormatter(appName, clusterID, resStr)
+	require.NoError(t, err)
+
+	w := opencost.NewClosedWindow(start, start.Add(res))
+	p := f.ToFullPath("", w, "bingen")
+	require.NoError(t, store.Write(p, []byte("data")))
+	return p
+}
+
+// fileExists reports whether path p is present in store.
+func fileExists(t *testing.T, store storage.Storage, p string) bool {
+	t.Helper()
+	ok, err := store.Exists(p)
+	require.NoError(t, err)
+	return ok
+}
+
+// --- parseKubeModelFileTimestamp ---
+
+func TestParseKubeModelFileTimestamp(t *testing.T) {
+	cases := []struct {
+		name    string
+		input   string
+		want    time.Time
+		wantErr bool
+	}{
+		{
+			name:  "bare timestamp with extension",
+			input: "20240115103000.bingen",
+			want:  time.Date(2024, 1, 15, 10, 30, 0, 0, time.UTC),
+		},
+		{
+			name:  "timestamp without extension",
+			input: "20240115103000",
+			want:  time.Date(2024, 1, 15, 10, 30, 0, 0, time.UTC),
+		},
+		{
+			name:    "completely invalid name",
+			input:   "notadate.bingen",
+			wantErr: true,
+		},
+		{
+			name:    "empty string",
+			input:   "",
+			wantErr: true,
+		},
+	}
+
+	for _, tc := range cases {
+		t.Run(tc.name, func(t *testing.T) {
+			got, err := parseKubeModelFileTimestamp(tc.input)
+			if tc.wantErr {
+				require.Error(t, err)
+				return
+			}
+			require.NoError(t, err)
+			require.Equal(t, tc.want, got)
+		})
+	}
+}
+
+// --- retentionFor ---
+
+func TestRetentionFor(t *testing.T) {
+	cases := []struct {
+		name string
+		res  time.Duration
+		want time.Duration
+	}{
+		{"1d resolution uses day retention", timeutil.Day, time.Duration(janitorDefault1dRetention) * timeutil.Day},
+		{"1h resolution uses hour retention", time.Hour, time.Duration(janitorDefault1hRetention) * time.Hour},
+		{"10m resolution uses 10m retention", 10 * time.Minute, time.Duration(janitorDefault10mRetention) * 10 * time.Minute},
+	}
+
+	for _, tc := range cases {
+		t.Run(tc.name, func(t *testing.T) {
+			got := retentionFor(tc.res)
+			require.Equal(t, tc.want, got)
+		})
+	}
+}
+
+// --- pruneResolution / cleanDay ---
+
+func TestJanitor_PrunesExpiredFiles(t *testing.T) {
+	store := storage.NewMemoryStorage()
+	j := NewJanitor(store, testAppName, testClusterID, []time.Duration{time.Hour})
+
+	now := time.Now().UTC().Truncate(time.Hour)
+	cutoff := now.Add(-3 * time.Hour)
+
+	// Two files before the cutoff — should be removed.
+	expiredPath1 := storeFile(t, store, testAppName, testClusterID, time.Hour, cutoff.Add(-2*time.Hour))
+	expiredPath2 := storeFile(t, store, testAppName, testClusterID, time.Hour, cutoff.Add(-time.Hour))
+
+	// One file after the cutoff — should be kept.
+	keptPath := storeFile(t, store, testAppName, testClusterID, time.Hour, cutoff.Add(time.Hour))
+
+	resStr := timeutil.FormatStoreResolution(time.Hour)
+	baseDir := fmt.Sprintf("%s/%s/kubemodel/%s", testAppName, testClusterID, resStr)
+	j.pruneResolution(baseDir, cutoff)
+
+	require.False(t, fileExists(t, store, expiredPath1), "expired file 1 should be removed")
+	require.False(t, fileExists(t, store, expiredPath2), "expired file 2 should be removed")
+	require.True(t, fileExists(t, store, keptPath), "recent file should be kept")
+}
+
+func TestJanitor_KeepsFilesExactlyAtCutoff(t *testing.T) {
+	store := storage.NewMemoryStorage()
+	j := NewJanitor(store, testAppName, testClusterID, []time.Duration{time.Hour})
+
+	now := time.Now().UTC().Truncate(time.Hour)
+	cutoff := now.Add(-3 * time.Hour)
+
+	// File whose timestamp equals the cutoff exactly — should be kept (not Before).
+	atCutoffPath := storeFile(t, store, testAppName, testClusterID, time.Hour, cutoff)
+
+	resStr := timeutil.FormatStoreResolution(time.Hour)
+	baseDir := fmt.Sprintf("%s/%s/kubemodel/%s", testAppName, testClusterID, resStr)
+	j.pruneResolution(baseDir, cutoff)
+
+	require.True(t, fileExists(t, store, atCutoffPath), "file at cutoff boundary should not be removed")
+}
+
+func TestJanitor_EmptyStorageIsNoop(t *testing.T) {
+	store := storage.NewMemoryStorage()
+	j := NewJanitor(store, testAppName, testClusterID, []time.Duration{time.Hour})
+
+	resStr := timeutil.FormatStoreResolution(time.Hour)
+	baseDir := fmt.Sprintf("%s/%s/kubemodel/%s", testAppName, testClusterID, resStr)
+
+	// Should not panic or error on an empty store.
+	require.NotPanics(t, func() {
+		j.pruneResolution(baseDir, time.Now().UTC())
+	})
+}
+
+func TestJanitor_FilesWithUnparsableNamesAreSkipped(t *testing.T) {
+	store := storage.NewMemoryStorage()
+	j := NewJanitor(store, testAppName, testClusterID, []time.Duration{time.Hour})
+
+	now := time.Now().UTC().Truncate(time.Hour)
+	cutoff := now.Add(-time.Hour)
+
+	// Write a file at a valid day-dir path but with an unparsable name — janitor
+	// should skip it rather than panic or remove it.
+	resStr := timeutil.FormatStoreResolution(time.Hour)
+	badPath := fmt.Sprintf("%s/%s/kubemodel/%s/%s/garbage.bingen",
+		testAppName, testClusterID, resStr,
+		cutoff.Add(-24*time.Hour).Format("2006/01/02"),
+	)
+	require.NoError(t, store.Write(badPath, []byte("data")))
+
+	baseDir := fmt.Sprintf("%s/%s/kubemodel/%s", testAppName, testClusterID, resStr)
+	require.NotPanics(t, func() {
+		j.pruneResolution(baseDir, cutoff)
+	})
+
+	// File with bad name must not have been deleted.
+	require.True(t, fileExists(t, store, badPath))
+}
+
+// --- Start / Stop idempotency ---
+
+func TestJanitor_StartIsIdempotent(t *testing.T) {
+	store := storage.NewMemoryStorage()
+	j := NewJanitor(store, testAppName, testClusterID, []time.Duration{time.Hour})
+
+	j.Start(time.Hour)
+	j.Start(time.Hour) // second call must not panic or spawn a second goroutine
+	j.Stop()
+}
+
+func TestJanitor_StopIsIdempotent(t *testing.T) {
+	store := storage.NewMemoryStorage()
+	j := NewJanitor(store, testAppName, testClusterID, []time.Duration{time.Hour})
+
+	j.Start(time.Hour)
+	j.Stop()
+	j.Stop() // second stop must not panic (closing a closed channel would panic)
+}

+ 57 - 0
pkg/kubemodel/pipeline.go

@@ -0,0 +1,57 @@
+package kubemodel
+
+import (
+	"fmt"
+	"time"
+
+	ocexporter "github.com/opencost/opencost/core/pkg/opencost/exporter"
+	"github.com/opencost/opencost/core/pkg/storage"
+	"github.com/opencost/opencost/core/pkg/util/timeutil"
+)
+
+var (
+	exportInterval     = 5 * time.Minute
+	janitorInterval    = timeutil.Day
+	defaultResolutions = []time.Duration{time.Hour, timeutil.Day}
+)
+
+// Pipeline manages the KubeModel export controller group and the retention janitor.
+type Pipeline struct {
+	controllers *ocexporter.PipelineExportControllers
+	janitor     *Janitor
+}
+
+// NewPipeline creates a new pipeline with preset settings
+func NewPipeline(appName, clusterUID string, store storage.Storage, cm ocexporter.ComputePipelineSource) (*Pipeline, error) {
+	if store == nil {
+		return nil, fmt.Errorf("NewPipeline: store cannot be nil")
+	}
+	if clusterUID == "" {
+		return nil, fmt.Errorf("NewPipeline: clusterUID cannot be empty")
+	}
+
+	config := ocexporter.PipelinesExportConfig{
+		AppName:                      appName,
+		ClusterUID:                   clusterUID,
+		KubeModelPipelineResolutions: defaultResolutions,
+	}
+
+	controllers := ocexporter.NewPipelineExportControllers(store, cm, config)
+
+	return &Pipeline{
+		controllers: controllers,
+		janitor:     NewJanitor(store, appName, clusterUID, config.KubeModelPipelineResolutions),
+	}, nil
+}
+
+// Start launches the export controllers and the retention janitor.
+func (p *Pipeline) Start() {
+	p.controllers.Start(exportInterval)
+	p.janitor.Start(janitorInterval)
+}
+
+// Stop halts the export controllers and the retention janitor.
+func (p *Pipeline) Stop() {
+	p.controllers.Stop()
+	p.janitor.Stop()
+}

+ 100 - 0
pkg/kubemodel/querier.go

@@ -0,0 +1,100 @@
+package kubemodel
+
+import (
+	"fmt"
+	"time"
+
+	"github.com/opencost/opencost/core/pkg/exporter"
+	"github.com/opencost/opencost/core/pkg/exporter/pathing"
+	coremodel "github.com/opencost/opencost/core/pkg/model/kubemodel"
+	"github.com/opencost/opencost/core/pkg/opencost"
+	"github.com/opencost/opencost/core/pkg/storage"
+	"github.com/opencost/opencost/core/pkg/util/timeutil"
+)
+
+// supportedResolutions lists the resolutions written by the pipeline, in ascending order.
+var supportedResolutions = []time.Duration{time.Hour, timeutil.Day}
+
+// Querier is the query interface used by KubeModelHandler, allowing the
+// concrete *km.Querier to be swapped for a test double.
+type Querier interface {
+	Query(opencost.Window) ([]*coremodel.KubeModelSet, error)
+}
+
+// Querier reads KubeModelSets written by the pipeline from storage.
+type querier struct {
+	appName   string
+	clusterId string
+	store     storage.Storage
+}
+
+// NewQuerier creates a Querier backed by the given storage and cluster.
+func NewQuerier(appName, clusterId string, store storage.Storage) Querier {
+	return &querier{store: store, appName: appName, clusterId: clusterId}
+}
+
+// Query returns KubeModelSets covering the given window split into resolution-sized sub-windows.
+// resolution is snapped to the nearest supported pipeline resolution (1h or 1d).
+// Sub-windows with no file in storage are silently skipped.
+func (q *querier) Query(window opencost.Window) ([]*coremodel.KubeModelSet, error) {
+	if window.IsOpen() {
+		return nil, fmt.Errorf("kubemodel querier: window must be closed")
+	}
+
+	res := snapResolution(window)
+	resStr := timeutil.FormatStoreResolution(res)
+	formatter, err := pathing.NewKubeModelStoragePathFormatter(q.appName, q.clusterId, resStr)
+	if err != nil {
+		return nil, fmt.Errorf("kubemodel querier: %w", err)
+	}
+
+	start := window.Start().Truncate(res)
+	end := window.End().Truncate(res)
+	subWindows, err := opencost.GetWindowsForQueryWindow(start, end, res)
+	if err != nil {
+		return nil, fmt.Errorf("kubemodel querier: splitting window: %w", err)
+	}
+
+	results := make([]*coremodel.KubeModelSet, 0, len(subWindows))
+	for _, w := range subWindows {
+		kms, err := q.readWindow(formatter, w)
+		if err != nil {
+			if storage.IsNotExist(err) {
+				continue
+			}
+			return nil, fmt.Errorf("kubemodel querier: reading window %s: %w", w, err)
+		}
+		results = append(results, kms)
+	}
+
+	return results, nil
+}
+
+func (q *querier) readWindow(formatter pathing.StoragePathFormatter[opencost.Window], window opencost.Window) (*coremodel.KubeModelSet, error) {
+	path := formatter.ToFullPath("", window, exporter.BingenExt)
+
+	data, err := q.store.Read(path)
+	if err != nil {
+		return nil, err
+	}
+
+	kms := new(coremodel.KubeModelSet)
+	if err := kms.UnmarshalBinary(data); err != nil {
+		return nil, fmt.Errorf("decoding KubeModelSet: %w", err)
+	}
+
+	return kms, nil
+}
+
+// snapResolution returns the largest supported resolution that evenly divides
+// the window duration. Falls back to the smallest supported resolution if none
+// divides evenly.
+func snapResolution(window opencost.Window) time.Duration {
+	dur := window.Duration()
+	for i := len(supportedResolutions) - 1; i >= 0; i-- {
+		if dur%supportedResolutions[i] == 0 {
+			return supportedResolutions[i]
+		}
+	}
+	return supportedResolutions[0]
+}

+ 196 - 0
pkg/kubemodel/querier_test.go

@@ -0,0 +1,196 @@
+package kubemodel
+
+import (
+	"testing"
+	"time"
+
+	"github.com/opencost/opencost/core/pkg/exporter"
+	"github.com/opencost/opencost/core/pkg/exporter/pathing"
+	coremodel "github.com/opencost/opencost/core/pkg/model/kubemodel"
+	"github.com/opencost/opencost/core/pkg/opencost"
+	"github.com/opencost/opencost/core/pkg/storage"
+	"github.com/opencost/opencost/core/pkg/util/timeutil"
+	"github.com/stretchr/testify/require"
+)
+
+const (
+	testAppName   = "test-app"
+	testClusterID = "test-cluster"
+)
+
+// storeKMS marshals kms and writes it to store at the path the querier expects for the given window.
+func storeKMS(t *testing.T, store storage.Storage, appName, clusterID string, res time.Duration, w opencost.Window) {
+	t.Helper()
+	resStr := timeutil.FormatStoreResolution(res)
+	f, err := pathing.NewKubeModelStoragePathFormatter(appName, clusterID, resStr)
+	require.NoError(t, err)
+
+	p := f.ToFullPath("", w, exporter.BingenExt)
+	kms := coremodel.NewMockKubeModelSet(*w.Start(), *w.End())
+	data, err := kms.MarshalBinary()
+	require.NoError(t, err)
+	require.NoError(t, store.Write(p, data))
+}
+
+// --- snapResolution ---
+
+func TestSnapResolution(t *testing.T) {
+	cases := []struct {
+		name     string
+		window   opencost.Window
+		expected time.Duration
+	}{
+		{
+			name:     "1h window snaps to 1h",
+			window:   opencost.NewClosedWindow(time.Time{}, time.Time{}.Add(time.Hour)),
+			expected: time.Hour,
+		},
+		{
+			name:     "3h window snaps to 1h",
+			window:   opencost.NewClosedWindow(time.Time{}, time.Time{}.Add(3*time.Hour)),
+			expected: time.Hour,
+		},
+		{
+			name:     "12h window snaps to 1h",
+			window:   opencost.NewClosedWindow(time.Time{}, time.Time{}.Add(12*time.Hour)),
+			expected: time.Hour,
+		},
+		{
+			name:     "24h window snaps to 1d",
+			window:   opencost.NewClosedWindow(time.Time{}, time.Time{}.Add(timeutil.Day)),
+			expected: timeutil.Day,
+		},
+		{
+			name:     "48h window snaps to 1d",
+			window:   opencost.NewClosedWindow(time.Time{}, time.Time{}.Add(2*timeutil.Day)),
+			expected: timeutil.Day,
+		},
+		{
+			name:     "90m window (not evenly divisible) falls back to 1h",
+			window:   opencost.NewClosedWindow(time.Time{}, time.Time{}.Add(90*time.Minute)),
+			expected: time.Hour,
+		},
+	}
+
+	for _, tc := range cases {
+		t.Run(tc.name, func(t *testing.T) {
+			got := snapResolution(tc.window)
+			if got != tc.expected {
+				t.Errorf("snapResolution(%v) = %v, want %v", tc.window.Duration(), got, tc.expected)
+			}
+		})
+	}
+}
+
+// --- Querier.Query ---
+
+func TestQuerier_Query_RejectsOpenWindow(t *testing.T) {
+	q := NewQuerier(testAppName, testClusterID, storage.NewMemoryStorage())
+
+	end := time.Now().UTC()
+	_, err := q.Query(opencost.NewWindow(nil, &end))
+	require.Error(t, err, "nil start should be rejected")
+
+	start := time.Now().UTC()
+	_, err = q.Query(opencost.NewWindow(&start, nil))
+	require.Error(t, err, "nil end should be rejected")
+}
+
+func TestQuerier_Query_EmptyStorageReturnsNoResults(t *testing.T) {
+	q := NewQuerier(testAppName, testClusterID, storage.NewMemoryStorage())
+
+	start := time.Now().UTC().Truncate(time.Hour)
+	results, err := q.Query(opencost.NewClosedWindow(start, start.Add(3*time.Hour)))
+	require.NoError(t, err)
+	require.Empty(t, results)
+}
+
+func TestQuerier_Query_SingleHourlyWindow(t *testing.T) {
+	store := storage.NewMemoryStorage()
+	q := NewQuerier(testAppName, testClusterID, store)
+
+	start := time.Now().UTC().Truncate(time.Hour)
+	window := opencost.NewClosedWindow(start, start.Add(time.Hour))
+	storeKMS(t, store, testAppName, testClusterID, time.Hour, window)
+
+	results, err := q.Query(window)
+	require.NoError(t, err)
+	require.Len(t, results, 1)
+}
+
+func TestQuerier_Query_MultipleHourlySubWindows(t *testing.T) {
+	store := storage.NewMemoryStorage()
+	q := NewQuerier(testAppName, testClusterID, store)
+
+	start := time.Now().UTC().Truncate(time.Hour)
+	for i := range 3 {
+		w := opencost.NewClosedWindow(start.Add(time.Duration(i)*time.Hour), start.Add(time.Duration(i+1)*time.Hour))
+		storeKMS(t, store, testAppName, testClusterID, time.Hour, w)
+	}
+
+	results, err := q.Query(opencost.NewClosedWindow(start, start.Add(3*time.Hour)))
+	require.NoError(t, err)
+	require.Len(t, results, 3)
+}
+
+func TestQuerier_Query_SkipsMissingSubWindows(t *testing.T) {
+	store := storage.NewMemoryStorage()
+	q := NewQuerier(testAppName, testClusterID, store)
+
+	start := time.Now().UTC().Truncate(time.Hour)
+	// Write first and third sub-windows; leave the middle missing.
+	storeKMS(t, store, testAppName, testClusterID, time.Hour, opencost.NewClosedWindow(start, start.Add(time.Hour)))
+	storeKMS(t, store, testAppName, testClusterID, time.Hour, opencost.NewClosedWindow(start.Add(2*time.Hour), start.Add(3*time.Hour)))
+
+	results, err := q.Query(opencost.NewClosedWindow(start, start.Add(3*time.Hour)))
+	require.NoError(t, err)
+	require.Len(t, results, 2)
+}
+
+func TestQuerier_Query_DailyResolutionForFullDayWindow(t *testing.T) {
+	store := storage.NewMemoryStorage()
+	q := NewQuerier(testAppName, testClusterID, store)
+
+	start := time.Now().UTC().Truncate(timeutil.Day)
+	window := opencost.NewClosedWindow(start, start.Add(timeutil.Day))
+	storeKMS(t, store, testAppName, testClusterID, timeutil.Day, window)
+
+	results, err := q.Query(window)
+	require.NoError(t, err)
+	require.Len(t, results, 1)
+}
+
+func TestQuerier_Query_TruncatesWindowToResolution(t *testing.T) {
+	store := storage.NewMemoryStorage()
+	q := NewQuerier(testAppName, testClusterID, store)
+
+	// Aligned hour boundary for the one sub-window we expect to be queried.
+	alignedStart := time.Now().UTC().Truncate(time.Hour)
+	alignedWindow := opencost.NewClosedWindow(alignedStart, alignedStart.Add(time.Hour))
+	storeKMS(t, store, testAppName, testClusterID, time.Hour, alignedWindow)
+
+	// Query with start/end that are 15 minutes into the hour — both truncate to
+	// the same aligned boundary, so we still get the one stored sub-window.
+	unalignedStart := alignedStart.Add(15 * time.Minute)
+	unalignedEnd := alignedStart.Add(time.Hour + 15*time.Minute)
+	results, err := q.Query(opencost.NewClosedWindow(unalignedStart, unalignedEnd))
+	require.NoError(t, err)
+	require.Len(t, results, 1)
+}
+
+func TestQuerier_Query_CorruptDataReturnsError(t *testing.T) {
+	store := storage.NewMemoryStorage()
+	q := NewQuerier(testAppName, testClusterID, store)
+
+	start := time.Now().UTC().Truncate(time.Hour)
+	window := opencost.NewClosedWindow(start, start.Add(time.Hour))
+
+	resStr := timeutil.FormatStoreResolution(time.Hour)
+	f, err := pathing.NewKubeModelStoragePathFormatter(testAppName, testClusterID, resStr)
+	require.NoError(t, err)
+	p := f.ToFullPath("", window, exporter.BingenExt)
+	require.NoError(t, store.Write(p, []byte("not valid binary data")))
+
+	_, err = q.Query(window)
+	require.Error(t, err)
+}