Jelajahi Sumber

Cluster Storage (#3218)

Signed-off-by: Sean Holcomb <seanholcomb@gmail.com>
Sean Holcomb 9 bulan lalu
induk
melakukan
6739999170

+ 18 - 64
core/pkg/storage/azurestorage.go

@@ -8,7 +8,6 @@ import (
 	"bytes"
 	"context"
 	"fmt"
-	"net"
 	"net/http"
 	"strings"
 	"time"
@@ -44,11 +43,11 @@ var defaultAzureConfig = AzureConfig{
 	ReaderConfig: ReaderConfig{
 		MaxRetryRequests: 0,
 	},
-	HTTPConfig: AzureHTTPConfig{
-		IdleConnTimeout:       model.Duration(90 * time.Second),
-		ResponseHeaderTimeout: model.Duration(2 * time.Minute),
-		TLSHandshakeTimeout:   model.Duration(10 * time.Second),
-		ExpectContinueTimeout: model.Duration(1 * time.Second),
+	HTTPConfig: HTTPConfig{
+		IdleConnTimeout:       90 * time.Second,
+		ResponseHeaderTimeout: 2 * time.Minute,
+		TLSHandshakeTimeout:   10 * time.Second,
+		ExpectContinueTimeout: 1 * time.Second,
 		MaxIdleConns:          100,
 		MaxIdleConnsPerHost:   100,
 		MaxConnsPerHost:       0,
@@ -58,17 +57,17 @@ var defaultAzureConfig = AzureConfig{
 
 // AzureConfig Azure storage configuration.
 type AzureConfig struct {
-	StorageAccountName      string          `yaml:"storage_account"`
-	StorageAccountKey       string          `yaml:"storage_account_key"`
-	StorageConnectionString string          `yaml:"storage_connection_string"`
-	ContainerName           string          `yaml:"container"`
-	Endpoint                string          `yaml:"endpoint"`
-	MaxRetries              int             `yaml:"max_retries"`
-	MSIResource             string          `yaml:"msi_resource"`
-	UserAssignedID          string          `yaml:"user_assigned_id"`
-	PipelineConfig          PipelineConfig  `yaml:"pipeline_config"`
-	ReaderConfig            ReaderConfig    `yaml:"reader_config"`
-	HTTPConfig              AzureHTTPConfig `yaml:"http_config"`
+	StorageAccountName      string         `yaml:"storage_account"`
+	StorageAccountKey       string         `yaml:"storage_account_key"`
+	StorageConnectionString string         `yaml:"storage_connection_string"`
+	ContainerName           string         `yaml:"container"`
+	Endpoint                string         `yaml:"endpoint"`
+	MaxRetries              int            `yaml:"max_retries"`
+	MSIResource             string         `yaml:"msi_resource"`
+	UserAssignedID          string         `yaml:"user_assigned_id"`
+	PipelineConfig          PipelineConfig `yaml:"pipeline_config"`
+	ReaderConfig            ReaderConfig   `yaml:"reader_config"`
+	HTTPConfig              HTTPConfig     `yaml:"http_config"`
 }
 
 type ReaderConfig struct {
@@ -82,21 +81,6 @@ type PipelineConfig struct {
 	MaxRetryDelay model.Duration `yaml:"max_retry_delay"`
 }
 
-type AzureHTTPConfig struct {
-	IdleConnTimeout       model.Duration `yaml:"idle_conn_timeout"`
-	ResponseHeaderTimeout model.Duration `yaml:"response_header_timeout"`
-	InsecureSkipVerify    bool           `yaml:"insecure_skip_verify"`
-
-	TLSHandshakeTimeout   model.Duration `yaml:"tls_handshake_timeout"`
-	ExpectContinueTimeout model.Duration `yaml:"expect_continue_timeout"`
-	MaxIdleConns          int            `yaml:"max_idle_conns"`
-	MaxIdleConnsPerHost   int            `yaml:"max_idle_conns_per_host"`
-	MaxConnsPerHost       int            `yaml:"max_conns_per_host"`
-	DisableCompression    bool           `yaml:"disable_compression"`
-
-	TLSConfig TLSConfig `yaml:"tls_config"`
-}
-
 // AzureStorage implements the storage.Storage interface against Azure APIs.
 type AzureStorage struct {
 	name            string
@@ -423,40 +407,10 @@ func (b *AzureStorage) IsAccessDeniedErr(err error) bool {
 	return bloberror.HasCode(err, bloberror.AuthorizationPermissionMismatch) || bloberror.HasCode(err, bloberror.InsufficientAccountPermissions)
 }
 
-func DefaultAzureTransport(config AzureConfig) (*http.Transport, error) {
-	tlsConfig, err := NewTLSConfig(&config.HTTPConfig.TLSConfig)
-	if err != nil {
-		return nil, fmt.Errorf("error creating TLS config: %w", err)
-	}
-
-	if config.HTTPConfig.InsecureSkipVerify {
-		tlsConfig.InsecureSkipVerify = true
-	}
-	return &http.Transport{
-		Proxy: http.ProxyFromEnvironment,
-		DialContext: (&net.Dialer{
-			Timeout:   30 * time.Second,
-			KeepAlive: 30 * time.Second,
-			DualStack: true,
-		}).DialContext,
-
-		MaxIdleConns:          config.HTTPConfig.MaxIdleConns,
-		MaxIdleConnsPerHost:   config.HTTPConfig.MaxIdleConnsPerHost,
-		IdleConnTimeout:       time.Duration(config.HTTPConfig.IdleConnTimeout),
-		MaxConnsPerHost:       config.HTTPConfig.MaxConnsPerHost,
-		TLSHandshakeTimeout:   time.Duration(config.HTTPConfig.TLSHandshakeTimeout),
-		ExpectContinueTimeout: time.Duration(config.HTTPConfig.ExpectContinueTimeout),
-
-		ResponseHeaderTimeout: time.Duration(config.HTTPConfig.ResponseHeaderTimeout),
-		DisableCompression:    config.HTTPConfig.DisableCompression,
-		TLSClientConfig:       tlsConfig,
-	}, nil
-}
-
 func getContainerClient(conf AzureConfig) (*container.Client, error) {
-	dt, err := DefaultAzureTransport(conf)
+	dt, err := conf.HTTPConfig.GetHTTPTransport()
 	if err != nil {
-		return nil, fmt.Errorf("error creating default transport: %w", err)
+		return nil, fmt.Errorf("error creating transport: %w", err)
 	}
 	opt := &container.ClientOptions{
 		ClientOptions: azcore.ClientOptions{

+ 6 - 3
core/pkg/storage/bucketstorage.go

@@ -12,9 +12,10 @@ import (
 type StorageProvider string
 
 const (
-	S3    StorageProvider = "S3"
-	GCS   StorageProvider = "GCS"
-	AZURE StorageProvider = "AZURE"
+	S3      StorageProvider = "S3"
+	GCS     StorageProvider = "GCS"
+	AZURE   StorageProvider = "AZURE"
+	CLUSTER StorageProvider = "CLUSTER"
 )
 
 // StorageConfig is the configuration type used as the "parent" configuration. It contains a type, which will
@@ -48,6 +49,8 @@ func NewBucketStorage(config []byte) (Storage, error) {
 		storage, err = NewGCSStorage(config)
 	case string(AZURE):
 		storage, err = NewAzureStorage(config)
+	case string(CLUSTER):
+		storage, err = NewClusterStorage(config)
 	default:
 		return nil, errors.Errorf("storage with type %s is not supported", storageConfig.Type)
 	}

+ 5 - 400
core/pkg/storage/bucketstorage_test.go

@@ -3,28 +3,12 @@ package storage
 import (
 	"fmt"
 	"os"
-	"path"
 	"testing"
-
-	"github.com/opencost/opencost/core/pkg/util/json"
 )
 
 // This suite of integration tests is meant to validate if an implementation of Storage that relies on a could
 // bucket service properly implements the interface. To run these tests the env variable "TEST_BUCKET_CONFIG"
 // must be set with the path to a valid bucket config as defined in the NewBucketStorage() function.
-
-type testFileContent struct {
-	Field1 int    `json:"field_1"`
-	Field2 string `json:"field_2"`
-}
-
-var tfc = testFileContent{
-	Field1: 101,
-	Field2: "TEST_FILE_CONTENT",
-}
-
-const testpath = "opencost/storage/"
-
 func createStorage(configPath string) (Storage, error) {
 
 	bytes, err := os.ReadFile(configPath)
@@ -39,35 +23,6 @@ func createStorage(configPath string) (Storage, error) {
 	return store, nil
 }
 
-func createFiles(files []string, testName string, store Storage) error {
-	b, err := json.Marshal(tfc)
-	if err != nil {
-		return fmt.Errorf("failed to marshal file content: %w", err)
-	}
-
-	for _, fileName := range files {
-		filePath := path.Join(testpath, testName, fileName)
-		err = store.Write(filePath, b)
-		if err != nil {
-			return fmt.Errorf("failed to write file '%s': %w ", filePath, err)
-		}
-	}
-
-	return nil
-}
-
-func cleanupFiles(files []string, testName string, store Storage) error {
-	for _, fileName := range files {
-		filePath := path.Join(testpath, testName, fileName)
-		err := store.Remove(filePath)
-		if err != nil {
-			return fmt.Errorf("failed to remove file '%s': %w ", filePath, err)
-		}
-	}
-
-	return nil
-}
-
 func TestBucketStorage_List(t *testing.T) {
 	configPath := os.Getenv("TEST_BUCKET_CONFIG")
 	if configPath == "" {
@@ -79,92 +34,7 @@ func TestBucketStorage_List(t *testing.T) {
 		return
 	}
 
-	testName := "list"
-
-	fileNames := []string{
-		"/file0.json",
-		"/file1.json",
-		"/dir0/file2.json",
-		"/dir0/file3.json",
-	}
-
-	err = createFiles(fileNames, testName, store)
-	if err != nil {
-		t.Errorf("failed to create files: %s", err)
-	}
-
-	defer func() {
-		err = cleanupFiles(fileNames, testName, store)
-		if err != nil {
-			t.Errorf("failed to clean up files: %s", err)
-		}
-	}()
-
-	testCases := map[string]struct {
-		path      string
-		expected  []string
-		expectErr bool
-	}{
-		"base dir files": {
-			path: path.Join(testpath, testName),
-			expected: []string{
-				"file0.json",
-				"file1.json",
-			},
-			expectErr: false,
-		},
-		"single nested dir files": {
-			path: path.Join(testpath, testName, "dir0"),
-			expected: []string{
-				"file2.json",
-				"file3.json",
-			},
-			expectErr: false,
-		},
-		"nonexistent dir files": {
-			path:      path.Join(testpath, testName, "dir1"),
-			expected:  []string{},
-			expectErr: false,
-		},
-	}
-
-	for name, tc := range testCases {
-		t.Run(name, func(t *testing.T) {
-			fileList, err := store.List(tc.path)
-			if tc.expectErr == (err == nil) {
-				if tc.expectErr {
-					t.Errorf("expected error was not thrown")
-					return
-				}
-				t.Errorf("unexpected error: %s", err.Error())
-				return
-			}
-
-			if len(fileList) != len(tc.expected) {
-				t.Errorf("file list length does not match expected length, actual: %d, expected: %d", len(fileList), len(tc.expected))
-			}
-
-			expectedSet := map[string]struct{}{}
-			for _, expName := range tc.expected {
-				expectedSet[expName] = struct{}{}
-			}
-
-			for _, file := range fileList {
-				_, ok := expectedSet[file.Name]
-				if !ok {
-					t.Errorf("unexpect file in list %s", file.Name)
-				}
-
-				if file.Size == 0 {
-					t.Errorf("file size is not set")
-				}
-
-				if file.ModTime.IsZero() {
-					t.Errorf("file mod time is not set")
-				}
-			}
-		})
-	}
+	TestStorageList(t, store)
 }
 
 func TestBucketStorage_ListDirectories(t *testing.T) {
@@ -178,89 +48,7 @@ func TestBucketStorage_ListDirectories(t *testing.T) {
 		return
 	}
 
-	testName := "list_directories"
-
-	fileNames := []string{
-		"/file0.json",
-		"/dir0/file2.json",
-		"/dir0/file3.json",
-		"/dir0/dir1/file4.json",
-		"/dir0/dir2/file5.json",
-	}
-
-	err = createFiles(fileNames, testName, store)
-	if err != nil {
-		t.Errorf("failed to create files: %s", err)
-	}
-
-	defer func() {
-		err = cleanupFiles(fileNames, testName, store)
-		if err != nil {
-			t.Errorf("failed to clean up files: %s", err)
-		}
-	}()
-
-	testCases := map[string]struct {
-		path      string
-		expected  []string
-		expectErr bool
-	}{
-		"base dir dir": {
-			path: path.Join(testpath, testName),
-			expected: []string{
-				path.Join(testpath, testName, "dir0") + "/",
-			},
-			expectErr: false,
-		},
-		"single nested dir files": {
-			path: path.Join(testpath, testName, "dir0"),
-			expected: []string{
-				path.Join(testpath, testName, "dir0", "dir1") + "/",
-				path.Join(testpath, testName, "dir0", "dir2") + "/",
-			},
-			expectErr: false,
-		},
-		"dir with no sub dirs": {
-			path:      path.Join(testpath, testName, "dir0/dir1"),
-			expected:  []string{},
-			expectErr: false,
-		},
-		"non-existent dir": {
-			path:      path.Join(testpath, testName, "dir1"),
-			expected:  []string{},
-			expectErr: false,
-		},
-	}
-
-	for name, tc := range testCases {
-		t.Run(name, func(t *testing.T) {
-			dirList, err := store.ListDirectories(tc.path)
-			if tc.expectErr == (err == nil) {
-				if tc.expectErr {
-					t.Errorf("expected error was not thrown")
-					return
-				}
-				t.Errorf("unexpected error: %s", err.Error())
-				return
-			}
-
-			if len(dirList) != len(tc.expected) {
-				t.Errorf("dir list length does not match expected length, actual: %d, expected: %d", len(dirList), len(tc.expected))
-			}
-
-			expectedSet := map[string]struct{}{}
-			for _, expName := range tc.expected {
-				expectedSet[expName] = struct{}{}
-			}
-
-			for _, dir := range dirList {
-				_, ok := expectedSet[dir.Name]
-				if !ok {
-					t.Errorf("unexpect dir in list %s", dir.Name)
-				}
-			}
-		})
-	}
+	TestStorageListDirectories(t, store)
 }
 
 func TestBucketStorage_Exists(t *testing.T) {
@@ -274,63 +62,7 @@ func TestBucketStorage_Exists(t *testing.T) {
 		return
 	}
 
-	testName := "exists"
-
-	fileNames := []string{
-		"/file0.json",
-	}
-
-	err = createFiles(fileNames, testName, store)
-	if err != nil {
-		t.Errorf("failed to create files: %s", err)
-	}
-
-	defer func() {
-		err = cleanupFiles(fileNames, testName, store)
-		if err != nil {
-			t.Errorf("failed to clean up files: %s", err)
-		}
-	}()
-
-	testCases := map[string]struct {
-		path      string
-		expected  bool
-		expectErr bool
-	}{
-		"file exists": {
-			path:      path.Join(testpath, testName, "file0.json"),
-			expected:  true,
-			expectErr: false,
-		},
-		"file does not exist": {
-			path:      path.Join(testpath, testName, "file1.json"),
-			expected:  false,
-			expectErr: false,
-		},
-		"dir does not exist": {
-			path:      path.Join(testpath, testName, "dir0/file.json"),
-			expected:  false,
-			expectErr: false,
-		},
-	}
-
-	for name, tc := range testCases {
-		t.Run(name, func(t *testing.T) {
-			exists, err := store.Exists(tc.path)
-			if tc.expectErr == (err == nil) {
-				if tc.expectErr {
-					t.Errorf("expected error was not thrown")
-					return
-				}
-				t.Errorf("unexpected error: %s", err.Error())
-				return
-			}
-
-			if exists != tc.expected {
-				t.Errorf("file exists output did not match expected")
-			}
-		})
-	}
+	TestStorageExists(t, store)
 }
 
 func TestBucketStorage_Read(t *testing.T) {
@@ -344,68 +76,7 @@ func TestBucketStorage_Read(t *testing.T) {
 		return
 	}
 
-	testName := "read"
-
-	fileNames := []string{
-		"/file0.json",
-	}
-
-	err = createFiles(fileNames, testName, store)
-	if err != nil {
-		t.Errorf("failed to create files: %s", err)
-	}
-
-	defer func() {
-		err = cleanupFiles(fileNames, testName, store)
-		if err != nil {
-			t.Errorf("failed to clean up files: %s", err)
-		}
-	}()
-
-	testCases := map[string]struct {
-		path      string
-		expectErr bool
-	}{
-		"file exists": {
-			path:      path.Join(testpath, testName, "file0.json"),
-			expectErr: false,
-		},
-		"file does not exist": {
-			path:      path.Join(testpath, testName, "file1.json"),
-			expectErr: true,
-		},
-		"dir does not exist": {
-			path:      path.Join(testpath, testName, "dir0/file.json"),
-			expectErr: true,
-		},
-	}
-
-	for name, tc := range testCases {
-		t.Run(name, func(t *testing.T) {
-			b, err := store.Read(tc.path)
-			if tc.expectErr && err != nil {
-				return
-			}
-			if tc.expectErr == (err == nil) {
-				if tc.expectErr {
-					t.Errorf("expected error was not thrown")
-					return
-				}
-				t.Errorf("unexpected error: %s", err.Error())
-				return
-			}
-			var content testFileContent
-			err = json.Unmarshal(b, &content)
-			if err != nil {
-				t.Errorf("could not unmarshal file content")
-				return
-			}
-
-			if content != tfc {
-				t.Errorf("file content did not match writen value")
-			}
-		})
-	}
+	TestStorageRead(t, store)
 }
 
 func TestBucketStorage_Stat(t *testing.T) {
@@ -419,71 +90,5 @@ func TestBucketStorage_Stat(t *testing.T) {
 		return
 	}
 
-	testName := "stat"
-
-	fileNames := []string{
-		"/file0.json",
-	}
-
-	err = createFiles(fileNames, testName, store)
-	if err != nil {
-		t.Errorf("failed to create files: %s", err)
-	}
-
-	defer func() {
-		err = cleanupFiles(fileNames, testName, store)
-		if err != nil {
-			t.Errorf("failed to clean up files: %s", err)
-		}
-	}()
-
-	testCases := map[string]struct {
-		path      string
-		expected  *StorageInfo
-		expectErr bool
-	}{
-		"base dir": {
-			path: path.Join(testpath, testName, "file0.json"),
-			expected: &StorageInfo{
-				Name: "file0.json",
-				Size: 45,
-			},
-			expectErr: false,
-		},
-		"file does not exist": {
-			path:      path.Join(testpath, testName, "file1.json"),
-			expected:  nil,
-			expectErr: true,
-		},
-	}
-
-	for name, tc := range testCases {
-		t.Run(name, func(t *testing.T) {
-			status, err := store.Stat(tc.path)
-			if tc.expectErr && err != nil {
-				return
-			}
-			if tc.expectErr == (err == nil) {
-				if tc.expectErr {
-					t.Errorf("expected error was not thrown")
-					return
-				}
-				t.Errorf("unexpected error: %s", err.Error())
-				return
-			}
-
-			if status.Name != tc.expected.Name {
-				t.Errorf("status name did name match expected, actual: %s, expected: %s", status.Name, tc.expected.Name)
-			}
-
-			if status.Size != tc.expected.Size {
-				t.Errorf("status name did size match expected, actual: %d, expected: %d", status.Size, tc.expected.Size)
-			}
-
-			if status.ModTime.IsZero() {
-				t.Errorf("status mod time is not set")
-			}
-
-		})
-	}
+	TestStorageStat(t, store)
 }

+ 397 - 0
core/pkg/storage/clusterstorage.go

@@ -0,0 +1,397 @@
+package storage
+
+import (
+	"bytes"
+	"fmt"
+	"io"
+	"net"
+	"net/http"
+	"net/url"
+	"strings"
+	"time"
+
+	"github.com/opencost/opencost/core/pkg/log"
+	"github.com/opencost/opencost/core/pkg/util/httputil"
+	"github.com/opencost/opencost/core/pkg/util/json"
+	"gopkg.in/yaml.v2"
+)
+
+var defaultClusterConfig = ClusterConfig{
+	Host: "localhost",
+	Port: 9006,
+	HTTPConfig: HTTPConfig{
+		IdleConnTimeout:       90 * time.Second,
+		ResponseHeaderTimeout: 2 * time.Minute,
+		TLSHandshakeTimeout:   10 * time.Second,
+		ExpectContinueTimeout: 1 * time.Second,
+		MaxIdleConns:          100,
+		MaxIdleConnsPerHost:   100,
+		MaxConnsPerHost:       0,
+	},
+}
+
+// ClusterStorage is a Storage implementation which connects to a remote file storage over http
+type ClusterStorage struct {
+	client *http.Client
+	host   string
+	port   int
+}
+
+type ClusterConfig struct {
+	Host       string     `yaml:"host"`
+	Port       int        `yaml:"port"`
+	HTTPConfig HTTPConfig `yaml:"http_config"`
+}
+
+// parseConfig unmarshals a buffer into a Config with default HTTPConfig values.
+func parseClusterConfig(conf []byte) (ClusterConfig, error) {
+	config := defaultClusterConfig
+	if err := yaml.Unmarshal(conf, &config); err != nil {
+		return ClusterConfig{}, err
+	}
+
+	return config, nil
+}
+
+func NewClusterStorage(conf []byte) (*ClusterStorage, error) {
+	config, err := parseClusterConfig(conf)
+	if err != nil {
+		return nil, err
+	}
+
+	return NewClusterStorageWith(config)
+}
+
+// NewBucketWithConfig returns a new Bucket using the provided s3 config values.
+func NewClusterStorageWith(config ClusterConfig) (*ClusterStorage, error) {
+	dt, err := config.HTTPConfig.GetHTTPTransport()
+	if err != nil {
+		return nil, fmt.Errorf("error creating transport: %w", err)
+	}
+
+	cs := &ClusterStorage{
+		host:   config.Host,
+		port:   config.Port,
+		client: &http.Client{Transport: dt},
+	}
+
+	// Wait on cluster storage to respond before returning
+	defaultWait := 5 * time.Second
+	retry := 0
+	maxTries := 5
+	for {
+		err := cs.check()
+		if err == nil {
+			break
+		}
+
+		log.Debugf("ClusterStorage: error connecting to cluster storage: %s", err.Error())
+		if retry >= maxTries {
+			return nil, fmt.Errorf("ClusterStorage: failed to connect to cluster storage after %d trys", maxTries)
+		}
+		waitTime := httputil.ExponentialBackoffWaitFor(defaultWait, retry)
+		log.Infof("ClusterStorage: failed to connecting cluster storage. retry in %s", waitTime.String())
+		time.Sleep(waitTime)
+		retry++
+	}
+
+	return cs, nil
+}
+
+func (c *ClusterStorage) makeRequest(method, url string, body io.Reader, fn func(*http.Response) error) error {
+	request, err := http.NewRequest(method, url, body)
+	if err != nil {
+		return fmt.Errorf("failed to build request: %w", err)
+	}
+
+	resp, err := c.client.Do(request)
+	if err != nil {
+		return fmt.Errorf("request failed: %w", err)
+	}
+	if resp.Body != nil {
+		defer resp.Body.Close()
+	}
+
+	if !(resp.StatusCode >= 200 && resp.StatusCode <= 299) {
+		if resp.Body != nil {
+			var errResp Response[any]
+			err = json.NewDecoder(resp.Body).Decode(&errResp)
+			if err == nil {
+				return fmt.Errorf("invalid response %d: %s", resp.StatusCode, errResp.Message)
+			}
+		}
+		return fmt.Errorf("invalid response %d", resp.StatusCode)
+	}
+
+	if fn != nil {
+		err = fn(resp)
+		if err != nil {
+			return fmt.Errorf("failed to handle response: %w", err)
+		}
+	}
+	return nil
+}
+
+func (c *ClusterStorage) getURL(subpath string, args map[string]string) string {
+	pathElems := strings.Split(subpath, "/")
+	u := new(url.URL)
+	u.Scheme = c.scheme()
+	u.Host = net.JoinHostPort(c.host, fmt.Sprintf("%d", c.port))
+	u = u.JoinPath(pathElems...)
+
+	q := make(url.Values)
+	for k, v := range args {
+		q.Set(k, v)
+	}
+
+	rawQuery, _ := url.QueryUnescape(q.Encode())
+	u.RawQuery = rawQuery
+
+	return u.String() // <-- full URL string
+}
+
+func (c *ClusterStorage) check() error {
+	err := c.makeRequest(
+		http.MethodGet,
+		c.getURL("healthz", nil),
+		nil,
+		nil,
+	)
+	if err != nil {
+		return fmt.Errorf("ClusterStorage: failed health check: %w", err)
+	}
+
+	return nil
+}
+
+func (c *ClusterStorage) StorageType() StorageType {
+	return StorageTypeCluster
+}
+
+func (c *ClusterStorage) scheme() string {
+	if c.client.Transport != nil {
+		if transport, ok := c.client.Transport.(*http.Transport); ok {
+			if transport.TLSClientConfig != nil && !transport.TLSClientConfig.InsecureSkipVerify {
+				return "https"
+			}
+		}
+	}
+	return "http"
+}
+
+func (c *ClusterStorage) FullPath(path string) string {
+	var jsonResp Response[string]
+	fn := func(resp *http.Response) error {
+		err := json.NewDecoder(resp.Body).Decode(&jsonResp)
+		if err != nil {
+			return fmt.Errorf("failed to decode json: %w", err)
+		}
+		return nil
+	}
+
+	args := map[string]string{
+		"path": path,
+	}
+
+	err := c.makeRequest(
+		http.MethodGet,
+		c.getURL("clusterStorage/fullPath", args),
+		nil,
+		fn,
+	)
+	if err != nil {
+		log.Errorf("ClusterStorage: FullPath: %s", err.Error())
+	}
+
+	return jsonResp.Data
+}
+
+type Response[T any] struct {
+	Code    int    `json:"code"`
+	Data    T      `json:"data"`
+	Message string `json:"message"`
+}
+
+func (c *ClusterStorage) Stat(path string) (*StorageInfo, error) {
+	var jsonResp Response[*StorageInfo]
+	fn := func(resp *http.Response) error {
+		err := json.NewDecoder(resp.Body).Decode(&jsonResp)
+		if err != nil {
+			return fmt.Errorf("failed to decode json: %w", err)
+		}
+		return nil
+	}
+
+	args := map[string]string{
+		"path": path,
+	}
+
+	err := c.makeRequest(
+		http.MethodGet,
+		c.getURL("clusterStorage/stat", args),
+		nil,
+		fn,
+	)
+	if err != nil {
+		return nil, fmt.Errorf("ClusterStorage: Stat: %w", err)
+	}
+
+	return jsonResp.Data, nil
+}
+
+func (c *ClusterStorage) Read(path string) ([]byte, error) {
+	var jsonResp Response[[]byte]
+	fn := func(resp *http.Response) error {
+		err := json.NewDecoder(resp.Body).Decode(&jsonResp)
+		if err != nil {
+			return fmt.Errorf("failed to decode json: %w", err)
+		}
+		return nil
+	}
+
+	args := map[string]string{
+		"path": path,
+	}
+
+	err := c.makeRequest(
+		http.MethodGet,
+		c.getURL("clusterStorage/read", args),
+		nil,
+		fn,
+	)
+	if err != nil {
+		return nil, fmt.Errorf("ClusterStorage: Read: %w", err)
+	}
+
+	return jsonResp.Data, nil
+}
+
+func (c *ClusterStorage) Write(path string, data []byte) error {
+	fn := func(resp *http.Response) error {
+		return nil
+	}
+
+	args := map[string]string{
+		"path": path,
+	}
+
+	err := c.makeRequest(
+		http.MethodPut,
+		c.getURL("clusterStorage/write", args),
+		bytes.NewReader(data),
+		fn,
+	)
+	if err != nil {
+		return fmt.Errorf("ClusterStorage: Write: %w", err)
+	}
+
+	return nil
+}
+
+func (c *ClusterStorage) Remove(path string) error {
+	fn := func(resp *http.Response) error {
+		return nil
+	}
+
+	args := map[string]string{
+		"path": path,
+	}
+
+	err := c.makeRequest(
+		http.MethodDelete,
+		c.getURL("clusterStorage/remove", args),
+		nil,
+		fn,
+	)
+	if err != nil {
+		return fmt.Errorf("ClusterStorage: Write: %w", err)
+	}
+
+	return nil
+}
+
+func (c *ClusterStorage) Exists(path string) (bool, error) {
+	var jsonResp Response[bool]
+	fn := func(resp *http.Response) error {
+		err := json.NewDecoder(resp.Body).Decode(&jsonResp)
+		if err != nil {
+			return fmt.Errorf("failed to decode json: %w", err)
+		}
+		return nil
+	}
+
+	args := map[string]string{
+		"path": path,
+	}
+
+	err := c.makeRequest(
+		http.MethodGet,
+		c.getURL("clusterStorage/exists", args),
+		nil,
+		fn,
+	)
+	if err != nil {
+		return false, fmt.Errorf("ClusterStorage: Exists: %w", err)
+	}
+
+	return jsonResp.Data, nil
+}
+
+func (c *ClusterStorage) List(path string) ([]*StorageInfo, error) {
+	var jsonResp Response[[]*StorageInfo]
+	fn := func(resp *http.Response) error {
+		err := json.NewDecoder(resp.Body).Decode(&jsonResp)
+		if err != nil {
+			return fmt.Errorf("failed to decode json: %w", err)
+		}
+		return nil
+	}
+
+	args := map[string]string{
+		"path": path,
+	}
+
+	err := c.makeRequest(
+		http.MethodGet,
+		c.getURL("clusterStorage/list", args),
+		nil,
+		fn,
+	)
+	if err != nil {
+		return nil, fmt.Errorf("ClusterStorage: List: %w", err)
+	}
+
+	return jsonResp.Data, nil
+}
+
+func (c *ClusterStorage) ListDirectories(path string) ([]*StorageInfo, error) {
+	var jsonResp Response[[]*StorageInfo]
+	fn := func(resp *http.Response) error {
+		err := json.NewDecoder(resp.Body).Decode(&jsonResp)
+		if err != nil {
+			return fmt.Errorf("failed to decode json: %w", err)
+		}
+		return nil
+	}
+
+	args := map[string]string{
+		"path": path,
+	}
+
+	err := c.makeRequest(
+		http.MethodGet,
+		c.getURL("clusterStorage/listDirectories", args),
+		nil,
+		fn,
+	)
+	if err != nil {
+		return nil, fmt.Errorf("ClusterStorage: List Directories: %w", err)
+	}
+
+	// add '/' to the end of directory names to match other bucket storage types
+	for _, info := range jsonResp.Data {
+		info.Name = strings.TrimSuffix(info.Name, DirDelim) + DirDelim
+	}
+
+	return jsonResp.Data, nil
+}

+ 9 - 1
core/pkg/storage/filestorage.go

@@ -55,6 +55,9 @@ func (fs *FileStorage) List(path string) ([]*StorageInfo, error) {
 	// Read files in the backup path
 	entries, err := os.ReadDir(p)
 	if err != nil {
+		if os.IsNotExist(err) {
+			return []*StorageInfo{}, nil
+		}
 		return nil, err
 	}
 	files := make([]gofs.FileInfo, 0, len(entries))
@@ -71,10 +74,12 @@ func (fs *FileStorage) List(path string) ([]*StorageInfo, error) {
 
 func (fs *FileStorage) ListDirectories(path string) ([]*StorageInfo, error) {
 	p := gopath.Join(fs.baseDir, path)
-
 	// Read files in the backup path
 	entries, err := os.ReadDir(p)
 	if err != nil {
+		if os.IsNotExist(err) {
+			return []*StorageInfo{}, nil
+		}
 		return nil, err
 	}
 	files := make([]gofs.FileInfo, 0, len(entries))
@@ -167,6 +172,9 @@ func (fs *FileStorage) prepare(path string) (string, error) {
 func FilesToStorageInfo(fileInfo []gofs.FileInfo) []*StorageInfo {
 	var stats []*StorageInfo
 	for _, info := range fileInfo {
+		if info.IsDir() {
+			continue
+		}
 		stats = append(stats, FileToStorageInfo(info))
 	}
 	return stats

+ 150 - 0
core/pkg/storage/http.go

@@ -0,0 +1,150 @@
+package storage
+
+import (
+	"crypto/tls"
+	"crypto/x509"
+	"fmt"
+	"net"
+	"net/http"
+	"os"
+	"time"
+)
+
+// HTTPConfig configures HTTP client settings that can be used across different storage implementations.
+type HTTPConfig struct {
+	IdleConnTimeout       time.Duration `yaml:"idle_conn_timeout"`
+	ResponseHeaderTimeout time.Duration `yaml:"response_header_timeout"`
+	InsecureSkipVerify    bool          `yaml:"insecure_skip_verify"`
+
+	TLSHandshakeTimeout   time.Duration `yaml:"tls_handshake_timeout"`
+	ExpectContinueTimeout time.Duration `yaml:"expect_continue_timeout"`
+	MaxIdleConns          int           `yaml:"max_idle_conns"`
+	MaxIdleConnsPerHost   int           `yaml:"max_idle_conns_per_host"`
+	MaxConnsPerHost       int           `yaml:"max_conns_per_host"`
+	DisableCompression    bool          `yaml:"disable_compression"`
+
+	// Allow upstream callers to inject a round tripper
+	Transport http.RoundTripper `yaml:"-"`
+
+	TLSConfig TLSConfig `yaml:"tls_config"`
+}
+
+// NewHTTPTransport creates a new http.Transport from the HTTPConfig.
+func (config HTTPConfig) GetHTTPTransport() (http.RoundTripper, error) {
+	// check for injected round tripper
+	if config.Transport != nil {
+		return config.Transport, nil
+	}
+	tlsConfig, err := config.TLSConfig.ToConfig()
+	if err != nil {
+		return nil, fmt.Errorf("error creating TLS config: %w", err)
+	}
+
+	if config.InsecureSkipVerify {
+		tlsConfig.InsecureSkipVerify = true
+	}
+
+	return &http.Transport{
+		Proxy: http.ProxyFromEnvironment,
+		DialContext: (&net.Dialer{
+			Timeout:   30 * time.Second,
+			KeepAlive: 30 * time.Second,
+			DualStack: true,
+		}).DialContext,
+
+		MaxIdleConns:          config.MaxIdleConns,
+		MaxIdleConnsPerHost:   config.MaxIdleConnsPerHost,
+		IdleConnTimeout:       config.IdleConnTimeout,
+		MaxConnsPerHost:       config.MaxConnsPerHost,
+		TLSHandshakeTimeout:   config.TLSHandshakeTimeout,
+		ExpectContinueTimeout: config.ExpectContinueTimeout,
+		// A custom ResponseHeaderTimeout was introduced
+		// to cover cases where the tcp connection works but
+		// the server never answers. Defaults to 2 minutes.
+		ResponseHeaderTimeout: config.ResponseHeaderTimeout,
+		// Set this value so that the underlying transport round-tripper
+		// doesn't try to auto decode the body of objects with
+		// content-encoding set to `gzip`.
+		//
+		// Refer: https://golang.org/src/net/http/transport.go?h=roundTrip#L1843.
+		DisableCompression: config.DisableCompression,
+		// #nosec It's up to the user to decide on TLS configs
+		TLSClientConfig: tlsConfig,
+	}, nil
+}
+
+// NewTLSConfig creates a new tls.Config from the given TLSConfig.
+func (cfg TLSConfig) ToConfig() (*tls.Config, error) {
+	tlsConfig := &tls.Config{InsecureSkipVerify: cfg.InsecureSkipVerify}
+
+	// If a CA cert is provided then let's read it in.
+	if len(cfg.CAFile) > 0 {
+		b, err := readCAFile(cfg.CAFile)
+		if err != nil {
+			return nil, err
+		}
+		if !updateRootCA(tlsConfig, b) {
+			return nil, fmt.Errorf("unable to use specified CA cert %s", cfg.CAFile)
+		}
+	}
+
+	if len(cfg.ServerName) > 0 {
+		tlsConfig.ServerName = cfg.ServerName
+	}
+	// If a client cert & key is provided then configure TLS config accordingly.
+	if len(cfg.CertFile) > 0 && len(cfg.KeyFile) == 0 {
+		return nil, fmt.Errorf("client cert file %q specified without client key file", cfg.CertFile)
+	} else if len(cfg.KeyFile) > 0 && len(cfg.CertFile) == 0 {
+		return nil, fmt.Errorf("client key file %q specified without client cert file", cfg.KeyFile)
+	} else if len(cfg.CertFile) > 0 && len(cfg.KeyFile) > 0 {
+		// Verify that client cert and key are valid.
+		if _, err := cfg.getClientCertificate(nil); err != nil {
+			return nil, err
+		}
+		tlsConfig.GetClientCertificate = cfg.getClientCertificate
+	}
+
+	return tlsConfig, nil
+}
+
+// readCAFile reads the CA cert file from disk.
+func readCAFile(f string) ([]byte, error) {
+	data, err := os.ReadFile(f)
+	if err != nil {
+		return nil, fmt.Errorf("unable to load specified CA cert %s: %s", f, err)
+	}
+	return data, nil
+}
+
+// updateRootCA parses the given byte slice as a series of PEM encoded certificates and updates tls.Config.RootCAs.
+func updateRootCA(cfg *tls.Config, b []byte) bool {
+	caCertPool := x509.NewCertPool()
+	if !caCertPool.AppendCertsFromPEM(b) {
+		return false
+	}
+	cfg.RootCAs = caCertPool
+	return true
+}
+
+// getClientCertificate reads the pair of client cert and key from disk and returns a tls.Certificate.
+func (c TLSConfig) getClientCertificate(*tls.CertificateRequestInfo) (*tls.Certificate, error) {
+	cert, err := tls.LoadX509KeyPair(c.CertFile, c.KeyFile)
+	if err != nil {
+		return nil, fmt.Errorf("unable to use specified client cert (%s) & key (%s): %s", c.CertFile, c.KeyFile, err)
+	}
+	return &cert, nil
+}
+
+// TLSConfig configures the options for TLS connections.
+type TLSConfig struct {
+	// The CA cert to use for the targets.
+	CAFile string `yaml:"ca_file"`
+	// The client cert file for the targets.
+	CertFile string `yaml:"cert_file"`
+	// The client key file for the targets.
+	KeyFile string `yaml:"key_file"`
+	// Used to verify the hostname for the targets.
+	ServerName string `yaml:"server_name"`
+	// Disable target certificate validation.
+	InsecureSkipVerify bool `yaml:"insecure_skip_verify"`
+}

+ 559 - 0
core/pkg/storage/http_test.go

@@ -0,0 +1,559 @@
+package storage
+
+import (
+	"crypto/tls"
+	"net/http"
+	"os"
+	"path"
+	"testing"
+	"time"
+
+	assert2 "github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/require"
+)
+
+const (
+	caFileName          = "testCA.pem"
+	keyFileName         = "testkey.pem"
+	invalidFileName     = "invalid.pem"
+	nonExistentFileName = "no.exist"
+	// valid CA File for test purposes only
+	caContent = `-----BEGIN CERTIFICATE-----
+MIIF2TCCA8GgAwIBAgIUIY1Kop8xSQEwlz4EeykhGEviRLIwDQYJKoZIhvcNAQEL
+BQAwfDELMAkGA1UEBhMCVVMxEzARBgNVBAgMCkNhbGlmb3JuaWExDTALBgNVBAcM
+BHRlc3QxDTALBgNVBAoMBHRlc3QxDTALBgNVBAsMBHRlc3QxDTALBgNVBAMMBHRl
+c3QxHDAaBgkqhkiG9w0BCQEWDXRlc3RAdGVzdC5jb20wHhcNMjUwNjI0MjEwMTM0
+WhcNMjUwNzI0MjEwMTM0WjB8MQswCQYDVQQGEwJVUzETMBEGA1UECAwKQ2FsaWZv
+cm5pYTENMAsGA1UEBwwEdGVzdDENMAsGA1UECgwEdGVzdDENMAsGA1UECwwEdGVz
+dDENMAsGA1UEAwwEdGVzdDEcMBoGCSqGSIb3DQEJARYNdGVzdEB0ZXN0LmNvbTCC
+AiIwDQYJKoZIhvcNAQEBBQADggIPADCCAgoCggIBALYBm4UDowPTNvBxanFKdJ5g
++ZIKkzvIqlAVxKWPWopdlQinoRl6jyofDJ1yhuLiqz4CxDczTv+A1TjxH0RaSdj2
+qebOqqhHl+EahV/stc16vOz4mywkrV+C5i5Vk2y1SXqxyzZQtthhvPquHD2C/Z8M
+PVVzyN4+gGog0srdXffPhEI774uenkkcBZNh1ycvJJPv6nQ3Ib0Gjk/J7nnV5AvI
+glfloy2sENZagtx2EuPxuQzeuJoR62hrBLLG/gR50Mqr1RRxn3BV61Z2q+8vmhK0
+qyRF6RiFqrtJy67NGyhlBmlCttI5rZX8lBADCpaLRWDRlZlGtA08Mh9FbiawHRwd
+pcN4AVpMfsqRxZJ18LLzZe9XzWrVpaWoF5JFNB8rcF5+eH7ry671AkK8nV0BrbdR
+H+zJnbqi1ewQBpL49dYsUheqqZw9w/bq7fgvefxEL2urAfbEwnGfyygReYjiNw7D
+z9uMudBoYNQyCTe/lYH0q6xP1Ycso0WfjKL50mcvMOQNaux0Nd/oH1B0WFe4OnFE
+QqAvs6g1hxd4W8Q3mjbiTNVmpFU9O5W3yCvNgJt/5UKi/zfXClTsMicAPznvQTlB
++O7GxL4B9mgSNL+8qLwx8NmfJlskk4HrJZZkdk1hTIr9Lj/uu/ooR1tbRDkbeLuY
+N6wqN7+jcBWoDZcHh+gDAgMBAAGjUzBRMB0GA1UdDgQWBBQsrXi8RmTF705Ct3RR
+Fng02lj7+TAfBgNVHSMEGDAWgBQsrXi8RmTF705Ct3RRFng02lj7+TAPBgNVHRMB
+Af8EBTADAQH/MA0GCSqGSIb3DQEBCwUAA4ICAQA7MLXyWIRghrsJeOtxfi2xmr2F
+di417pi5cGrq98liRlISF/bqXLqLWcTSDpCLdHzLBbUA5CVGU//1g88HuHUC1mbp
+ENxdKqRzlK5wkxDifGiLFqX87SOb+dSqkhn85fs6kBkHRBixZwoVcs/WQWQNSbPw
+lFx6sVa5oKOPkJbre7AJf9BTWtCUftps0rwQfrlZw9FYmBSGQ1PkhJOdOkbWKhJ6
+F55lWdfpcxZRPuemQFWzErQRLMXwzOm9MdmrISkQbRttTjZCvoo0njphckPjdrZJ
+kqN2OSFUGmoia/0LfIOaC6LdWri2tEemS69JDJxIXxpxv5GE1PAu6zdQBmV5/rBn
+e2EwLSLUxleNDGp4YhyXOXSS3Tm2zCxVmLUKLAqq6IqvIO155J0uTBFKpRGs+3DL
+P+c0XFHgp3+tVWnkYGszh6hrKMgYWCneEDAdCXrv2GQjY7ObbRo3f4dR0Iswz8pR
+KdrUSftz1CNVmQpjq06nUa9pdZpqwAxuyvKKzBrefprHqUS2WBsiGdmhjpNIagGl
+jF3fZ24qJxDOv6vAvF+9jHxfTq+WUrR+tS6BpgvrvVVJkaWsj6s8wlRFny8zrWln
+gy8s1O9SI6+368+p37nPPgIfBVjOJpGVLrG3QL/e1kg91mUdhXkfan81NVXdQif5
+JXNAW1pC0i+sxoHYfg==
+-----END CERTIFICATE-----
+`
+	// valid key file, only used for testing
+	keyContent = `-----BEGIN PRIVATE KEY-----
+MIIJQgIBADANBgkqhkiG9w0BAQEFAASCCSwwggkoAgEAAoICAQC2AZuFA6MD0zbw
+cWpxSnSeYPmSCpM7yKpQFcSlj1qKXZUIp6EZeo8qHwydcobi4qs+AsQ3M07/gNU4
+8R9EWknY9qnmzqqoR5fhGoVf7LXNerzs+JssJK1fguYuVZNstUl6scs2ULbYYbz6
+rhw9gv2fDD1Vc8jePoBqINLK3V33z4RCO++Lnp5JHAWTYdcnLyST7+p0NyG9Bo5P
+ye551eQLyIJX5aMtrBDWWoLcdhLj8bkM3riaEetoawSyxv4EedDKq9UUcZ9wVetW
+dqvvL5oStKskRekYhaq7ScuuzRsoZQZpQrbSOa2V/JQQAwqWi0Vg0ZWZRrQNPDIf
+RW4msB0cHaXDeAFaTH7KkcWSdfCy82XvV81q1aWlqBeSRTQfK3Befnh+68uu9QJC
+vJ1dAa23UR/syZ26otXsEAaS+PXWLFIXqqmcPcP26u34L3n8RC9rqwH2xMJxn8so
+EXmI4jcOw8/bjLnQaGDUMgk3v5WB9KusT9WHLKNFn4yi+dJnLzDkDWrsdDXf6B9Q
+dFhXuDpxREKgL7OoNYcXeFvEN5o24kzVZqRVPTuVt8grzYCbf+VCov831wpU7DIn
+AD8570E5QfjuxsS+AfZoEjS/vKi8MfDZnyZbJJOB6yWWZHZNYUyK/S4/7rv6KEdb
+W0Q5G3i7mDesKje/o3AVqA2XB4foAwIDAQABAoICAANcxbTyPadTke5T5LsNVjaa
+S13DqKZnoG1Sdm3DgiZ/ChgBTh5nP26BT6xq1Dq0rOiJGb+Nt9cH4Ju+GBf7G9Ty
+nV0OShefPaPLcBnholr8+lvdlCpuaBkD8xACEnM6izjFYuRKdIIIDdBxW+68DJOR
+S5XYWuIaVm1jw9ihDekG9WhZcRy8vecxEBGS5eOE9pKhFdXQdqkqlrS7DJ+J2wrf
+FeN/R9Ko6diVhgPogbaRRAsMn+TWGi35uX5LwRcRz7cNvK55rDqU2rNs0n2HUnew
+RDyaLk0YaGfr5MGcBRdrwIDE61m4XIfd9BV/YFpdok5rH7qXpXXlIlRyw9m4UkN0
+CzuBMQbUj5eLXurKXZm/+g9v1lTx3tpgyoQXrNljyhQvfNi8DdeijlQj7zOeTXMW
+fe+1XZ/R+KahBtgytq9/caKdEbo/CokY5DZFz+RGUiV+jjHGBAj5l4K8gQdLVm3L
+v9m69fMvBFXX3WGz1jPfVsvNGOPPothd9fDTQAdjSBWYP8WTBMn62ivojKYIXX6O
+ggEQmQ/9HENS0chKz0mYrFPTJ2bauPRzo3ikVXTG7YwyvmAa+CpxaTNBi0z3XGTs
+B7lkT0/vYZls2ZApMQc53DJnsUD4hn2zr2SPZry3jotiR2Ww/e/t/nosMxTe4FUW
+p/HSOzAHJGY/Lom+5swJAoIBAQD6i/fo1fT5b2pyh/SW4UIgx2GEQK7f3mh4VnnR
+Qgtz1PjMCOULIlOyy88ZZxOA+vIwL85lwdFuTaG27C6hfceWAOYtcbWjqen+QXZ8
+b5uyyNmFov3+sdazfl7O0Q1VE0jbAxNioSa//8YsqOxVR4PVxJ7cPMqAU701ykHK
+IeE5lzm0U74ibvm7sdr6oYSNIfZ+z4dWBE6OFFNPyGhMv9nJA8W6U+nT3DVCqryx
+q58mIcN77tKkyFaSCfP8VWcHJdavZBOe8hlv9j+eUakWqTC9RMr47rDpvdUPT5gm
+tW+D1304+6HwSu7iysm3zPaDqSTGulTtCIimKDFdTEThwZZ1AoIBAQC597xUkoEj
+he4QXs6j/Ds/gocqnTdmRBQH1AXnfWrvnE4CK8ej+4hA/ng70KZcNAFm65DJb5lk
+z4vdVmh3cxQgto+EOPY2h81NjDEXscr2hZt7+RX28s7RwL36+Pxs521ut7D5UiFe
+gRi/hkbJr2NUSMP6EZMMegvQ65XsAKbdVvJEOKPluUS4w9H2fPplHd3heyM9HtNI
+BUY7/eb6PHUZRfexRTYds9ioGlgTFfIMuuZJ8YdK3mM6pJybbYdoqHSuU7aRoneD
+bMus82GfDRZh+cw5NrFsWv2ieQiaZpy8MQKY6Zdilq7mXUhSh0llcdt0crXF2Dqn
+tiPXZUKejGWXAoIBAC3lF9uB3ecXPrOOLgK5bqicfUOBqcb+cbqhdJ0dcQWd3Jlb
+g8FfX1+gL+aiWBNHZLfo+fDv6RJAjD/60avpY3cZ4RAwBSrexCs8CJ1QwH+mhRoS
+ul4+a2rj2jAeYUfVSYI89P8bMAL5sm6Z3vjcKc0twD/trtaFAGLrEtQZEq2/AuYC
+dRDPrVVxhgBlN+e2cfXWxB7AmTczh/NUba6pchZ9Z2nzVyDk9Kiqp/gPzQ5qHuoD
+3Hgs7pa/1f7CEiZgCwyD04hJJtm4jPzOTqAFDBWPlXK2HpgimvW8Cc4FbFEFVz3p
+8kcXIt1OclcF555EjKUOmuH0rzton2pMv01vbcUCggEAfGuSFic6vVCS2WME79QG
+s9QZqNoswYAUwrQJCzru+8bgrjUqSb01CP735FURqKims3wxj4PZ5gex9PElzZ0x
+vz1FQdp2aD9tjU+ZXNf4Cf2T7FrXZjRHSTCiKrLA9//SSHwfrH9VkgvfSeyFmdR9
+KVvRupJdhsB0/V9RG+fHvFi6mAgpJ75PiyqAZGBziolz9LLU/cSM6SeWOPcDvTIL
+yk/0iybaMP8tmjKd8I8DNZ8qChjNQrsNOqP9n0Olj9D819FsWX2QZl642kqvaqFv
+8zcUesbr56ns/fHqXpr+jC5iJXpLbYuREtEgXQ7kfTmy8PL6SJcFj0WeLzMxYjBe
+mwKCAQEA+kaxUOF2aj3ML4K0c4DfQCSCYEv29qsd0u9gR4QLoaSSnetWpULBdAJ7
+lsFFhuCHLjrkkazXUv/l4a/u8BmSF37wIzb/AWPE5CluC0D7tHENaNj/XTjvvTxV
+nVE2fojWIrxRk3qxfy30AVrULCKp8RHn3FilpUUjJGNUBeFV8xsVS3YF75IV7gQB
+op0zquisxioZ+5rCR+vn2BErzv/ILJpW2zk9FEJxJ72rcCucpfiTeaBADv7twFxT
+rc129p/U6/PKzEE/2voTY1GRb+8sroL1LIEA+K7fvd87C6HEsabGG7fss1Fthi2W
+Hef5W/FKNtov4fx8QuMhwA4lWIuliw==
+-----END PRIVATE KEY-----`
+	invalidContent = "invalid"
+)
+
+func createKeyFiles(t *testing.T, tmpDir string) func() {
+	return createTempFiles(
+		t,
+		tmpDir,
+		map[string]string{
+			caFileName:  caContent,
+			keyFileName: keyContent,
+		},
+	)
+}
+
+// createTempFiles takes a map of file name and content pairs, creates the files in a tmp dir and returns a cleanup
+// function
+func createTempFiles(t *testing.T, tmpDir string, files map[string]string) func() {
+	var filesToRemove []string
+	for name, content := range files {
+		tmpFile, err := os.Create(path.Join(tmpDir, name))
+		require.NoError(t, err)
+		_, err = tmpFile.WriteString(content)
+		require.NoError(t, err)
+		filesToRemove = append(filesToRemove, tmpFile.Name())
+		tmpFile.Close()
+	}
+	return func() {
+		for _, name := range filesToRemove {
+			os.Remove(name)
+		}
+	}
+}
+
+func TestHTTPConfig_GetHTTPTransport(t *testing.T) {
+	testCases := map[string]struct {
+		config        HTTPConfig
+		wantError     bool
+		validateFunc  func(t *testing.T, transport http.RoundTripper)
+		errorContains string
+	}{
+		"default configuration": {
+			config: HTTPConfig{
+				IdleConnTimeout:       90 * time.Second,
+				ResponseHeaderTimeout: 2 * time.Minute,
+				TLSHandshakeTimeout:   10 * time.Second,
+				ExpectContinueTimeout: 1 * time.Second,
+				MaxIdleConns:          100,
+				MaxIdleConnsPerHost:   100,
+				MaxConnsPerHost:       0,
+				DisableCompression:    false,
+				InsecureSkipVerify:    false,
+			},
+			wantError: false,
+			validateFunc: func(t *testing.T, transport http.RoundTripper) {
+				httpTransport, ok := transport.(*http.Transport)
+				require.True(t, ok, "Expected *http.Transport")
+				assert2.Equal(t, 90*time.Second, httpTransport.IdleConnTimeout)
+				assert2.Equal(t, 2*time.Minute, httpTransport.ResponseHeaderTimeout)
+				assert2.Equal(t, 10*time.Second, httpTransport.TLSHandshakeTimeout)
+				assert2.Equal(t, 1*time.Second, httpTransport.ExpectContinueTimeout)
+				assert2.Equal(t, 100, httpTransport.MaxIdleConns)
+				assert2.Equal(t, 100, httpTransport.MaxIdleConnsPerHost)
+				assert2.Equal(t, 0, httpTransport.MaxConnsPerHost)
+				assert2.False(t, httpTransport.DisableCompression)
+				assert2.False(t, httpTransport.TLSClientConfig.InsecureSkipVerify)
+			},
+		},
+		"with insecure skip verify": {
+			config: HTTPConfig{
+				InsecureSkipVerify: true,
+			},
+			wantError: false,
+			validateFunc: func(t *testing.T, transport http.RoundTripper) {
+				httpTransport, ok := transport.(*http.Transport)
+				require.True(t, ok)
+				assert2.True(t, httpTransport.TLSClientConfig.InsecureSkipVerify)
+			},
+		},
+		"with injected transport": {
+			config: HTTPConfig{
+				Transport: &http.Transport{},
+			},
+			wantError: false,
+			validateFunc: func(t *testing.T, transport http.RoundTripper) {
+				_, ok := transport.(*http.Transport)
+				require.True(t, ok)
+			},
+		},
+		"with server name": {
+			config: HTTPConfig{
+				TLSConfig: TLSConfig{
+					ServerName: "example.com",
+				},
+			},
+			wantError: false,
+			validateFunc: func(t *testing.T, transport http.RoundTripper) {
+				httpTransport, ok := transport.(*http.Transport)
+				require.True(t, ok)
+				assert2.Equal(t, "example.com", httpTransport.TLSClientConfig.ServerName)
+			},
+		},
+		"with disable compression": {
+			config: HTTPConfig{
+				DisableCompression: true,
+			},
+			wantError: false,
+			validateFunc: func(t *testing.T, transport http.RoundTripper) {
+				httpTransport, ok := transport.(*http.Transport)
+				require.True(t, ok)
+				assert2.True(t, httpTransport.DisableCompression)
+			},
+		},
+		"with invalid TLS config": {
+			config: HTTPConfig{
+				TLSConfig: TLSConfig{
+					CertFile: "cert.pem", // cert without key should cause error
+				},
+			},
+			wantError:     true,
+			errorContains: "client cert file",
+		},
+	}
+
+	for name, tc := range testCases {
+		t.Run(name, func(t *testing.T) {
+			transport, err := tc.config.GetHTTPTransport()
+
+			if tc.wantError {
+				assert2.Error(t, err)
+				if tc.errorContains != "" {
+					assert2.Contains(t, err.Error(), tc.errorContains)
+				}
+				return
+			}
+
+			require.NoError(t, err)
+			require.NotNil(t, transport)
+			if tc.validateFunc != nil {
+				tc.validateFunc(t, transport)
+			}
+		})
+	}
+}
+
+func TestTLSConfig_ToConfig(t *testing.T) {
+	tmpDir := os.TempDir()
+	cleanupFn := createKeyFiles(t, tmpDir)
+	t.Cleanup(cleanupFn)
+
+	testCases := map[string]struct {
+		config       *TLSConfig
+		want         *tls.Config
+		wantError    bool
+		validateFunc func(t *testing.T, tlsConfig *tls.Config)
+	}{
+		"default configuration": {
+			config:    &TLSConfig{},
+			want:      &tls.Config{},
+			wantError: false,
+		},
+		"with insecure skip verify": {
+			config: &TLSConfig{
+				InsecureSkipVerify: true,
+			},
+			want: &tls.Config{
+				InsecureSkipVerify: true,
+			},
+			wantError: false,
+		},
+		"missing CA file": {
+			config: &TLSConfig{
+				CAFile: path.Join(tmpDir, nonExistentFileName),
+			},
+			wantError: true,
+		},
+		"invalid CA file": {
+			config: &TLSConfig{
+				CAFile: path.Join(tmpDir, invalidFileName),
+			},
+			wantError: true,
+		},
+		"with server name": {
+			config: &TLSConfig{
+				ServerName: "example.com",
+			},
+			want: &tls.Config{
+				ServerName: "example.com",
+			},
+			wantError: false,
+		},
+		"cert file without key file": {
+			config: &TLSConfig{
+				CertFile: path.Join(tmpDir, caFileName),
+			},
+			wantError: true,
+		},
+		"key file without cert file": {
+			config: &TLSConfig{
+				KeyFile: path.Join(tmpDir, keyFileName),
+			},
+			wantError: true,
+		},
+
+		"invalid cert file": {
+			config: &TLSConfig{
+				CertFile: path.Join(tmpDir, invalidFileName),
+				KeyFile:  path.Join(tmpDir, keyFileName),
+			},
+			wantError: true,
+		},
+		"invalid key file": {
+			config: &TLSConfig{
+				CertFile: path.Join(tmpDir, caFileName),
+				KeyFile:  path.Join(tmpDir, invalidFileName),
+			},
+			wantError: true,
+		},
+		"valid Cert and Key file": {
+			config: &TLSConfig{
+				CertFile: path.Join(tmpDir, caFileName),
+				KeyFile:  path.Join(tmpDir, keyFileName),
+			},
+			want: &tls.Config{
+				GetClientCertificate: TLSConfig{
+					CertFile: path.Join(tmpDir, caFileName),
+					KeyFile:  path.Join(tmpDir, keyFileName),
+				}.getClientCertificate,
+			},
+			wantError: false,
+		},
+	}
+
+	for name, tc := range testCases {
+		t.Run(name, func(t *testing.T) {
+			tlsConfig, err := tc.config.ToConfig()
+
+			if tc.wantError {
+				assert2.Error(t, err)
+				return
+			}
+
+			require.NoError(t, err)
+			tlsConfigEqual(t, tlsConfig, tc.want)
+		})
+	}
+}
+
+func tlsConfigEqual(t *testing.T, got, want *tls.Config) {
+	if want == nil {
+		assert2.Nil(t, got)
+		return
+	} else {
+		assert2.NotNil(t, got)
+	}
+
+	assert2.Equal(t, got.InsecureSkipVerify, want.InsecureSkipVerify)
+	assert2.Equal(t, got.ServerName, want.ServerName)
+	assert2.Equal(t, got.GetClientCertificate == nil, want.GetClientCertificate == nil)
+	if want.GetClientCertificate != nil {
+		gotCert, gotError := got.GetClientCertificate(nil)
+		assert2.NoError(t, gotError)
+		wantCert, wantError := want.GetClientCertificate(nil)
+		assert2.NoError(t, wantError)
+		assert2.Equal(t, gotCert, wantCert)
+	}
+}
+
+func TestReadCAFile(t *testing.T) {
+	tmpDir := os.TempDir()
+	cleanupFn := createKeyFiles(t, tmpDir)
+	t.Cleanup(cleanupFn)
+
+	testCases := map[string]struct {
+		fileName  string
+		content   string
+		wantError bool
+	}{
+		"nonexistent file": {
+			fileName:  nonExistentFileName,
+			content:   "",
+			wantError: true,
+		},
+		"invalid file": {
+			fileName:  invalidFileName,
+			content:   invalidContent,
+			wantError: true,
+		},
+		"valid file": {
+			fileName:  caFileName,
+			content:   caContent,
+			wantError: false,
+		},
+	}
+
+	for name, tc := range testCases {
+		t.Run(name, func(t *testing.T) {
+			data, err := readCAFile(path.Join(tmpDir, tc.fileName))
+
+			if tc.wantError {
+				assert2.Error(t, err)
+				return
+			}
+
+			require.NoError(t, err)
+			assert2.Equal(t, tc.content, string(data))
+		})
+	}
+}
+
+func TestUpdateRootCA(t *testing.T) {
+	testCases := map[string]struct {
+		input      []byte
+		expectedOk bool
+	}{
+		"valid PEM certificate": {
+			input:      []byte(caContent),
+			expectedOk: true, // This is a valid certificate
+		},
+		"invalid PEM data": {
+			input:      []byte(invalidContent),
+			expectedOk: false,
+		},
+		"empty data": {
+			input:      []byte(""),
+			expectedOk: false,
+		},
+	}
+
+	for name, tc := range testCases {
+		t.Run(name, func(t *testing.T) {
+			tlsConfig := &tls.Config{}
+			result := updateRootCA(tlsConfig, tc.input)
+
+			assert2.Equal(t, tc.expectedOk, result)
+			if result {
+				assert2.NotNil(t, tlsConfig.RootCAs)
+			} else {
+				assert2.Nil(t, tlsConfig.RootCAs)
+			}
+		})
+	}
+}
+
+func TestTLSConfig_getClientCertificate(t *testing.T) {
+	tmpDir := os.TempDir()
+	cleanupFn := createKeyFiles(t, tmpDir)
+	t.Cleanup(cleanupFn)
+
+	testCases := map[string]struct {
+		config    *TLSConfig
+		wantError bool
+	}{
+		"empty config": {
+			config:    &TLSConfig{},
+			wantError: true,
+		},
+		"nonexistent cert files": {
+			config: &TLSConfig{
+				CertFile: path.Join(tmpDir, nonExistentFileName),
+				KeyFile:  path.Join(tmpDir, nonExistentFileName),
+			},
+			wantError: true,
+		},
+		"missing cert file": {
+			config: &TLSConfig{
+				CertFile: path.Join(tmpDir, nonExistentFileName),
+				KeyFile:  path.Join(tmpDir, keyFileName),
+			},
+			wantError: true,
+		},
+		"missing key file": {
+			config: &TLSConfig{
+				CertFile: path.Join(tmpDir, caFileName),
+				KeyFile:  path.Join(tmpDir, nonExistentFileName),
+			},
+			wantError: true,
+		},
+		"invalid cert file": {
+			config: &TLSConfig{
+				CertFile: path.Join(tmpDir, invalidFileName),
+				KeyFile:  path.Join(tmpDir, keyFileName),
+			},
+			wantError: true,
+		},
+		"invalid key file": {
+			config: &TLSConfig{
+				CertFile: path.Join(tmpDir, caFileName),
+				KeyFile:  path.Join(tmpDir, invalidFileName),
+			},
+			wantError: true,
+		},
+		"valid": {
+			config: &TLSConfig{
+				CertFile: path.Join(tmpDir, caFileName),
+				KeyFile:  path.Join(tmpDir, keyFileName),
+			},
+			wantError: false,
+		},
+	}
+
+	for name, tc := range testCases {
+		t.Run(name, func(t *testing.T) {
+			_, err := tc.config.getClientCertificate(nil)
+
+			if tc.wantError {
+				assert2.Error(t, err)
+				return
+			}
+
+			require.NoError(t, err)
+		})
+	}
+}
+
+func TestHTTPConfigIntegration(t *testing.T) {
+	config := HTTPConfig{
+		IdleConnTimeout:       30 * time.Second,
+		ResponseHeaderTimeout: 1 * time.Minute,
+		TLSHandshakeTimeout:   5 * time.Second,
+		ExpectContinueTimeout: 500 * time.Millisecond,
+		MaxIdleConns:          50,
+		MaxIdleConnsPerHost:   25,
+		MaxConnsPerHost:       10,
+		DisableCompression:    true,
+		InsecureSkipVerify:    true,
+		TLSConfig: TLSConfig{
+			ServerName:         "test-server.com",
+			InsecureSkipVerify: false, // This should be overridden by HTTPConfig.InsecureSkipVerify
+		},
+	}
+
+	transport, err := config.GetHTTPTransport()
+	require.NoError(t, err)
+
+	httpTransport, ok := transport.(*http.Transport)
+	require.True(t, ok)
+
+	// Verify all settings are applied correctly
+	assert2.Equal(t, 30*time.Second, httpTransport.IdleConnTimeout)
+	assert2.Equal(t, 1*time.Minute, httpTransport.ResponseHeaderTimeout)
+	assert2.Equal(t, 5*time.Second, httpTransport.TLSHandshakeTimeout)
+	assert2.Equal(t, 500*time.Millisecond, httpTransport.ExpectContinueTimeout)
+	assert2.Equal(t, 50, httpTransport.MaxIdleConns)
+	assert2.Equal(t, 25, httpTransport.MaxIdleConnsPerHost)
+	assert2.Equal(t, 10, httpTransport.MaxConnsPerHost)
+	assert2.True(t, httpTransport.DisableCompression)
+	assert2.True(t, httpTransport.TLSClientConfig.InsecureSkipVerify) // Should be overridden
+	assert2.Equal(t, "test-server.com", httpTransport.TLSClientConfig.ServerName)
+	assert2.NotNil(t, httpTransport.Proxy)
+	assert2.NotNil(t, httpTransport.DialContext)
+}

+ 10 - 79
core/pkg/storage/s3storage.go

@@ -8,7 +8,6 @@ import (
 	"bytes"
 	"context"
 	"io"
-	"net"
 	"net/http"
 	"os"
 	"strings"
@@ -49,14 +48,15 @@ const (
 
 var defaultS3Config = S3Config{
 	PutUserMetadata: map[string]string{},
-	HTTPConfig: S3HTTPConfig{
-		IdleConnTimeout:       time.Duration(90 * time.Second),
-		ResponseHeaderTimeout: time.Duration(2 * time.Minute),
-		TLSHandshakeTimeout:   time.Duration(10 * time.Second),
-		ExpectContinueTimeout: time.Duration(1 * time.Second),
+	HTTPConfig: HTTPConfig{
+		IdleConnTimeout:       90 * time.Second,
+		ResponseHeaderTimeout: 2 * time.Minute,
+		TLSHandshakeTimeout:   10 * time.Second,
+		ExpectContinueTimeout: 1 * time.Second,
 		MaxIdleConns:          100,
 		MaxIdleConnsPerHost:   100,
 		MaxConnsPerHost:       0,
+		DisableCompression:    true,
 	},
 	PartSize: 1024 * 1024 * 64, // 64MB.
 }
@@ -72,7 +72,7 @@ type S3Config struct {
 	SignatureV2        bool              `yaml:"signature_version2"`
 	SecretKey          string            `yaml:"secret_key"`
 	PutUserMetadata    map[string]string `yaml:"put_user_metadata"`
-	HTTPConfig         S3HTTPConfig      `yaml:"http_config"`
+	HTTPConfig         HTTPConfig        `yaml:"http_config"`
 	TraceConfig        TraceConfig       `yaml:"trace"`
 	ListObjectsVersion string            `yaml:"list_objects_version"`
 	// PartSize used for multipart upload. Only used if uploaded object size is known and larger than configured PartSize.
@@ -95,67 +95,6 @@ type TraceConfig struct {
 	Enable bool `yaml:"enable"`
 }
 
-// HTTPConfig stores the http.Transport configuration for the s3 minio client.
-type S3HTTPConfig struct {
-	IdleConnTimeout       time.Duration `yaml:"idle_conn_timeout"`
-	ResponseHeaderTimeout time.Duration `yaml:"response_header_timeout"`
-	InsecureSkipVerify    bool          `yaml:"insecure_skip_verify"`
-
-	TLSHandshakeTimeout   time.Duration `yaml:"tls_handshake_timeout"`
-	ExpectContinueTimeout time.Duration `yaml:"expect_continue_timeout"`
-	MaxIdleConns          int           `yaml:"max_idle_conns"`
-	MaxIdleConnsPerHost   int           `yaml:"max_idle_conns_per_host"`
-	MaxConnsPerHost       int           `yaml:"max_conns_per_host"`
-
-	// Allow upstream callers to inject a round tripper
-	Transport http.RoundTripper `yaml:"-"`
-
-	TLSConfig TLSConfig `yaml:"tls_config"`
-}
-
-// DefaultTransport - this default transport is based on the Minio
-// DefaultTransport up until the following commit:
-// https://githus3.com/minio/minio-go/commit/008c7aa71fc17e11bf980c209a4f8c4d687fc884
-// The values have since diverged.
-func DefaultS3Transport(config S3Config) (*http.Transport, error) {
-	tlsConfig, err := NewTLSConfig(&config.HTTPConfig.TLSConfig)
-	if err != nil {
-		return nil, err
-	}
-
-	if config.HTTPConfig.InsecureSkipVerify {
-		tlsConfig.InsecureSkipVerify = true
-	}
-
-	return &http.Transport{
-		Proxy: http.ProxyFromEnvironment,
-		DialContext: (&net.Dialer{
-			Timeout:   30 * time.Second,
-			KeepAlive: 30 * time.Second,
-			DualStack: true,
-		}).DialContext,
-
-		MaxIdleConns:          config.HTTPConfig.MaxIdleConns,
-		MaxIdleConnsPerHost:   config.HTTPConfig.MaxIdleConnsPerHost,
-		IdleConnTimeout:       time.Duration(config.HTTPConfig.IdleConnTimeout),
-		MaxConnsPerHost:       config.HTTPConfig.MaxConnsPerHost,
-		TLSHandshakeTimeout:   time.Duration(config.HTTPConfig.TLSHandshakeTimeout),
-		ExpectContinueTimeout: time.Duration(config.HTTPConfig.ExpectContinueTimeout),
-		// A custom ResponseHeaderTimeout was introduced
-		// to cover cases where the tcp connection works but
-		// the server never answers. Defaults to 2 minutes.
-		ResponseHeaderTimeout: time.Duration(config.HTTPConfig.ResponseHeaderTimeout),
-		// Set this value so that the underlying transport round-tripper
-		// doesn't try to auto decode the body of objects with
-		// content-encoding set to `gzip`.
-		//
-		// Refer: https://golang.org/src/net/http/transport.go?h=roundTrip#L1843.
-		DisableCompression: true,
-		// #nosec It's up to the user to decide on TLS configs
-		TLSClientConfig: tlsConfig,
-	}, nil
-}
-
 // S3Storage provides storage via S3
 type S3Storage struct {
 	name            string
@@ -226,17 +165,9 @@ func NewS3StorageWith(config S3Config) (*S3Storage, error) {
 		}
 	}
 
-	// Check if a roundtripper has been set in the config
-	// otherwise build the default transport.
-	var rt http.RoundTripper
-	if config.HTTPConfig.Transport != nil {
-		rt = config.HTTPConfig.Transport
-	} else {
-		var err error
-		rt, err = DefaultS3Transport(config)
-		if err != nil {
-			return nil, err
-		}
+	rt, err := config.HTTPConfig.GetHTTPTransport()
+	if err != nil {
+		return nil, err
 	}
 
 	client, err := minio.New(config.Endpoint, &minio.Options{

+ 1 - 0
core/pkg/storage/storagetypes.go

@@ -20,6 +20,7 @@ type StorageType string
 const (
 	StorageTypeMemory      StorageType = "memory"
 	StorageTypeFile        StorageType = "file"
+	StorageTypeCluster     StorageType = "cluster"
 	StorageTypeBucketS3    StorageType = "bucket|s3"
 	StorageTypeBucketGCS   StorageType = "bucket|gcs"
 	StorageTypeBucketAzure StorageType = "bucket|azure"

+ 420 - 0
core/pkg/storage/test.go

@@ -0,0 +1,420 @@
+package storage
+
+import (
+	"fmt"
+	"path"
+	"testing"
+
+	"github.com/opencost/opencost/core/pkg/util/json"
+)
+
+type testFileContent struct {
+	Field1 int    `json:"field_1"`
+	Field2 string `json:"field_2"`
+}
+
+var tfc = testFileContent{
+	Field1: 101,
+	Field2: "TEST_FILE_CONTENT",
+}
+
+const testpath = "opencost/storage/"
+
+func createFiles(files []string, testName string, store Storage) error {
+	b, err := json.Marshal(tfc)
+	if err != nil {
+		return fmt.Errorf("failed to marshal file content: %w", err)
+	}
+
+	for _, fileName := range files {
+		filePath := path.Join(testpath, testName, fileName)
+		err = store.Write(filePath, b)
+		if err != nil {
+			return fmt.Errorf("failed to write file '%s': %w ", filePath, err)
+		}
+	}
+
+	return nil
+}
+
+func cleanupFiles(files []string, testName string, store Storage) error {
+	for _, fileName := range files {
+		filePath := path.Join(testpath, testName, fileName)
+		err := store.Remove(filePath)
+		if err != nil {
+			return fmt.Errorf("failed to remove file '%s': %w ", filePath, err)
+		}
+	}
+
+	return nil
+}
+
+func TestStorageList(t *testing.T, store Storage) {
+	testName := "list"
+
+	fileNames := []string{
+		"/file0.json",
+		"/file1.json",
+		"/dir0/file2.json",
+		"/dir0/file3.json",
+	}
+
+	err := createFiles(fileNames, testName, store)
+	if err != nil {
+		t.Errorf("failed to create files: %s", err)
+	}
+
+	defer func() {
+		err = cleanupFiles(fileNames, testName, store)
+		if err != nil {
+			t.Errorf("failed to clean up files: %s", err)
+		}
+	}()
+
+	testCases := map[string]struct {
+		path      string
+		expected  []string
+		expectErr bool
+	}{
+		"base dir files": {
+			path: path.Join(testpath, testName),
+			expected: []string{
+				"file0.json",
+				"file1.json",
+			},
+			expectErr: false,
+		},
+		"single nested dir files": {
+			path: path.Join(testpath, testName, "dir0"),
+			expected: []string{
+				"file2.json",
+				"file3.json",
+			},
+			expectErr: false,
+		},
+		"nonexistent dir files": {
+			path:      path.Join(testpath, testName, "dir1"),
+			expected:  []string{},
+			expectErr: false,
+		},
+	}
+
+	for name, tc := range testCases {
+		t.Run(name, func(t *testing.T) {
+			fileList, err := store.List(tc.path)
+			if tc.expectErr == (err == nil) {
+				if tc.expectErr {
+					t.Errorf("expected error was not thrown")
+					return
+				}
+				t.Errorf("unexpected error: %s", err.Error())
+				return
+			}
+
+			if len(fileList) != len(tc.expected) {
+				t.Errorf("file list length does not match expected length, actual: %d, expected: %d", len(fileList), len(tc.expected))
+			}
+
+			expectedSet := map[string]struct{}{}
+			for _, expName := range tc.expected {
+				expectedSet[expName] = struct{}{}
+			}
+
+			for _, file := range fileList {
+				_, ok := expectedSet[file.Name]
+				if !ok {
+					t.Errorf("unexpect file in list %s", file.Name)
+				}
+
+				if file.Size == 0 {
+					t.Errorf("file size is not set")
+				}
+
+				if file.ModTime.IsZero() {
+					t.Errorf("file mod time is not set")
+				}
+			}
+		})
+	}
+}
+
+func TestStorageListDirectories(t *testing.T, store Storage) {
+	testName := "list_directories"
+
+	fileNames := []string{
+		"/file0.json",
+		"/dir0/file2.json",
+		"/dir0/file3.json",
+		"/dir0/dir1/file4.json",
+		"/dir0/dir2/file5.json",
+	}
+
+	err := createFiles(fileNames, testName, store)
+	if err != nil {
+		t.Errorf("failed to create files: %s", err)
+	}
+
+	defer func() {
+		err = cleanupFiles(fileNames, testName, store)
+		if err != nil {
+			t.Errorf("failed to clean up files: %s", err)
+		}
+	}()
+
+	testCases := map[string]struct {
+		path      string
+		expected  []string
+		expectErr bool
+	}{
+		"base dir dir": {
+			path: path.Join(testpath, testName),
+			expected: []string{
+				path.Join(testpath, testName, "dir0") + "/",
+			},
+			expectErr: false,
+		},
+		"single nested dir files": {
+			path: path.Join(testpath, testName, "dir0"),
+			expected: []string{
+				path.Join(testpath, testName, "dir0", "dir1") + "/",
+				path.Join(testpath, testName, "dir0", "dir2") + "/",
+			},
+			expectErr: false,
+		},
+		"dir with no sub dirs": {
+			path:      path.Join(testpath, testName, "dir0/dir1"),
+			expected:  []string{},
+			expectErr: false,
+		},
+		"non-existent dir": {
+			path:      path.Join(testpath, testName, "dir1"),
+			expected:  []string{},
+			expectErr: false,
+		},
+	}
+
+	for name, tc := range testCases {
+		t.Run(name, func(t *testing.T) {
+			dirList, err := store.ListDirectories(tc.path)
+			if tc.expectErr == (err == nil) {
+				if tc.expectErr {
+					t.Errorf("expected error was not thrown")
+					return
+				}
+				t.Errorf("unexpected error: %s", err.Error())
+				return
+			}
+
+			if len(dirList) != len(tc.expected) {
+				t.Errorf("dir list length does not match expected length, actual: %d, expected: %d", len(dirList), len(tc.expected))
+			}
+
+			expectedSet := map[string]struct{}{}
+			for _, expName := range tc.expected {
+				expectedSet[expName] = struct{}{}
+			}
+
+			for _, dir := range dirList {
+				_, ok := expectedSet[dir.Name]
+				if !ok {
+					t.Errorf("unexpect dir in list %s", dir.Name)
+				}
+			}
+		})
+	}
+}
+
+func TestStorageExists(t *testing.T, store Storage) {
+	testName := "exists"
+
+	fileNames := []string{
+		"/file0.json",
+	}
+
+	err := createFiles(fileNames, testName, store)
+	if err != nil {
+		t.Errorf("failed to create files: %s", err)
+	}
+
+	defer func() {
+		err = cleanupFiles(fileNames, testName, store)
+		if err != nil {
+			t.Errorf("failed to clean up files: %s", err)
+		}
+	}()
+
+	testCases := map[string]struct {
+		path      string
+		expected  bool
+		expectErr bool
+	}{
+		"file exists": {
+			path:      path.Join(testpath, testName, "file0.json"),
+			expected:  true,
+			expectErr: false,
+		},
+		"file does not exist": {
+			path:      path.Join(testpath, testName, "file1.json"),
+			expected:  false,
+			expectErr: false,
+		},
+		"dir does not exist": {
+			path:      path.Join(testpath, testName, "dir0/file.json"),
+			expected:  false,
+			expectErr: false,
+		},
+	}
+
+	for name, tc := range testCases {
+		t.Run(name, func(t *testing.T) {
+			exists, err := store.Exists(tc.path)
+			if tc.expectErr == (err == nil) {
+				if tc.expectErr {
+					t.Errorf("expected error was not thrown")
+					return
+				}
+				t.Errorf("unexpected error: %s", err.Error())
+				return
+			}
+
+			if exists != tc.expected {
+				t.Errorf("file exists output did not match expected")
+			}
+		})
+	}
+}
+
+func TestStorageRead(t *testing.T, store Storage) {
+	testName := "read"
+
+	fileNames := []string{
+		"/file0.json",
+	}
+
+	err := createFiles(fileNames, testName, store)
+	if err != nil {
+		t.Errorf("failed to create files: %s", err)
+	}
+
+	defer func() {
+		err = cleanupFiles(fileNames, testName, store)
+		if err != nil {
+			t.Errorf("failed to clean up files: %s", err)
+		}
+	}()
+
+	testCases := map[string]struct {
+		path      string
+		expectErr bool
+	}{
+		"file exists": {
+			path:      path.Join(testpath, testName, "file0.json"),
+			expectErr: false,
+		},
+		"file does not exist": {
+			path:      path.Join(testpath, testName, "file1.json"),
+			expectErr: true,
+		},
+		"dir does not exist": {
+			path:      path.Join(testpath, testName, "dir0/file.json"),
+			expectErr: true,
+		},
+	}
+
+	for name, tc := range testCases {
+		t.Run(name, func(t *testing.T) {
+			b, err := store.Read(tc.path)
+			if tc.expectErr && err != nil {
+				return
+			}
+			if tc.expectErr == (err == nil) {
+				if tc.expectErr {
+					t.Errorf("expected error was not thrown")
+					return
+				}
+				t.Errorf("unexpected error: %s", err.Error())
+				return
+			}
+			var content testFileContent
+			err = json.Unmarshal(b, &content)
+			if err != nil {
+				t.Errorf("could not unmarshal file content")
+				return
+			}
+
+			if content != tfc {
+				t.Errorf("file content did not match writen value")
+			}
+		})
+	}
+}
+
+func TestStorageStat(t *testing.T, store Storage) {
+	testName := "stat"
+
+	fileNames := []string{
+		"/file0.json",
+	}
+
+	err := createFiles(fileNames, testName, store)
+	if err != nil {
+		t.Errorf("failed to create files: %s", err)
+	}
+
+	defer func() {
+		err = cleanupFiles(fileNames, testName, store)
+		if err != nil {
+			t.Errorf("failed to clean up files: %s", err)
+		}
+	}()
+
+	testCases := map[string]struct {
+		path      string
+		expected  *StorageInfo
+		expectErr bool
+	}{
+		"base dir": {
+			path: path.Join(testpath, testName, "file0.json"),
+			expected: &StorageInfo{
+				Name: "file0.json",
+				Size: 45,
+			},
+			expectErr: false,
+		},
+		"file does not exist": {
+			path:      path.Join(testpath, testName, "file1.json"),
+			expected:  nil,
+			expectErr: true,
+		},
+	}
+
+	for name, tc := range testCases {
+		t.Run(name, func(t *testing.T) {
+			status, err := store.Stat(tc.path)
+			if tc.expectErr && err != nil {
+				return
+			}
+			if tc.expectErr == (err == nil) {
+				if tc.expectErr {
+					t.Errorf("expected error was not thrown")
+					return
+				}
+				t.Errorf("unexpected error: %s", err.Error())
+				return
+			}
+
+			if status.Name != tc.expected.Name {
+				t.Errorf("status name did name match expected, actual: %s, expected: %s", status.Name, tc.expected.Name)
+			}
+
+			if status.Size != tc.expected.Size {
+				t.Errorf("status name did size match expected, actual: %d, expected: %d", status.Size, tc.expected.Size)
+			}
+
+			if status.ModTime.IsZero() {
+				t.Errorf("status mod time is not set")
+			}
+
+		})
+	}
+}

+ 0 - 84
core/pkg/storage/tlsconfig.go

@@ -1,84 +0,0 @@
-package storage
-
-import (
-	"crypto/tls"
-	"crypto/x509"
-	"fmt"
-	"os"
-)
-
-// NewTLSConfig creates a new tls.Config from the given TLSConfig.
-func NewTLSConfig(cfg *TLSConfig) (*tls.Config, error) {
-	tlsConfig := &tls.Config{InsecureSkipVerify: cfg.InsecureSkipVerify}
-
-	// If a CA cert is provided then let's read it in.
-	if len(cfg.CAFile) > 0 {
-		b, err := readCAFile(cfg.CAFile)
-		if err != nil {
-			return nil, err
-		}
-		if !updateRootCA(tlsConfig, b) {
-			return nil, fmt.Errorf("unable to use specified CA cert %s", cfg.CAFile)
-		}
-	}
-
-	if len(cfg.ServerName) > 0 {
-		tlsConfig.ServerName = cfg.ServerName
-	}
-	// If a client cert & key is provided then configure TLS config accordingly.
-	if len(cfg.CertFile) > 0 && len(cfg.KeyFile) == 0 {
-		return nil, fmt.Errorf("client cert file %q specified without client key file", cfg.CertFile)
-	} else if len(cfg.KeyFile) > 0 && len(cfg.CertFile) == 0 {
-		return nil, fmt.Errorf("client key file %q specified without client cert file", cfg.KeyFile)
-	} else if len(cfg.CertFile) > 0 && len(cfg.KeyFile) > 0 {
-		// Verify that client cert and key are valid.
-		if _, err := cfg.getClientCertificate(nil); err != nil {
-			return nil, err
-		}
-		tlsConfig.GetClientCertificate = cfg.getClientCertificate
-	}
-
-	return tlsConfig, nil
-}
-
-// readCAFile reads the CA cert file from disk.
-func readCAFile(f string) ([]byte, error) {
-	data, err := os.ReadFile(f)
-	if err != nil {
-		return nil, fmt.Errorf("unable to load specified CA cert %s: %s", f, err)
-	}
-	return data, nil
-}
-
-// updateRootCA parses the given byte slice as a series of PEM encoded certificates and updates tls.Config.RootCAs.
-func updateRootCA(cfg *tls.Config, b []byte) bool {
-	caCertPool := x509.NewCertPool()
-	if !caCertPool.AppendCertsFromPEM(b) {
-		return false
-	}
-	cfg.RootCAs = caCertPool
-	return true
-}
-
-// getClientCertificate reads the pair of client cert and key from disk and returns a tls.Certificate.
-func (c *TLSConfig) getClientCertificate(*tls.CertificateRequestInfo) (*tls.Certificate, error) {
-	cert, err := tls.LoadX509KeyPair(c.CertFile, c.KeyFile)
-	if err != nil {
-		return nil, fmt.Errorf("unable to use specified client cert (%s) & key (%s): %s", c.CertFile, c.KeyFile, err)
-	}
-	return &cert, nil
-}
-
-// TLSConfig configures the options for TLS connections.
-type TLSConfig struct {
-	// The CA cert to use for the targets.
-	CAFile string `yaml:"ca_file"`
-	// The client cert file for the targets.
-	CertFile string `yaml:"cert_file"`
-	// The client key file for the targets.
-	KeyFile string `yaml:"key_file"`
-	// Used to verify the hostname for the targets.
-	ServerName string `yaml:"server_name"`
-	// Disable target certificate validation.
-	InsecureSkipVerify bool `yaml:"insecure_skip_verify"`
-}