Browse Source

Add exporter helpers and groups for hourly/daily exports.

Matt Bolt 1 năm trước cách đây
mục cha
commit
4c91cb7a86

+ 74 - 11
core/pkg/exporter/controller.go

@@ -2,34 +2,61 @@ package exporter
 
 import (
 	"reflect"
+	"strings"
 	"time"
 
 	"github.com/opencost/opencost/core/pkg/log"
 	"github.com/opencost/opencost/core/pkg/opencost"
 	"github.com/opencost/opencost/core/pkg/source"
 	"github.com/opencost/opencost/core/pkg/util/atomic"
+	"github.com/opencost/opencost/core/pkg/util/timeutil"
 )
 
+// ExportController is a controller interface that is responsible for exporting data on a specific interval.
+type ExportController interface {
+	// Name returns the name of the controller
+	Name() string
+
+	// Start starts a background compute processing loop, which will compute the data for the current resolution and export it
+	// on the provided interval. This function will return `true` if the loop was started successfully, and `false` if it was
+	// already running.
+	Start(interval time.Duration) bool
+
+	// Stops the compute processing loop
+	Stop()
+}
+
 // ComputeExportController[T] is a controller type which leverages a `ComputeSource[T]` and `Exporter[T]`
 // to regularly compute the data for the current resolution and export it on a specific interval.
 type ComputeExportController[T any] struct {
-	runState   atomic.AtomicRunState
-	source     ComputeSource[T]
-	exporter   Exporter[T]
-	resolution time.Duration
-	typeName   string
+	runState         atomic.AtomicRunState
+	source           ComputeSource[T]
+	exporter         Exporter[T]
+	resolution       time.Duration
+	sourceResolution time.Duration
+	typeName         string
 }
 
 // NewComputeExportController creates a new `ComputeExportController[T]` instance.
