clusterstorage.go 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456
  1. package storage
  2. import (
  3. "bytes"
  4. "fmt"
  5. "io"
  6. "net"
  7. "net/http"
  8. "net/url"
  9. "os"
  10. "path/filepath"
  11. "strings"
  12. "time"
  13. "github.com/opencost/opencost/core/pkg/log"
  14. "github.com/opencost/opencost/core/pkg/util/httputil"
  15. "github.com/opencost/opencost/core/pkg/util/json"
  16. "gopkg.in/yaml.v2"
  17. )
// defaultClusterConfig is the baseline configuration. parseClusterConfig starts
// from a copy of this value before unmarshalling the caller's YAML over it.
var defaultClusterConfig = ClusterConfig{
	Host: "localhost",
	Port: 9006,
	HTTPConfig: HTTPConfig{
		IdleConnTimeout:       90 * time.Second,
		ResponseHeaderTimeout: 2 * time.Minute,
		TLSHandshakeTimeout:   10 * time.Second,
		ExpectContinueTimeout: 1 * time.Second,
		MaxIdleConns:          100,
		MaxIdleConnsPerHost:   100,
		// NOTE(review): presumably 0 means "no per-host limit", as in net/http.Transport — confirm against HTTPConfig.
		MaxConnsPerHost: 0,
	},
}
// ClusterStorage is a Storage implementation which connects to a remote file storage over http
type ClusterStorage struct {
	// client performs every HTTP request issued by this storage.
	client *http.Client
	// host and port identify the remote endpoint; they are combined into the
	// request URL by getURL and reported by String.
	host string
	port int
}
// ClusterConfig is the YAML-decodable configuration for a ClusterStorage.
type ClusterConfig struct {
	// Host is the host name or address of the remote storage endpoint.
	Host string `yaml:"host"`
	// Port is the port of the remote storage endpoint.
	Port int `yaml:"port"`
	// HTTPConfig supplies the settings used to build the HTTP transport
	// (see NewClusterStorageWith).
	HTTPConfig HTTPConfig `yaml:"http_config"`
}
  42. // parseConfig unmarshals a buffer into a Config with default HTTPConfig values.
  43. func parseClusterConfig(conf []byte) (ClusterConfig, error) {
  44. config := defaultClusterConfig
  45. if err := yaml.Unmarshal(conf, &config); err != nil {
  46. return ClusterConfig{}, err
  47. }
  48. return config, nil
  49. }
  50. func NewClusterStorage(conf []byte) (*ClusterStorage, error) {
  51. config, err := parseClusterConfig(conf)
  52. if err != nil {
  53. return nil, err
  54. }
  55. return NewClusterStorageWith(config)
  56. }
  57. // NewBucketWithConfig returns a new Bucket using the provided s3 config values.
  58. func NewClusterStorageWith(config ClusterConfig) (*ClusterStorage, error) {
  59. dt, err := config.HTTPConfig.GetHTTPTransport()
  60. if err != nil {
  61. return nil, fmt.Errorf("error creating transport: %w", err)
  62. }
  63. cs := &ClusterStorage{
  64. host: config.Host,
  65. port: config.Port,
  66. client: &http.Client{Transport: dt},
  67. }
  68. // Wait on cluster storage to respond before returning
  69. defaultWait := 5 * time.Second
  70. retry := 0
  71. maxTries := 5
  72. for {
  73. err := cs.check()
  74. if err == nil {
  75. break
  76. }
  77. log.Debugf("ClusterStorage: error connecting to cluster storage: %s", err.Error())
  78. if retry >= maxTries {
  79. return nil, fmt.Errorf("ClusterStorage: failed to connect to cluster storage after %d trys", maxTries)
  80. }
  81. waitTime := httputil.ExponentialBackoffWaitFor(defaultWait, retry)
  82. log.Infof("ClusterStorage: failed to connecting cluster storage. retry in %s", waitTime.String())
  83. time.Sleep(waitTime)
  84. retry++
  85. }
  86. log.Debugf("ClusterStorage: New cluster storage client initialized with '%s://%s:%d'", cs.scheme(), config.Host, config.Port)
  87. return cs, nil
  88. }
  89. func (c *ClusterStorage) makeRequest(method, url string, body io.Reader, fn func(*http.Response) error) error {
  90. request, err := http.NewRequest(method, url, body)
  91. if err != nil {
  92. return fmt.Errorf("failed to build request: %w", err)
  93. }
  94. resp, err := c.client.Do(request)
  95. if err != nil {
  96. return fmt.Errorf("request failed: %w", err)
  97. }
  98. if resp.Body != nil {
  99. defer resp.Body.Close()
  100. }
  101. if !(resp.StatusCode >= 200 && resp.StatusCode <= 299) {
  102. if resp.Body != nil {
  103. var errResp Response[any]
  104. err = json.NewDecoder(resp.Body).Decode(&errResp)
  105. if err == nil {
  106. return fmt.Errorf("invalid response %d: %s", resp.StatusCode, errResp.Message)
  107. }
  108. }
  109. return fmt.Errorf("invalid response %d", resp.StatusCode)
  110. }
  111. if fn != nil {
  112. err = fn(resp)
  113. if err != nil {
  114. return fmt.Errorf("failed to handle response: %w", err)
  115. }
  116. }
  117. return nil
  118. }
  119. func (c *ClusterStorage) getURL(subpath string, args map[string]string) string {
  120. pathElems := strings.Split(subpath, "/")
  121. u := new(url.URL)
  122. u.Scheme = c.scheme()
  123. u.Host = net.JoinHostPort(c.host, fmt.Sprintf("%d", c.port))
  124. u = u.JoinPath(pathElems...)
  125. q := make(url.Values)
  126. for k, v := range args {
  127. q.Set(k, v)
  128. }
  129. rawQuery, _ := url.QueryUnescape(q.Encode())
  130. u.RawQuery = rawQuery
  131. return u.String() // <-- full URL string
  132. }
  133. func (c *ClusterStorage) check() error {
  134. err := c.makeRequest(
  135. http.MethodGet,
  136. c.getURL("healthz", nil),
  137. nil,
  138. nil,
  139. )
  140. if err != nil {
  141. return fmt.Errorf("ClusterStorage: failed health check: %w", err)
  142. }
  143. return nil
  144. }
  145. // String returns the host and port for the cluster storage.
  146. func (c *ClusterStorage) String() string {
  147. return fmt.Sprintf("%s:%d", c.host, c.port)
  148. }
