| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456 |
- package storage
- import (
- "bytes"
- "fmt"
- "io"
- "net"
- "net/http"
- "net/url"
- "os"
- "path/filepath"
- "strings"
- "time"
- "github.com/opencost/opencost/core/pkg/log"
- "github.com/opencost/opencost/core/pkg/util/httputil"
- "github.com/opencost/opencost/core/pkg/util/json"
- "gopkg.in/yaml.v2"
- )
- var defaultClusterConfig = ClusterConfig{
- Host: "localhost",
- Port: 9006,
- HTTPConfig: HTTPConfig{
- IdleConnTimeout: 90 * time.Second,
- ResponseHeaderTimeout: 2 * time.Minute,
- TLSHandshakeTimeout: 10 * time.Second,
- ExpectContinueTimeout: 1 * time.Second,
- MaxIdleConns: 100,
- MaxIdleConnsPerHost: 100,
- MaxConnsPerHost: 0,
- },
- }
// ClusterStorage is a Storage implementation that talks to a remote file
// storage service over HTTP(S).
type ClusterStorage struct {
	// client issues every request made by this storage.
	client *http.Client
	// host is the remote storage host name or address.
	host string
	// port is the TCP port of the remote storage service.
	port int
}
- type ClusterConfig struct {
- Host string `yaml:"host"`
- Port int `yaml:"port"`
- HTTPConfig HTTPConfig `yaml:"http_config"`
- }
- // parseConfig unmarshals a buffer into a Config with default HTTPConfig values.
- func parseClusterConfig(conf []byte) (ClusterConfig, error) {
- config := defaultClusterConfig
- if err := yaml.Unmarshal(conf, &config); err != nil {
- return ClusterConfig{}, err
- }
- return config, nil
- }
- func NewClusterStorage(conf []byte) (*ClusterStorage, error) {
- config, err := parseClusterConfig(conf)
- if err != nil {
- return nil, err
- }
- return NewClusterStorageWith(config)
- }
- // NewBucketWithConfig returns a new Bucket using the provided s3 config values.
- func NewClusterStorageWith(config ClusterConfig) (*ClusterStorage, error) {
- dt, err := config.HTTPConfig.GetHTTPTransport()
- if err != nil {
- return nil, fmt.Errorf("error creating transport: %w", err)
- }
- cs := &ClusterStorage{
- host: config.Host,
- port: config.Port,
- client: &http.Client{Transport: dt},
- }
- // Wait on cluster storage to respond before returning
- defaultWait := 5 * time.Second
- retry := 0
- maxTries := 5
- for {
- err := cs.check()
- if err == nil {
- break
- }
- log.Debugf("ClusterStorage: error connecting to cluster storage: %s", err.Error())
- if retry >= maxTries {
- return nil, fmt.Errorf("ClusterStorage: failed to connect to cluster storage after %d trys", maxTries)
- }
- waitTime := httputil.ExponentialBackoffWaitFor(defaultWait, retry)
- log.Infof("ClusterStorage: failed to connecting cluster storage. retry in %s", waitTime.String())
- time.Sleep(waitTime)
- retry++
- }
- log.Debugf("ClusterStorage: New cluster storage client initialized with '%s://%s:%d'", cs.scheme(), config.Host, config.Port)
- return cs, nil
- }
- func (c *ClusterStorage) makeRequest(method, url string, body io.Reader, fn func(*http.Response) error) error {
- request, err := http.NewRequest(method, url, body)
- if err != nil {
- return fmt.Errorf("failed to build request: %w", err)
- }
- resp, err := c.client.Do(request)
- if err != nil {
- return fmt.Errorf("request failed: %w", err)
- }
- if resp.Body != nil {
- defer resp.Body.Close()
- }
- if !(resp.StatusCode >= 200 && resp.StatusCode <= 299) {
- if resp.Body != nil {
- var errResp Response[any]
- err = json.NewDecoder(resp.Body).Decode(&errResp)
- if err == nil {
- return fmt.Errorf("invalid response %d: %s", resp.StatusCode, errResp.Message)
- }
- }
- return fmt.Errorf("invalid response %d", resp.StatusCode)
- }
- if fn != nil {
- err = fn(resp)
- if err != nil {
- return fmt.Errorf("failed to handle response: %w", err)
- }
- }
- return nil
- }
- func (c *ClusterStorage) getURL(subpath string, args map[string]string) string {
- pathElems := strings.Split(subpath, "/")
- u := new(url.URL)
- u.Scheme = c.scheme()
- u.Host = net.JoinHostPort(c.host, fmt.Sprintf("%d", c.port))
- u = u.JoinPath(pathElems...)
- q := make(url.Values)
- for k, v := range args {
- q.Set(k, v)
- }
- rawQuery, _ := url.QueryUnescape(q.Encode())
- u.RawQuery = rawQuery
- return u.String() // <-- full URL string
- }
- func (c *ClusterStorage) check() error {
- err := c.makeRequest(
- http.MethodGet,
- c.getURL("healthz", nil),
- nil,
- nil,
- )
- if err != nil {
- return fmt.Errorf("ClusterStorage: failed health check: %w", err)
- }
- return nil
- }
- // String returns the host and port for the cluster storage.
- func (c *ClusterStorage) String() string {
- return fmt.Sprintf("%s:%d", c.host, c.port)
- }
- func (c *ClusterStorage) StorageType() StorageType {
- return StorageTypeCluster
- }
- // scheme returns the protocol scheme (http or https) based on TLS configuration
- func (c *ClusterStorage) scheme() string {
- if c.client.Transport != nil {
- if transport, ok := c.client.Transport.(*http.Transport); ok {
- if transport.TLSClientConfig != nil && !transport.TLSClientConfig.InsecureSkipVerify {
- return "https"
- }
- }
- }
- return "http"
- }
- func (c *ClusterStorage) FullPath(path string) string {
- var jsonResp Response[string]
- fn := func(resp *http.Response) error {
- err := json.NewDecoder(resp.Body).Decode(&jsonResp)
- if err != nil {
- return fmt.Errorf("failed to decode json: %w", err)
- }
- return nil
- }
- args := map[string]string{
- "path": path,
- }
- err := c.makeRequest(
- http.MethodGet,
- c.getURL("clusterStorage/fullPath", args),
- nil,
- fn,
- )
- if err != nil {
- log.Errorf("ClusterStorage: FullPath: %s", err.Error())
- }
- return jsonResp.Data
- }
// Response is the JSON envelope decoded from cluster storage replies. T is
// the type of the payload carried in the "data" field.
type Response[T any] struct {
	// Code is the value of the "code" field.
	Code int `json:"code"`
	// Data is the endpoint-specific payload.
	Data T `json:"data"`
	// Message is the value of the "message" field; it is used as the error
	// text when a request fails.
	Message string `json:"message"`
}
- func (c *ClusterStorage) Stat(path string) (*StorageInfo, error) {
- log.Debugf("ClusterStorage::Stat::%s(%s)", strings.ToUpper(c.scheme()), path)
- var jsonResp Response[*StorageInfo]
- fn := func(resp *http.Response) error {
- err := json.NewDecoder(resp.Body).Decode(&jsonResp)
- if err != nil {
- return fmt.Errorf("failed to decode json: %w", err)
- }
- return nil
- }
- args := map[string]string{
- "path": path,
- }
- err := c.makeRequest(
- http.MethodGet,
- c.getURL("clusterStorage/stat", args),
- nil,
- fn,
- )
- if err != nil {
- return nil, fmt.Errorf("ClusterStorage: Stat: %w", err)
- }
- return jsonResp.Data, nil
- }
- func (c *ClusterStorage) Read(path string) ([]byte, error) {
- log.Debugf("ClusterStorage::Read::%s(%s)", strings.ToUpper(c.scheme()), path)
- var jsonResp Response[[]byte]
- fn := func(resp *http.Response) error {
- err := json.NewDecoder(resp.Body).Decode(&jsonResp)
- if err != nil {
- return fmt.Errorf("failed to decode json: %w", err)
- }
- return nil
- }
- args := map[string]string{
- "path": path,
- }
- err := c.makeRequest(
- http.MethodGet,
- c.getURL("clusterStorage/read", args),
- nil,
- fn,
- )
- if err != nil {
- return nil, fmt.Errorf("ClusterStorage: Read: %w", err)
- }
- return jsonResp.Data, nil
- }
- // ReadStream returns a reader for the specified object path.
- //
- // Note: ClusterStorage does not currently expose a remote streaming endpoint, so this
- // implementation materializes the response via Read and wraps it as an io.ReadCloser.
- func (c *ClusterStorage) ReadStream(path string) (io.ReadCloser, error) {
- data, err := c.Read(path)
- if err != nil {
- return nil, err
- }
- return io.NopCloser(bytes.NewReader(data)), nil
- }
- // ReadToLocalFile downloads the specified object at path to destPath on the local file system.
- //
- // Note: ClusterStorage does not currently expose a streaming download endpoint, so this implementation
- // loads the content via Read() and then writes it to destPath.
- func (c *ClusterStorage) ReadToLocalFile(path, destPath string) error {
- log.Debugf("ClusterStorage::ReadToLocalFile::%s(%s) -> %s", strings.ToUpper(c.scheme()), path, destPath)
- data, err := c.Read(path)
- if err != nil {
- return err
- }
- if err := os.MkdirAll(filepath.Dir(destPath), os.ModePerm); err != nil {
- return fmt.Errorf("ClusterStorage: ReadToLocalFile: creating destination directory: %w", err)
- }
- if err := os.WriteFile(destPath, data, 0600); err != nil {
- return fmt.Errorf("ClusterStorage: ReadToLocalFile: writing destination file: %w", err)
- }
- return nil
- }
- func (c *ClusterStorage) Write(path string, data []byte) error {
- log.Debugf("ClusterStorage::Write::%s(%s)", strings.ToUpper(c.scheme()), path)
- fn := func(resp *http.Response) error {
- return nil
- }
- args := map[string]string{
- "path": path,
- }
- err := c.makeRequest(
- http.MethodPut,
- c.getURL("clusterStorage/write", args),
- bytes.NewReader(data),
- fn,
- )
- if err != nil {
- return fmt.Errorf("ClusterStorage: Write: %w", err)
- }
- return nil
- }
- func (c *ClusterStorage) Remove(path string) error {
- log.Debugf("ClusterStorage::Remove::%s(%s)", strings.ToUpper(c.scheme()), path)
- fn := func(resp *http.Response) error {
- return nil
- }
- args := map[string]string{
- "path": path,
- }
- err := c.makeRequest(
- http.MethodDelete,
- c.getURL("clusterStorage/remove", args),
- nil,
- fn,
- )
- if err != nil {
- return fmt.Errorf("ClusterStorage: Write: %w", err)
- }
- return nil
- }
- func (c *ClusterStorage) Exists(path string) (bool, error) {
- log.Debugf("ClusterStorage::Exists::%s(%s)", strings.ToUpper(c.scheme()), path)
- var jsonResp Response[bool]
- fn := func(resp *http.Response) error {
- err := json.NewDecoder(resp.Body).Decode(&jsonResp)
- if err != nil {
- return fmt.Errorf("failed to decode json: %w", err)
- }
- return nil
- }
- args := map[string]string{
- "path": path,
- }
- err := c.makeRequest(
- http.MethodGet,
- c.getURL("clusterStorage/exists", args),
- nil,
- fn,
- )
- if err != nil {
- return false, fmt.Errorf("ClusterStorage: Exists: %w", err)
- }
- return jsonResp.Data, nil
- }
- func (c *ClusterStorage) List(path string) ([]*StorageInfo, error) {
- log.Debugf("ClusterStorage::List::%s(%s)", strings.ToUpper(c.scheme()), path)
- var jsonResp Response[[]*StorageInfo]
- fn := func(resp *http.Response) error {
- err := json.NewDecoder(resp.Body).Decode(&jsonResp)
- if err != nil {
- return fmt.Errorf("failed to decode json: %w", err)
- }
- return nil
- }
- args := map[string]string{
- "path": path,
- }
- err := c.makeRequest(
- http.MethodGet,
- c.getURL("clusterStorage/list", args),
- nil,
- fn,
- )
- if err != nil {
- return nil, fmt.Errorf("ClusterStorage: List: %w", err)
- }
- return jsonResp.Data, nil
- }
- func (c *ClusterStorage) ListDirectories(path string) ([]*StorageInfo, error) {
- log.Debugf("ClusterStorage::ListDirectories::%s(%s)", strings.ToUpper(c.scheme()), path)
- var jsonResp Response[[]*StorageInfo]
- fn := func(resp *http.Response) error {
- err := json.NewDecoder(resp.Body).Decode(&jsonResp)
- if err != nil {
- return fmt.Errorf("failed to decode json: %w", err)
- }
- return nil
- }
- args := map[string]string{
- "path": path,
- }
- err := c.makeRequest(
- http.MethodGet,
- c.getURL("clusterStorage/listDirectories", args),
- nil,
- fn,
- )
- if err != nil {
- return nil, fmt.Errorf("ClusterStorage: List Directories: %w", err)
- }
- // add '/' to the end of directory names to match other bucket storage types
- for _, info := range jsonResp.Data {
- info.Name = strings.TrimSuffix(info.Name, DirDelim) + DirDelim
- }
- return jsonResp.Data, nil
- }
|