package storage

import (
	"bytes"
	"fmt"
	"io"
	"net"
	"net/http"
	"net/url"
	"os"
	"path/filepath"
	"strings"
	"time"

	"github.com/opencost/opencost/core/pkg/log"
	"github.com/opencost/opencost/core/pkg/util/httputil"
	"github.com/opencost/opencost/core/pkg/util/json"
	"gopkg.in/yaml.v2"
)

// defaultClusterConfig is the baseline configuration that parseClusterConfig
// copies before applying user-supplied YAML, so any field the YAML omits keeps
// the value listed here.
var defaultClusterConfig = ClusterConfig{
	Host: "localhost",
	Port: 9006,
	HTTPConfig: HTTPConfig{
		IdleConnTimeout:       90 * time.Second,
		ResponseHeaderTimeout: 2 * time.Minute,
		TLSHandshakeTimeout:   10 * time.Second,
		ExpectContinueTimeout: 1 * time.Second,
		MaxIdleConns:          100,
		MaxIdleConnsPerHost:   100,
		// NOTE(review): presumably 0 means "no limit", as with
		// http.Transport.MaxConnsPerHost — confirm against HTTPConfig.
		MaxConnsPerHost: 0,
	},
}

// ClusterStorage is a Storage implementation which connects to a remote file
// storage over HTTP. It holds the HTTP client used for every request and the
// host/port of the remote endpoint.
type ClusterStorage struct {
	client *http.Client
	host   string
	port   int
}

// ClusterConfig is the YAML-decodable configuration for a ClusterStorage:
// the remote host and port plus the HTTP transport settings.
type ClusterConfig struct {
	Host       string     `yaml:"host"`
	Port       int        `yaml:"port"`
	HTTPConfig HTTPConfig `yaml:"http_config"`
}

// parseClusterConfig unmarshals a YAML buffer into a ClusterConfig, starting
// from a copy of defaultClusterConfig. On a YAML error it returns an empty
// ClusterConfig and the error.
func parseClusterConfig(conf []byte) (ClusterConfig, error) {
	config := defaultClusterConfig
	if err := yaml.Unmarshal(conf, &config); err != nil {
		return ClusterConfig{}, err
	}
	return config, nil
}

// NewClusterStorage parses conf as a YAML ClusterConfig and returns the
// ClusterStorage built from it by NewClusterStorageWith.
func NewClusterStorage(conf []byte) (*ClusterStorage, error) {
	config, err := parseClusterConfig(conf)
	if err != nil {
		return nil, err
	}
	return NewClusterStorageWith(config)
}