// StorageType reports StorageTypeCluster as the storage type of this implementation.
func (c *ClusterStorage) StorageType() StorageType {
	return StorageTypeCluster
}
  152. // scheme returns the protocol scheme (http or https) based on TLS configuration
  153. func (c *ClusterStorage) scheme() string {
  154. if c.client.Transport != nil {
  155. if transport, ok := c.client.Transport.(*http.Transport); ok {
  156. if transport.TLSClientConfig != nil && !transport.TLSClientConfig.InsecureSkipVerify {
  157. return "https"
  158. }
  159. }
  160. }
  161. return "http"
  162. }
  163. func (c *ClusterStorage) FullPath(path string) string {
  164. var jsonResp Response[string]
  165. fn := func(resp *http.Response) error {
  166. err := json.NewDecoder(resp.Body).Decode(&jsonResp)
  167. if err != nil {
  168. return fmt.Errorf("failed to decode json: %w", err)
  169. }
  170. return nil
  171. }
  172. args := map[string]string{
  173. "path": path,
  174. }
  175. err := c.makeRequest(
  176. http.MethodGet,
  177. c.getURL("clusterStorage/fullPath", args),
  178. nil,
  179. fn,
  180. )
  181. if err != nil {
  182. log.Errorf("ClusterStorage: FullPath: %s", err.Error())
  183. }
  184. return jsonResp.Data
  185. }
