clusterstorage.go 8.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402
  1. package storage
  2. import (
  3. "bytes"
  4. "fmt"
  5. "io"
  6. "net"
  7. "net/http"
  8. "net/url"
  9. "strings"
  10. "time"
  11. "github.com/opencost/opencost/core/pkg/log"
  12. "github.com/opencost/opencost/core/pkg/util/httputil"
  13. "github.com/opencost/opencost/core/pkg/util/json"
  14. "gopkg.in/yaml.v2"
  15. )
  16. var defaultClusterConfig = ClusterConfig{
  17. Host: "localhost",
  18. Port: 9006,
  19. HTTPConfig: HTTPConfig{
  20. IdleConnTimeout: 90 * time.Second,
  21. ResponseHeaderTimeout: 2 * time.Minute,
  22. TLSHandshakeTimeout: 10 * time.Second,
  23. ExpectContinueTimeout: 1 * time.Second,
  24. MaxIdleConns: 100,
  25. MaxIdleConnsPerHost: 100,
  26. MaxConnsPerHost: 0,
  27. },
  28. }
  29. // ClusterStorage is a Storage implementation which connects to a remote file storage over http
  30. type ClusterStorage struct {
  31. client *http.Client
  32. host string
  33. port int
  34. }
  35. type ClusterConfig struct {
  36. Host string `yaml:"host"`
  37. Port int `yaml:"port"`
  38. HTTPConfig HTTPConfig `yaml:"http_config"`
  39. }
  40. // parseConfig unmarshals a buffer into a Config with default HTTPConfig values.
  41. func parseClusterConfig(conf []byte) (ClusterConfig, error) {
  42. config := defaultClusterConfig
  43. if err := yaml.Unmarshal(conf, &config); err != nil {
  44. return ClusterConfig{}, err
  45. }
  46. return config, nil
  47. }
  48. func NewClusterStorage(conf []byte) (*ClusterStorage, error) {
  49. config, err := parseClusterConfig(conf)
  50. if err != nil {
  51. return nil, err
  52. }
  53. return NewClusterStorageWith(config)
  54. }
  55. // NewBucketWithConfig returns a new Bucket using the provided s3 config values.
  56. func NewClusterStorageWith(config ClusterConfig) (*ClusterStorage, error) {
  57. dt, err := config.HTTPConfig.GetHTTPTransport()
  58. if err != nil {
  59. return nil, fmt.Errorf("error creating transport: %w", err)
  60. }
  61. cs := &ClusterStorage{
  62. host: config.Host,
  63. port: config.Port,
  64. client: &http.Client{Transport: dt},
  65. }
  66. // Wait on cluster storage to respond before returning
  67. defaultWait := 5 * time.Second
  68. retry := 0
  69. maxTries := 5
  70. for {
  71. err := cs.check()
  72. if err == nil {
  73. break
  74. }
  75. log.Debugf("ClusterStorage: error connecting to cluster storage: %s", err.Error())
  76. if retry >= maxTries {
  77. return nil, fmt.Errorf("ClusterStorage: failed to connect to cluster storage after %d trys", maxTries)
  78. }
  79. waitTime := httputil.ExponentialBackoffWaitFor(defaultWait, retry)
  80. log.Infof("ClusterStorage: failed to connecting cluster storage. retry in %s", waitTime.String())
  81. time.Sleep(waitTime)
  82. retry++
  83. }
  84. return cs, nil
  85. }
  86. func (c *ClusterStorage) makeRequest(method, url string, body io.Reader, fn func(*http.Response) error) error {
  87. request, err := http.NewRequest(method, url, body)
  88. if err != nil {
  89. return fmt.Errorf("failed to build request: %w", err)
  90. }
  91. resp, err := c.client.Do(request)
  92. if err != nil {
  93. return fmt.Errorf("request failed: %w", err)
  94. }
  95. if resp.Body != nil {
  96. defer resp.Body.Close()
  97. }
  98. if !(resp.StatusCode >= 200 && resp.StatusCode <= 299) {
  99. if resp.Body != nil {
  100. var errResp Response[any]
  101. err = json.NewDecoder(resp.Body).Decode(&errResp)
  102. if err == nil {
  103. return fmt.Errorf("invalid response %d: %s", resp.StatusCode, errResp.Message)
  104. }
  105. }
  106. return fmt.Errorf("invalid response %d", resp.StatusCode)
  107. }
  108. if fn != nil {
  109. err = fn(resp)
  110. if err != nil {
  111. return fmt.Errorf("failed to handle response: %w", err)
  112. }
  113. }
  114. return nil
  115. }
  116. func (c *ClusterStorage) getURL(subpath string, args map[string]string) string {
  117. pathElems := strings.Split(subpath, "/")
  118. u := new(url.URL)
  119. u.Scheme = c.scheme()
  120. u.Host = net.JoinHostPort(c.host, fmt.Sprintf("%d", c.port))
  121. u = u.JoinPath(pathElems...)
  122. q := make(url.Values)
  123. for k, v := range args {
  124. q.Set(k, v)
  125. }
  126. rawQuery, _ := url.QueryUnescape(q.Encode())
  127. u.RawQuery = rawQuery
  128. return u.String() // <-- full URL string
  129. }
  130. func (c *ClusterStorage) check() error {
  131. err := c.makeRequest(
  132. http.MethodGet,
  133. c.getURL("healthz", nil),
  134. nil,
  135. nil,
  136. )
  137. if err != nil {
  138. return fmt.Errorf("ClusterStorage: failed health check: %w", err)
  139. }
  140. return nil
  141. }
  142. // String returns the host and port for the cluster storage.
  143. func (c *ClusterStorage) String() string {
  144. return fmt.Sprintf("%s:%d", c.host, c.port)
  145. }
  146. func (c *ClusterStorage) StorageType() StorageType {
  147. return StorageTypeCluster
  148. }
  149. func (c *ClusterStorage) scheme() string {
  150. if c.client.Transport != nil {
  151. if transport, ok := c.client.Transport.(*http.Transport); ok {
  152. if transport.TLSClientConfig != nil && !transport.TLSClientConfig.InsecureSkipVerify {
  153. return "https"
  154. }
  155. }
  156. }
  157. return "http"
  158. }
  159. func (c *ClusterStorage) FullPath(path string) string {
  160. var jsonResp Response[string]
  161. fn := func(resp *http.Response) error {
  162. err := json.NewDecoder(resp.Body).Decode(&jsonResp)
  163. if err != nil {
  164. return fmt.Errorf("failed to decode json: %w", err)
  165. }
  166. return nil
  167. }
  168. args := map[string]string{
  169. "path": path,
  170. }
  171. err := c.makeRequest(
  172. http.MethodGet,
  173. c.getURL("clusterStorage/fullPath", args),
  174. nil,
  175. fn,
  176. )
  177. if err != nil {
  178. log.Errorf("ClusterStorage: FullPath: %s", err.Error())
  179. }
  180. return jsonResp.Data
  181. }
  182. type Response[T any] struct {
  183. Code int `json:"code"`
  184. Data T `json:"data"`
  185. Message string `json:"message"`
  186. }
  187. func (c *ClusterStorage) Stat(path string) (*StorageInfo, error) {
  188. var jsonResp Response[*StorageInfo]
  189. fn := func(resp *http.Response) error {
  190. err := json.NewDecoder(resp.Body).Decode(&jsonResp)
  191. if err != nil {
  192. return fmt.Errorf("failed to decode json: %w", err)
  193. }
  194. return nil
  195. }
  196. args := map[string]string{
  197. "path": path,
  198. }
  199. err := c.makeRequest(
  200. http.MethodGet,
  201. c.getURL("clusterStorage/stat", args),
  202. nil,
  203. fn,
  204. )
  205. if err != nil {
  206. return nil, fmt.Errorf("ClusterStorage: Stat: %w", err)
  207. }
  208. return jsonResp.Data, nil
  209. }
  210. func (c *ClusterStorage) Read(path string) ([]byte, error) {
  211. var jsonResp Response[[]byte]
  212. fn := func(resp *http.Response) error {
  213. err := json.NewDecoder(resp.Body).Decode(&jsonResp)
  214. if err != nil {
  215. return fmt.Errorf("failed to decode json: %w", err)
  216. }
  217. return nil
  218. }
  219. args := map[string]string{
  220. "path": path,
  221. }
  222. err := c.makeRequest(
  223. http.MethodGet,
  224. c.getURL("clusterStorage/read", args),
  225. nil,
  226. fn,
  227. )
  228. if err != nil {
  229. return nil, fmt.Errorf("ClusterStorage: Read: %w", err)
  230. }
  231. return jsonResp.Data, nil
  232. }
  233. func (c *ClusterStorage) Write(path string, data []byte) error {
  234. fn := func(resp *http.Response) error {
  235. return nil
  236. }
  237. args := map[string]string{
  238. "path": path,
  239. }
  240. err := c.makeRequest(
  241. http.MethodPut,
  242. c.getURL("clusterStorage/write", args),
  243. bytes.NewReader(data),
  244. fn,
  245. )
  246. if err != nil {
  247. return fmt.Errorf("ClusterStorage: Write: %w", err)
  248. }
  249. return nil
  250. }
  251. func (c *ClusterStorage) Remove(path string) error {
  252. fn := func(resp *http.Response) error {
  253. return nil
  254. }
  255. args := map[string]string{
  256. "path": path,
  257. }
  258. err := c.makeRequest(
  259. http.MethodDelete,
  260. c.getURL("clusterStorage/remove", args),
  261. nil,
  262. fn,
  263. )
  264. if err != nil {
  265. return fmt.Errorf("ClusterStorage: Write: %w", err)
  266. }
  267. return nil
  268. }
  269. func (c *ClusterStorage) Exists(path string) (bool, error) {
  270. var jsonResp Response[bool]
  271. fn := func(resp *http.Response) error {
  272. err := json.NewDecoder(resp.Body).Decode(&jsonResp)
  273. if err != nil {
  274. return fmt.Errorf("failed to decode json: %w", err)
  275. }
  276. return nil
  277. }
  278. args := map[string]string{
  279. "path": path,
  280. }
  281. err := c.makeRequest(
  282. http.MethodGet,
  283. c.getURL("clusterStorage/exists", args),
  284. nil,
  285. fn,
  286. )
  287. if err != nil {
  288. return false, fmt.Errorf("ClusterStorage: Exists: %w", err)
  289. }
  290. return jsonResp.Data, nil
  291. }
  292. func (c *ClusterStorage) List(path string) ([]*StorageInfo, error) {
  293. var jsonResp Response[[]*StorageInfo]
  294. fn := func(resp *http.Response) error {
  295. err := json.NewDecoder(resp.Body).Decode(&jsonResp)
  296. if err != nil {
  297. return fmt.Errorf("failed to decode json: %w", err)
  298. }
  299. return nil
  300. }
  301. args := map[string]string{
  302. "path": path,
  303. }
  304. err := c.makeRequest(
  305. http.MethodGet,
  306. c.getURL("clusterStorage/list", args),
  307. nil,
  308. fn,
  309. )
  310. if err != nil {
  311. return nil, fmt.Errorf("ClusterStorage: List: %w", err)
  312. }
  313. return jsonResp.Data, nil
  314. }
  315. func (c *ClusterStorage) ListDirectories(path string) ([]*StorageInfo, error) {
  316. var jsonResp Response[[]*StorageInfo]
  317. fn := func(resp *http.Response) error {
  318. err := json.NewDecoder(resp.Body).Decode(&jsonResp)
  319. if err != nil {
  320. return fmt.Errorf("failed to decode json: %w", err)
  321. }
  322. return nil
  323. }
  324. args := map[string]string{
  325. "path": path,
  326. }
  327. err := c.makeRequest(
  328. http.MethodGet,
  329. c.getURL("clusterStorage/listDirectories", args),
  330. nil,
  331. fn,
  332. )
  333. if err != nil {
  334. return nil, fmt.Errorf("ClusterStorage: List Directories: %w", err)
  335. }
  336. // add '/' to the end of directory names to match other bucket storage types
  337. for _, info := range jsonResp.Data {
  338. info.Name = strings.TrimSuffix(info.Name, DirDelim) + DirDelim
  339. }
  340. return jsonResp.Data, nil
  341. }