// NewClusterStorageWith returns a new ClusterStorage using the provided config
// values. It blocks until the remote health check succeeds or the retries are
// exhausted, in which case an error is returned.
func NewClusterStorageWith(config ClusterConfig) (*ClusterStorage, error) { dt, err := config.HTTPConfig.GetHTTPTransport() if err != nil { return nil, fmt.Errorf("error creating transport: %w", err) } cs := &ClusterStorage{ host: config.Host, port: config.Port, client: &http.Client{Transport: dt}, } // Wait on cluster storage to respond before returning defaultWait := 5 * time.Second retry := 0 maxTries := 5 for { err := cs.check() if err == nil { break } log.Debugf("ClusterStorage: error connecting to cluster storage: %s", err.Error()) if retry >= maxTries { return nil, fmt.Errorf("ClusterStorage: failed to connect to cluster storage after %d trys", maxTries) } waitTime := httputil.ExponentialBackoffWaitFor(defaultWait, retry) log.Infof("ClusterStorage: failed to connecting cluster storage. retry in %s", waitTime.String()) time.Sleep(waitTime) retry++ } log.Debugf("ClusterStorage: New cluster storage client initialized with '%s://%s:%d'", cs.scheme(), config.Host, config.Port) return cs, nil } func (c *ClusterStorage) makeRequest(method, url string, body io.Reader, fn func(*http.Response) error) error { request, err := http.NewRequest(method, url, body) if err != nil { return fmt.Errorf("failed to build request: %w", err) } resp, err := c.client.Do(request) if err != nil { return fmt.Errorf("request failed: %w", err) } if resp.Body != nil { defer resp.Body.Close() } if !(resp.StatusCode >= 200 && resp.StatusCode <= 299) { if resp.Body != nil { var errResp Response[any] err = json.NewDecoder(resp.Body).Decode(&errResp) if err == nil { return fmt.Errorf("invalid response %d: %s", resp.StatusCode, errResp.Message) } } return fmt.Errorf("invalid response %d", resp.StatusCode) } if fn != nil { err = fn(resp) if err != nil { return fmt.Errorf("failed to handle response: %w", err) } } return nil } func (c *ClusterStorage) getURL(subpath string, args map[string]string) string { pathElems := strings.Split(subpath, "/") u := new(url.URL) u.Scheme = c.scheme() u.Host = 
net.JoinHostPort(c.host, fmt.Sprintf("%d", c.port)) u = u.JoinPath(pathElems...) q := make(url.Values) for k, v := range args { q.Set(k, v) } rawQuery, _ := url.QueryUnescape(q.Encode()) u.RawQuery = rawQuery return u.String() // <-- full URL string } func (c *ClusterStorage) check() error { err := c.makeRequest( http.MethodGet, c.getURL("healthz", nil), nil, nil, ) if err != nil { return fmt.Errorf("ClusterStorage: failed health check: %w", err) } return nil } // String returns the host and port for the cluster storage. func (c *ClusterStorage) String() string { return fmt.Sprintf("%s:%d", c.host, c.port) } func (c *ClusterStorage) StorageType() StorageType { return StorageTypeCluster } // scheme returns the protocol scheme (http or https) based on TLS configuration func (c *ClusterStorage) scheme() string { if c.client.Transport != nil { if transport, ok := c.client.Transport.(*http.Transport); ok { if transport.TLSClientConfig != nil && !transport.TLSClientConfig.InsecureSkipVerify { return "https" } } } return "http" } func (c *ClusterStorage) FullPath(path string) string { var jsonResp Response[string] fn := func(resp *http.Response) error { err := json.NewDecoder(resp.Body).Decode(&jsonResp) if err != nil { return fmt.Errorf("failed to decode json: %w", err) } return nil } args := map[string]string{ "path": path, } err := c.makeRequest( http.MethodGet, c.getURL("clusterStorage/fullPath", args), nil, fn, ) if err != nil { log.Errorf("ClusterStorage: FullPath: %s", err.Error()) } return jsonResp.Data } type Response[T any] struct { Code int `json:"code"` Data T `json:"data"` Message string `json:"message"` } func (c *ClusterStorage) Stat(path string) (*StorageInfo, error) { log.Debugf("ClusterStorage::Stat::%s(%s)", strings.ToUpper(c.scheme()), path) var jsonResp Response[*StorageInfo] fn := func(resp *http.Response) error { err := json.NewDecoder(resp.Body).Decode(&jsonResp) if err != nil { return fmt.Errorf("failed to decode json: %w", err) } return nil } 
args := map[string]string{ "path": path, } err := c.makeRequest( http.MethodGet, c.getURL("clusterStorage/stat", args), nil, fn, ) if err != nil { return nil, fmt.Errorf("ClusterStorage: Stat: %w", err) } return jsonResp.Data, nil } func (c *ClusterStorage) Read(path string) ([]byte, error) { log.Debugf("ClusterStorage::Read::%s(%s)", strings.ToUpper(c.scheme()), path) var jsonResp Response[[]byte] fn := func(resp *http.Response) error { err := json.NewDecoder(resp.Body).Decode(&jsonResp) if err != nil { return fmt.Errorf("failed to decode json: %w", err) } return nil } args := map[string]string{ "path": path, } err := c.makeRequest( http.MethodGet, c.getURL("clusterStorage/read", args), nil, fn, ) if err != nil { return nil, fmt.Errorf("ClusterStorage: Read: %w", err) } return jsonResp.Data, nil } // ReadStream returns a reader for the specified object path. // // Note: ClusterStorage does not currently expose a remote streaming endpoint, so this // implementation materializes the response via Read and wraps it as an io.ReadCloser. func (c *ClusterStorage) ReadStream(path string) (io.ReadCloser, error) { data, err := c.Read(path) if err != nil { return nil, err } return io.NopCloser(bytes.NewReader(data)), nil } // ReadToLocalFile downloads the specified object at path to destPath on the local file system. // // Note: ClusterStorage does not currently expose a streaming download endpoint, so this implementation // loads the content via Read() and then writes it to destPath. 
func (c *ClusterStorage) ReadToLocalFile(path, destPath string) error { log.Debugf("ClusterStorage::ReadToLocalFile::%s(%s) -> %s", strings.ToUpper(c.scheme()), path, destPath) data, err := c.Read(path) if err != nil { return err } if err := os.MkdirAll(filepath.Dir(destPath), os.ModePerm); err != nil { return fmt.Errorf("ClusterStorage: ReadToLocalFile: creating destination directory: %w", err) } if err := os.WriteFile(destPath, data, 0600); err != nil { return fmt.Errorf("ClusterStorage: ReadToLocalFile: writing destination file: %w", err) } return nil } func (c *ClusterStorage) Write(path string, data []byte) error { log.Debugf("ClusterStorage::Write::%s(%s)", strings.ToUpper(c.scheme()), path) fn := func(resp *http.Response) error { return nil } args := map[string]string{ "path": path, } err := c.makeRequest( http.MethodPut, c.getURL("clusterStorage/write", args), bytes.NewReader(data), fn, ) if err != nil { return fmt.Errorf("ClusterStorage: Write: %w", err) } return nil } func (c *ClusterStorage) Remove(path string) error { log.Debugf("ClusterStorage::Remove::%s(%s)", strings.ToUpper(c.scheme()), path) fn := func(resp *http.Response) error { return nil } args := map[string]string{ "path": path, } err := c.makeRequest( http.MethodDelete, c.getURL("clusterStorage/remove", args), nil, fn, ) if err != nil { return fmt.Errorf("ClusterStorage: Write: %w", err) } return nil } func (c *ClusterStorage) Exists(path string) (bool, error) { log.Debugf("ClusterStorage::Exists::%s(%s)", strings.ToUpper(c.scheme()), path) var jsonResp Response[bool] fn := func(resp *http.Response) error { err := json.NewDecoder(resp.Body).Decode(&jsonResp) if err != nil { return fmt.Errorf("failed to decode json: %w", err) } return nil } args := map[string]string{ "path": path, } err := c.makeRequest( http.MethodGet, c.getURL("clusterStorage/exists", args), nil, fn, ) if err != nil { return false, fmt.Errorf("ClusterStorage: Exists: %w", err) } return jsonResp.Data, nil } func (c 
*ClusterStorage) List(path string) ([]*StorageInfo, error) { log.Debugf("ClusterStorage::List::%s(%s)", strings.ToUpper(c.scheme()), path) var jsonResp Response[[]*StorageInfo] fn := func(resp *http.Response) error { err := json.NewDecoder(resp.Body).Decode(&jsonResp) if err != nil { return fmt.Errorf("failed to decode json: %w", err) } return nil } args := map[string]string{ "path": path, } err := c.makeRequest( http.MethodGet, c.getURL("clusterStorage/list", args), nil, fn, ) if err != nil { return nil, fmt.Errorf("ClusterStorage: List: %w", err) } return jsonResp.Data, nil } func (c *ClusterStorage) ListDirectories(path string) ([]*StorageInfo, error) { log.Debugf("ClusterStorage::ListDirectories::%s(%s)", strings.ToUpper(c.scheme()), path) var jsonResp Response[[]*StorageInfo] fn := func(resp *http.Response) error { err := json.NewDecoder(resp.Body).Decode(&jsonResp) if err != nil { return fmt.Errorf("failed to decode json: %w", err) } return nil } args := map[string]string{ "path": path, } err := c.makeRequest( http.MethodGet, c.getURL("clusterStorage/listDirectories", args), nil, fn, ) if err != nil { return nil, fmt.Errorf("ClusterStorage: List Directories: %w", err) } // add '/' to the end of directory names to match other bucket storage types for _, info := range jsonResp.Data { info.Name = strings.TrimSuffix(info.Name, DirDelim) + DirDelim } return jsonResp.Data, nil }