// Response is the JSON envelope decoded from the remote storage's replies.
// T is the type of the endpoint-specific payload.
type Response[T any] struct {
	// Code is the "code" field of the envelope.
	// NOTE(review): presumably mirrors the HTTP status — confirm against the server.
	Code int `json:"code"`
	// Data is the endpoint-specific payload.
	Data T `json:"data"`
	// Message is the "message" field; makeRequest includes it in the error
	// returned for non-2xx responses.
	Message string `json:"message"`
}
  191. func (c *ClusterStorage) Stat(path string) (*StorageInfo, error) {
  192. log.Debugf("ClusterStorage::Stat::%s(%s)", strings.ToUpper(c.scheme()), path)
  193. var jsonResp Response[*StorageInfo]
  194. fn := func(resp *http.Response) error {
  195. err := json.NewDecoder(resp.Body).Decode(&jsonResp)
  196. if err != nil {
  197. return fmt.Errorf("failed to decode json: %w", err)
  198. }
  199. return nil
  200. }
  201. args := map[string]string{
  202. "path": path,
  203. }
  204. err := c.makeRequest(
  205. http.MethodGet,
  206. c.getURL("clusterStorage/stat", args),
  207. nil,
  208. fn,
  209. )
  210. if err != nil {
  211. return nil, fmt.Errorf("ClusterStorage: Stat: %w", err)
  212. }
  213. return jsonResp.Data, nil
  214. }
  215. func (c *ClusterStorage) Read(path string) ([]byte, error) {
  216. log.Debugf("ClusterStorage::Read::%s(%s)", strings.ToUpper(c.scheme()), path)
  217. var jsonResp Response[[]byte]
  218. fn := func(resp *http.Response) error {
  219. err := json.NewDecoder(resp.Body).Decode(&jsonResp)
  220. if err != nil {
  221. return fmt.Errorf("failed to decode json: %w", err)
  222. }
  223. return nil
  224. }
  225. args := map[string]string{
  226. "path": path,
  227. }
  228. err := c.makeRequest(
  229. http.MethodGet,
  230. c.getURL("clusterStorage/read", args),
  231. nil,
  232. fn,
  233. )
  234. if err != nil {
  235. return nil, fmt.Errorf("ClusterStorage: Read: %w", err)
  236. }
  237. return jsonResp.Data, nil
  238. }
  239. // ReadStream returns a reader for the specified object path.
  240. //
  241. // Note: ClusterStorage does not currently expose a remote streaming endpoint, so this
  242. // implementation materializes the response via Read and wraps it as an io.ReadCloser.
  243. func (c *ClusterStorage) ReadStream(path string) (io.ReadCloser, error) {
  244. data, err := c.Read(path)
  245. if err != nil {
  246. return nil, err
  247. }
  248. return io.NopCloser(bytes.NewReader(data)), nil
  249. }
  250. // ReadToLocalFile downloads the specified object at path to destPath on the local file system.
  251. //
  252. // Note: ClusterStorage does not currently expose a streaming download endpoint, so this implementation
  253. // loads the content via Read() and then writes it to destPath.
  254. func (c *ClusterStorage) ReadToLocalFile(path, destPath string) error {
  255. log.Debugf("ClusterStorage::ReadToLocalFile::%s(%s) -> %s", strings.ToUpper(c.scheme()), path, destPath)
  256. data, err := c.Read(path)
  257. if err != nil {
  258. return err
  259. }
  260. if err := os.MkdirAll(filepath.Dir(destPath), os.ModePerm); err != nil {
  261. return fmt.Errorf("ClusterStorage: ReadToLocalFile: creating destination directory: %w", err)
  262. }
  263. if err := os.WriteFile(destPath, data, 0600); err != nil {
  264. return fmt.Errorf("ClusterStorage: ReadToLocalFile: writing destination file: %w", err)
  265. }
  266. return nil
  267. }
  268. func (c *ClusterStorage) Write(path string, data []byte) error {
  269. log.Debugf("ClusterStorage::Write::%s(%s)", strings.ToUpper(c.scheme()), path)
  270. fn := func(resp *http.Response) error {
  271. return nil
  272. }
  273. args := map[string]string{
  274. "path": path,
  275. }
  276. err := c.makeRequest(
  277. http.MethodPut,
  278. c.getURL("clusterStorage/write", args),
  279. bytes.NewReader(data),
  280. fn,
  281. )
  282. if err != nil {
  283. return fmt.Errorf("ClusterStorage: Write: %w", err)
  284. }
  285. return nil
  286. }
  287. func (c *ClusterStorage) Remove(path string) error {
  288. log.Debugf("ClusterStorage::Remove::%s(%s)", strings.ToUpper(c.scheme()), path)
  289. fn := func(resp *http.Response) error {
  290. return nil
  291. }
  292. args := map[string]string{
  293. "path": path,
  294. }
  295. err := c.makeRequest(
  296. http.MethodDelete,
  297. c.getURL("clusterStorage/remove", args),
  298. nil,
  299. fn,
  300. )
  301. if err != nil {
  302. return fmt.Errorf("ClusterStorage: Write: %w", err)
  303. }
  304. return nil
  305. }
  306. func (c *ClusterStorage) Exists(path string) (bool, error) {
  307. log.Debugf("ClusterStorage::Exists::%s(%s)", strings.ToUpper(c.scheme()), path)
  308. var jsonResp Response[bool]
  309. fn := func(resp *http.Response) error {
  310. err := json.NewDecoder(resp.Body).Decode(&jsonResp)
  311. if err != nil {
  312. return fmt.Errorf("failed to decode json: %w", err)
  313. }
  314. return nil
  315. }
  316. args := map[string]string{
  317. "path": path,
  318. }
  319. err := c.makeRequest(
  320. http.MethodGet,
  321. c.getURL("clusterStorage/exists", args),
  322. nil,
  323. fn,
  324. )
  325. if err != nil {
  326. return false, fmt.Errorf("ClusterStorage: Exists: %w", err)
  327. }
  328. return jsonResp.Data, nil
  329. }
  330. func (c *ClusterStorage) List(path string) ([]*StorageInfo, error) {
  331. log.Debugf("ClusterStorage::List::%s(%s)", strings.ToUpper(c.scheme()), path)
  332. var jsonResp Response[[]*StorageInfo]
  333. fn := func(resp *http.Response) error {
  334. err := json.NewDecoder(resp.Body).Decode(&jsonResp)
  335. if err != nil {
  336. return fmt.Errorf("failed to decode json: %w", err)
  337. }
  338. return nil
  339. }
  340. args := map[string]string{
  341. "path": path,
  342. }
  343. err := c.makeRequest(
  344. http.MethodGet,
  345. c.getURL("clusterStorage/list", args),
  346. nil,
  347. fn,
  348. )
  349. if err != nil {
  350. return nil, fmt.Errorf("ClusterStorage: List: %w", err)
  351. }
  352. return jsonResp.Data, nil
  353. }
  354. func (c *ClusterStorage) ListDirectories(path string) ([]*StorageInfo, error) {
  355. log.Debugf("ClusterStorage::ListDirectories::%s(%s)", strings.ToUpper(c.scheme()), path)
  356. var jsonResp Response[[]*StorageInfo]
  357. fn := func(resp *http.Response) error {
  358. err := json.NewDecoder(resp.Body).Decode(&jsonResp)
  359. if err != nil {
  360. return fmt.Errorf("failed to decode json: %w", err)
  361. }
  362. return nil
  363. }
  364. args := map[string]string{
  365. "path": path,
  366. }
  367. err := c.makeRequest(
  368. http.MethodGet,
  369. c.getURL("clusterStorage/listDirectories", args),
  370. nil,
  371. fn,
  372. )
  373. if err != nil {
  374. return nil, fmt.Errorf("ClusterStorage: List Directories: %w", err)
  375. }
  376. // add '/' to the end of directory names to match other bucket storage types
  377. for _, info := range jsonResp.Data {
  378. info.Name = strings.TrimSuffix(info.Name, DirDelim) + DirDelim
  379. }
  380. return jsonResp.Data, nil
  381. }