-func NewComputeExportController[T any](source ComputeSource[T], exporter Exporter[T], resolution time.Duration) *ComputeExportController[T] {
+func NewComputeExportController[T any](
+	source ComputeSource[T],
+	exporter Exporter[T],
+	sourceResolution time.Duration,
+) *ComputeExportController[T] {
 	return &ComputeExportController[T]{
-		source:     source,
-		resolution: resolution,
-		exporter:   exporter,
-		typeName:   reflect.TypeOf((*T)(nil)).Elem().String(),
+		source:           source,
+		resolution:       exporter.Resolution(),
+		sourceResolution: sourceResolution,
+		exporter:         exporter,
+		typeName:         reflect.TypeOf((*T)(nil)).Elem().String(),
 	}
 }
 
+// Name returns the name of the controller, which is a combination of the type name and the resolution
+func (cd *ComputeExportController[T]) Name() string {
+	return cd.typeName + "-" + timeutil.FormatStoreResolution(cd.resolution)
+}
+
 // Start starts a background compute processing loop, which will compute the data for the current resolution and export it
 // on the provided interval. This function will return `true` if the loop was started successfully, and `false` if it was
 // already running.
@@ -68,7 +95,7 @@ func (cd *ComputeExportController[T]) Start(interval time.Duration) bool {
 				continue
 			}
 
-			set, err := cd.source.Compute(start, end, cd.resolution)
+			set, err := cd.source.Compute(start, end, cd.sourceResolution)
 
 			// If a NoDataError or ErrorCollection is returned, we expect that an empty set will
 			// also be returned. Like an EOF error, this is an expected state
@@ -105,3 +132,39 @@ func (cd *ComputeExportController[T]) Stop() {
 func logErrors(start, end time.Time, warnings []string, errors []string) {
 
 }
+
+type ComputeExportControllerGroup[T any] struct {
+	controllers []*ComputeExportController[T]
+}
+
+func NewComputeExportControllerGroup[T any](controllers ...*ComputeExportController[T]) *ComputeExportControllerGroup[T] {
+	return &ComputeExportControllerGroup[T]{controllers: controllers}
+}
+
+func (g *ComputeExportControllerGroup[T]) Name() string {
+	var sb strings.Builder
+	sb.WriteRune('[')
+	for i, c := range g.controllers {
+		if i > 0 {
+			sb.WriteRune('/')
+		}
+		sb.WriteString(c.Name())
+	}
+	sb.WriteRune(']')
+	return sb.String()
+}
+
+func (g *ComputeExportControllerGroup[T]) Start(interval time.Duration) bool {
+	for _, c := range g.controllers {
+		if !c.Start(interval) {
+			return false
+		}
+	}
+	return true
+}
+
+func (g *ComputeExportControllerGroup[T]) Stop() {
+	for _, c := range g.controllers {
+		c.Stop()
+	}
+}

+ 9 - 0
core/pkg/exporter/exporter.go

@@ -13,7 +13,11 @@ import (
 
 // Exporter[T] is a generic interface for exporting T instances to a specific storage destination.
 type Exporter[T any] interface {
+	// Export performs the export operation for the given window and data.
 	Export(window opencost.Window, data *T) error
+
+	// Resolution contains the resolution of the data being exported
+	Resolution() time.Duration
 }
 
 // StorageExporter[T] is an implementation of Exporter[T] that writes data to a storage backend using
@@ -85,3 +89,8 @@ func (se *StorageExporter[T]) Export(window opencost.Window, data *T) error {
 
 	return nil
 }
+
+// Resolution returns the resolution of the data being exported.
+func (se *StorageExporter[T]) Resolution() time.Duration {
+	return se.resolution
+}

+ 8 - 8
modules/prometheus-source/go.mod

@@ -21,7 +21,7 @@ require (
 	github.com/fsnotify/fsnotify v1.6.0 // indirect
 	github.com/fxamacker/cbor/v2 v2.7.0 // indirect
 	github.com/go-logr/logr v1.4.2 // indirect
-	github.com/goccy/go-json v0.10.2 // indirect
+	github.com/goccy/go-json v0.10.5 // indirect
 	github.com/gogo/protobuf v1.3.2 // indirect
 	github.com/google/gofuzz v1.2.0 // indirect
 	github.com/hashicorp/hcl v1.0.0 // indirect
@@ -33,7 +33,7 @@ require (
 	github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
 	github.com/pelletier/go-toml v1.9.3 // indirect
 	github.com/prometheus/client_model v0.6.1 // indirect
-	github.com/prometheus/common v0.55.0 // indirect
+	github.com/prometheus/common v0.63.0 // indirect
 	github.com/rs/zerolog v1.26.1 // indirect
 	github.com/spf13/afero v1.6.0 // indirect
 	github.com/spf13/cast v1.3.1 // indirect
@@ -42,13 +42,13 @@ require (
 	github.com/spf13/viper v1.8.1 // indirect
 	github.com/subosito/gotenv v1.2.0 // indirect
 	github.com/x448/float16 v0.8.4 // indirect
-	golang.org/x/net v0.30.0 // indirect
-	golang.org/x/oauth2 v0.23.0 // indirect
-	golang.org/x/sys v0.26.0 // indirect
-	golang.org/x/term v0.25.0 // indirect
-	golang.org/x/text v0.19.0 // indirect
+	golang.org/x/net v0.37.0 // indirect
+	golang.org/x/oauth2 v0.25.0 // indirect
+	golang.org/x/sys v0.31.0 // indirect
+	golang.org/x/term v0.30.0 // indirect
+	golang.org/x/text v0.23.0 // indirect
 	golang.org/x/time v0.7.0 // indirect
-	google.golang.org/protobuf v1.35.1 // indirect
+	google.golang.org/protobuf v1.36.5 // indirect
 	gopkg.in/inf.v0 v0.9.1 // indirect
 	gopkg.in/ini.v1 v1.67.0 // indirect
 	k8s.io/api v0.32.0 // indirect

+ 20 - 20
modules/prometheus-source/go.sum

@@ -90,8 +90,8 @@ github.com/go-openapi/jsonreference v0.20.2 h1:3sVjiK66+uXK/6oQ8xgcRKcFgQ5KXa2Kv
 github.com/go-openapi/jsonreference v0.20.2/go.mod h1:Bl1zwGIM8/wsvqjsOQLJ/SH+En5Ap4rVB5KVcIDZG2k=
 github.com/go-openapi/swag v0.23.0 h1:vsEVJDUo2hPJ2tu0/Xc+4noaxyEffXNIs3cOULZ+GrE=
 github.com/go-openapi/swag v0.23.0/go.mod h1:esZ8ITTYEsH1V2trKHjAN8Ai7xHb8RV+YSZ577vPjgQ=
-github.com/goccy/go-json v0.10.2 h1:CrxCmQqYDkv1z7lO7Wbh2HN93uovUHgrECaO5ZrCXAU=
-github.com/goccy/go-json v0.10.2/go.mod h1:6MelG93GURQebXPDq3khkgXZkazVtN9CRI+MGFi0w8I=
+github.com/goccy/go-json v0.10.5 h1:Fq85nIqj+gXn/S5ahsiTlK3TmC85qgirsdTP/+DeaC4=
+github.com/goccy/go-json v0.10.5/go.mod h1:oq7eo15ShAhp70Anwd5lgX2pLfOS3QCiwU/PULtXL6M=
 github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA=
 github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q=
 github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q=
@@ -142,8 +142,8 @@ github.com/google/go-cmp v0.5.3/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/
 github.com/google/go-cmp v0.5.4/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
 github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
 github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
-github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI=
-github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
+github.com/google/go-cmp v0.7.0 h1:wk8382ETsv4JYUZwIsn6YpYiWiBsYLSJiTsyBybVuN8=
+github.com/google/go-cmp v0.7.0/go.mod h1:pXiqmnSA92OHEEa9HXL2W4E7lf9JzCmGVUdgjX3N/iU=
 github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
 github.com/google/gofuzz v1.2.0 h1:xRy4A+RhZaiKjJ1bPfwQ8sedCA+YS2YcCHW6ec7JMi0=
 github.com/google/gofuzz v1.2.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
@@ -259,8 +259,8 @@ github.com/prometheus/client_golang v1.20.5/go.mod h1:PIEt8X02hGcP8JWbeHyeZ53Y/j
 github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA=
 github.com/prometheus/client_model v0.6.1 h1:ZKSh/rekM+n3CeS952MLRAdFwIKqeY8b62p8ais2e9E=
 github.com/prometheus/client_model v0.6.1/go.mod h1:OrxVMOVHjw3lKMa8+x6HeMGkHMQyHDk9E3jmP2AmGiY=
-github.com/prometheus/common v0.55.0 h1:KEi6DK7lXW/m7Ig5i47x0vRzuBsHuvJdi5ee6Y3G1dc=
-github.com/prometheus/common v0.55.0/go.mod h1:2SECS4xJG1kd8XF9IcM1gMX6510RAEL65zxzNImwdc8=
+github.com/prometheus/common v0.63.0 h1:YR/EIY1o3mEFP/kZCD7iDMnLPlGyuU2Gb3HIcXnA98k=
+github.com/prometheus/common v0.63.0/go.mod h1:VVFF/fBIoToEnWRVkYoXEkq3R3paCoxG9PXP74SnV18=
 github.com/prometheus/procfs v0.15.1 h1:YagwOFzUgYfKKHX6Dr+sHT7km/hxC76UB0learggepc=
 github.com/prometheus/procfs v0.15.1/go.mod h1:fB45yRUv8NstnjriLhBQLuOUt+WW4BsoGhij/e3PBqk=
 github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6LYCDYWNEvQ=
@@ -291,8 +291,8 @@ github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81P
 github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA=
 github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
 github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
-github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg=
-github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
+github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA=
+github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
 github.com/subosito/gotenv v1.2.0 h1:Slr1R9HxAlEKefgq5jn9U+DnETlIUa6HfgEzj0g5d7s=
 github.com/subosito/gotenv v1.2.0/go.mod h1:N0PQaV/YGNqwC0u51sEeR/aUtSLEXKX9iv69rRypqCw=
 github.com/x448/float16 v0.8.4 h1:qLwI1I70+NjRFUR3zs1JPUCgaCXSh3SW62uAKT1mSBM=
@@ -396,8 +396,8 @@ golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v
 golang.org/x/net v0.0.0-20210316092652-d523dce5a7f4/go.mod h1:RBQZq4jEuRlivfhVLdyRGr576XBO4/greRjx4P4O3yc=
 golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4/go.mod h1:p54w0d4576C0XHj96bSt6lcn1PtDYWL6XObtHCRCNQM=
 golang.org/x/net v0.0.0-20210805182204-aaa1db679c0d/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
-golang.org/x/net v0.30.0 h1:AcW1SDZMkb8IpzCdQUaIq2sP4sZ4zw+55h6ynffypl4=
-golang.org/x/net v0.30.0/go.mod h1:2wGyMJ5iFasEhkwi13ChkO/t1ECNC4X4eBKkVFyYFlU=
+golang.org/x/net v0.37.0 h1:1zLorHbz+LYj7MQlSf1+2tPIIgibq2eL5xkrGk6f+2c=
+golang.org/x/net v0.37.0/go.mod h1:ivrbrMbzFq5J41QOQh0siUuly180yBYtLp+CKbEaFx8=
 golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
 golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
 golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
@@ -410,8 +410,8 @@ golang.org/x/oauth2 v0.0.0-20210218202405-ba52d332ba99/go.mod h1:KelEdhl1UZF7XfJ
 golang.org/x/oauth2 v0.0.0-20210220000619-9bb904979d93/go.mod h1:KelEdhl1UZF7XfJ4dDtk6s++YSgaE7mD/BuKKDLBl4A=
 golang.org/x/oauth2 v0.0.0-20210313182246-cd4f82c27b84/go.mod h1:KelEdhl1UZF7XfJ4dDtk6s++YSgaE7mD/BuKKDLBl4A=
 golang.org/x/oauth2 v0.0.0-20210402161424-2e8d93401602/go.mod h1:KelEdhl1UZF7XfJ4dDtk6s++YSgaE7mD/BuKKDLBl4A=
-golang.org/x/oauth2 v0.23.0 h1:PbgcYx2W7i4LvjJWEbf0ngHV6qJYr86PkAV3bXdLEbs=
-golang.org/x/oauth2 v0.23.0/go.mod h1:XYTD2NtWslqkgxebSiOHnXEap4TF09sJSc7H1sXbhtI=
+golang.org/x/oauth2 v0.25.0 h1:CY4y7XT9v0cRI9oupztF8AgiIu99L/ksR/Xp/6jrZ70=
+golang.org/x/oauth2 v0.25.0/go.mod h1:XYTD2NtWslqkgxebSiOHnXEap4TF09sJSc7H1sXbhtI=
 golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
 golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
 golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
@@ -467,11 +467,11 @@ golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7w
 golang.org/x/sys v0.0.0-20210510120138-977fb7262007/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
 golang.org/x/sys v0.0.0-20210809222454-d867a43fc93e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
 golang.org/x/sys v0.0.0-20220908164124-27713097b956/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
-golang.org/x/sys v0.26.0 h1:KHjCJyddX0LoSTb3J+vWpupP9p0oznkqVk/IfjymZbo=
-golang.org/x/sys v0.26.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
+golang.org/x/sys v0.31.0 h1:ioabZlmFYtWhL+TRYpcnNlLwhyxaM9kWTDEmfnprqik=
+golang.org/x/sys v0.31.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k=
 golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
-golang.org/x/term v0.25.0 h1:WtHI/ltw4NvSUig5KARz9h521QvRC8RmF/cuYqifU24=
-golang.org/x/term v0.25.0/go.mod h1:RPyXicDX+6vLxogjjRxjgD2TKtmAO6NZBsBRfrOLu7M=
+golang.org/x/term v0.30.0 h1:PQ39fJZ+mfadBm0y5WlL4vlM7Sx1Hgf13sMIY2+QS9Y=
+golang.org/x/term v0.30.0/go.mod h1:NYYFdzHoI5wRh/h5tDMdMqCqPJZEuNqVR5xJLd/n67g=
 golang.org/x/text v0.0.0-20170915032832-14c0d48ead0c/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
 golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
 golang.org/x/text v0.3.1-0.20180807135948-17ff2d5776d2/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
@@ -480,8 +480,8 @@ golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
 golang.org/x/text v0.3.4/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
 golang.org/x/text v0.3.5/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
 golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
-golang.org/x/text v0.19.0 h1:kTxAhCbGbxhK0IwgSKiMO5awPoDQ0RpfiVYBfK860YM=
-golang.org/x/text v0.19.0/go.mod h1:BuEKDfySbSR4drPmRPG/7iBdf8hvFMuRexcpahXilzY=
+golang.org/x/text v0.23.0 h1:D71I7dUrlY+VX0gQShAThNGHFxZ13dGLBHQLVl1mJlY=
+golang.org/x/text v0.23.0/go.mod h1:/BLNzu4aZCJ1+kcD0DNRotWKage4q2rGVAg4o22unh4=
 golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
 golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
 golang.org/x/time v0.0.0-20191024005414-555d28b269f0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
@@ -645,8 +645,8 @@ google.golang.org/protobuf v1.24.0/go.mod h1:r/3tXBNzIEhYS9I1OUVjXDlt8tc493IdKGj
 google.golang.org/protobuf v1.25.0/go.mod h1:9JNX74DMeImyA3h4bdi1ymwjUzf21/xIlbajtzgsN7c=
 google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw=
 google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc=
-google.golang.org/protobuf v1.35.1 h1:m3LfL6/Ca+fqnjnlqQXNpFPABW1UD7mjh8KO2mKFytA=
-google.golang.org/protobuf v1.35.1/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE=
+google.golang.org/protobuf v1.36.5 h1:tPhr+woSbjfYvY6/GPufUoYizxw1cF/yFoxJ2fmpwlM=
+google.golang.org/protobuf v1.36.5/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE=
 gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
 gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
 gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk=

+ 1 - 48
pkg/costmodel/router.go

@@ -76,10 +76,6 @@ type Accesses struct {
 	ClusterInfoProvider clusters.ClusterInfoProvider
 	Model               *CostModel
 	MetricsEmitter      *CostModelMetricsEmitter
-	OutOfClusterCache   *cache.Cache
-	CostDataCache       *cache.Cache
-	ClusterCostsCache   *cache.Cache
-	CacheExpiration     map[time.Duration]time.Duration
 	// SettingsCache stores current state of app settings
 	SettingsCache *cache.Cache
 	// settingsSubscribers tracks channels through which changes to different
@@ -88,28 +84,6 @@ type Accesses struct {
 	settingsMutex       sync.Mutex
 }
 
-// GetCacheExpiration looks up and returns custom cache expiration for the given duration.
-// If one does not exists, it returns the default cache expiration, which is defined by
-// the particular cache.
-func (a *Accesses) GetCacheExpiration(dur time.Duration) time.Duration {
-	if expiration, ok := a.CacheExpiration[dur]; ok {
-		return expiration
-	}
-	return cache.DefaultExpiration
-}
-
-// GetCacheRefresh determines how long to wait before refreshing the cache for the given duration,
-// which is done 1 minute before we expect the cache to expire, or 1 minute if expiration is
-// not found or is less than 2 minutes.
-func (a *Accesses) GetCacheRefresh(dur time.Duration) time.Duration {
-	expiry := a.GetCacheExpiration(dur).Minutes()
-	if expiry <= 2.0 {
-		return time.Minute
-	}
-	mins := time.Duration(expiry/2.0) * time.Minute
-	return mins
-}
-
 // FilterFunc is a filter that returns true iff the given CostData should be filtered out, and the environment that was used as the filter criteria, if it was an aggregate
 type FilterFunc func(*CostData) (bool, string)
 
@@ -280,7 +254,7 @@ func (a *Accesses) UpdateSpotInfoConfigs(w http.ResponseWriter, r *http.Request,
 	w.Header().Set("Access-Control-Allow-Origin", "*")
 
 	data, err := a.CloudProvider.UpdateConfig(r.Body, aws.SpotInfoUpdateType)
-	WriteData(w, data, nil)
+	WriteData(w, data, err)
 
 	err = a.CloudProvider.DownloadPricingData()
 	if err != nil {
@@ -589,25 +563,8 @@ func Initialize(router *httprouter.Router, additionalConfigWatchers ...*watcher.
 	configWatchers.Watch()
 
 	clusterMap := dataSource.ClusterMap()
-
-	// cache responses from model and aggregation for a default of 10 minutes;
-	// clear expired responses every 20 minutes
-	costDataCache := cache.New(time.Minute*10, time.Minute*20)
-	clusterCostsCache := cache.New(cache.NoExpiration, cache.NoExpiration)
-	outOfClusterCache := cache.New(time.Minute*5, time.Minute*10)
 	settingsCache := cache.New(cache.NoExpiration, cache.NoExpiration)
 
-	// query durations that should be cached longer should be registered here
-	// use relatively prime numbers to minimize likelihood of synchronized
-	// attempts at cache warming
-	day := 24 * time.Hour
-	cacheExpiration := map[time.Duration]time.Duration{
-		day:      maxCacheMinutes1d * time.Minute,
-		2 * day:  maxCacheMinutes2d * time.Minute,
-		7 * day:  maxCacheMinutes7d * time.Minute,
-		30 * day: maxCacheMinutes30d * time.Minute,
-	}
-
 	costModel := NewCostModel(dataSource, cloudProvider, k8sCache, clusterMap, dataSource.BatchDuration())
 	metricsEmitter := NewCostModelMetricsEmitter(k8sCache, cloudProvider, clusterInfoProvider, costModel)
 
@@ -621,11 +578,7 @@ func Initialize(router *httprouter.Router, additionalConfigWatchers ...*watcher.
 		ClusterInfoProvider: clusterInfoProvider,
 		Model:               costModel,
 		MetricsEmitter:      metricsEmitter,
-		CostDataCache:       costDataCache,
-		ClusterCostsCache:   clusterCostsCache,
-		OutOfClusterCache:   outOfClusterCache,
 		SettingsCache:       settingsCache,
-		CacheExpiration:     cacheExpiration,
 	}
 
 	// Initialize mechanism for subscribing to settings changes

+ 0 - 1
pkg/costmodel/settings.go

@@ -52,7 +52,6 @@ func (a *Accesses) InitializeSettingsPubSub() {
 		for {
 			msg := <-costDataCacheCh
 			log.Infof("Flushing cost data caches: %s", msg)
-			a.CostDataCache.Flush()
 		}
 	}(a)
 }

+ 65 - 0
pkg/exporter/controllers.go

@@ -0,0 +1,65 @@
+package exporter
+
+import (
+	"time"
+
+	export "github.com/opencost/opencost/core/pkg/exporter"
+	"github.com/opencost/opencost/core/pkg/opencost"
+	"github.com/opencost/opencost/core/pkg/storage"
+	"github.com/opencost/opencost/pkg/costmodel"
+	"github.com/opencost/opencost/pkg/exporter/allocation"
+	"github.com/opencost/opencost/pkg/exporter/asset"
+	"github.com/opencost/opencost/pkg/exporter/networkinsight"
+)
+
+type PipelineExportControllers struct {
+	AllocationExportController     *export.ComputeExportControllerGroup[opencost.AllocationSet]
+	AssetExportController          *export.ComputeExportControllerGroup[opencost.AssetSet]
+	NetworkInsightExportController *export.ComputeExportControllerGroup[opencost.NetworkInsightSet]
+}
+
+func NewPipelineExportControllers(clusterId string, store storage.Storage, cm *costmodel.CostModel) *PipelineExportControllers {
+	mins := int(cm.DataSource.Resolution().Minutes())
+	if mins <= 0 {
+		mins = 1
+	}
+
+	// minimum source/query resolution
+	sourceResolution := time.Duration(mins) * time.Minute
+
+	// allocation sources and exporters
+	allocSource := allocation.NewAllocationComputeSource(cm)
+	hourlyAllocExporter := NewAllocationStorageExporter(clusterId, time.Hour, store)
+	dailyAllocExporter := NewAllocationStorageExporter(clusterId, 24*time.Hour, store)
+
+	hourlyAllocController := export.NewComputeExportController(allocSource, hourlyAllocExporter, sourceResolution)
+	dailyAllocController := export.NewComputeExportController(allocSource, dailyAllocExporter, sourceResolution)
+
+	allocController := export.NewComputeExportControllerGroup(hourlyAllocController, dailyAllocController)
+
+	// asset sources and exporters
+	assetSource := asset.NewAssetsComputeSource(cm)
+	hourlyAssetExporter := NewAssetsStorageExporter(clusterId, time.Hour, store)
+	dailyAssetExporter := NewAssetsStorageExporter(clusterId, 24*time.Hour, store)
+
+	hourlyAssetController := export.NewComputeExportController(assetSource, hourlyAssetExporter, sourceResolution)
+	dailyAssetController := export.NewComputeExportController(assetSource, dailyAssetExporter, sourceResolution)
+
+	assetController := export.NewComputeExportControllerGroup(hourlyAssetController, dailyAssetController)
+
+	// network insights sources and exporters
+	networkInsightSource := networkinsight.NewNetworkInsightsComputeSource(cm)
+	hourlyNetworkInsightExporter := NewNetworkInsightStorageExporter(clusterId, time.Hour, store)
+	dailyNetworkInsightExporter := NewNetworkInsightStorageExporter(clusterId, 24*time.Hour, store)
+
+	hourlyNetworkInsightController := export.NewComputeExportController(networkInsightSource, hourlyNetworkInsightExporter, sourceResolution)
+	dailyNetworkInsightController := export.NewComputeExportController(networkInsightSource, dailyNetworkInsightExporter, sourceResolution)
+
+	networkInsightController := export.NewComputeExportControllerGroup(hourlyNetworkInsightController, dailyNetworkInsightController)
+
+	return &PipelineExportControllers{
+		AllocationExportController:     allocController,
+		AssetExportController:          assetController,
+		NetworkInsightExportController: networkInsightController,
+	}
+}

+ 64 - 0
pkg/exporter/exporters.go

@@ -0,0 +1,64 @@
+package exporter
+
+import (
+	"time"
+
+	export "github.com/opencost/opencost/core/pkg/exporter"
+	"github.com/opencost/opencost/core/pkg/exporter/pathing"
+	"github.com/opencost/opencost/core/pkg/exporter/validator"
+	"github.com/opencost/opencost/core/pkg/log"
+	"github.com/opencost/opencost/core/pkg/opencost"
+	"github.com/opencost/opencost/core/pkg/pipelines"
+	"github.com/opencost/opencost/core/pkg/storage"
+)
+
+func NewAllocationStorageExporter(clusterId string, resolution time.Duration, store storage.Storage) export.Exporter[opencost.AllocationSet] {
+	pathing, err := pathing.NewBingenStoragePathFormatter("", clusterId, pipelines.AllocationPipelineName, &resolution)
+	if err != nil {
+		log.Errorf("failed to create pathing formatter: %v", err)
+		return nil
+	}
+
+	return export.NewStorageExporter(
+		pipelines.AllocationPipelineName,
+		resolution,
+		pathing,
+		NewAllocationEncoder(),
+		store,
+		validator.NewSetValidator[opencost.AllocationSet](resolution),
+	)
+}
+
+func NewAssetsStorageExporter(clusterId string, resolution time.Duration, store storage.Storage) export.Exporter[opencost.AssetSet] {
+	pathing, err := pathing.NewBingenStoragePathFormatter("", clusterId, pipelines.AssetsPipelineName, &resolution)
+	if err != nil {
+		log.Errorf("failed to create pathing formatter: %v", err)
+		return nil
+	}
+
+	return export.NewStorageExporter(
+		pipelines.AssetsPipelineName,
+		resolution,
+		pathing,
+		NewAssetsEncoder(),
+		store,
+		validator.NewSetValidator[opencost.AssetSet](resolution),
+	)
+}
+
+func NewNetworkInsightStorageExporter(clusterId string, resolution time.Duration, store storage.Storage) export.Exporter[opencost.NetworkInsightSet] {
+	pathing, err := pathing.NewBingenStoragePathFormatter("", clusterId, pipelines.NetworkInsightPipelineName, &resolution)
+	if err != nil {
+		log.Errorf("failed to create pathing formatter: %v", err)
+		return nil
+	}
+
+	return export.NewStorageExporter(
+		pipelines.NetworkInsightPipelineName,
+		resolution,
+		pathing,
+		NewNetworkInsightEncoder(),
+		store,
+		validator.NewSetValidator[opencost.NetworkInsightSet](resolution),
+	)
+}