clusterstorage.go 8.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397
  1. package storage
  2. import (
  3. "bytes"
  4. "fmt"
  5. "io"
  6. "net"
  7. "net/http"
  8. "net/url"
  9. "strings"
  10. "time"
  11. "github.com/opencost/opencost/core/pkg/log"
  12. "github.com/opencost/opencost/core/pkg/util/httputil"
  13. "github.com/opencost/opencost/core/pkg/util/json"
  14. "gopkg.in/yaml.v2"
  15. )
  16. var defaultClusterConfig = ClusterConfig{
  17. Host: "localhost",
  18. Port: 9006,
  19. HTTPConfig: HTTPConfig{
  20. IdleConnTimeout: 90 * time.Second,
  21. ResponseHeaderTimeout: 2 * time.Minute,
  22. TLSHandshakeTimeout: 10 * time.Second,
  23. ExpectContinueTimeout: 1 * time.Second,
  24. MaxIdleConns: 100,
  25. MaxIdleConnsPerHost: 100,
  26. MaxConnsPerHost: 0,
  27. },
  28. }
  29. // ClusterStorage is a Storage implementation which connects to a remote file storage over http
  30. type ClusterStorage struct {
  31. client *http.Client
  32. host string
  33. port int
  34. }
  35. type ClusterConfig struct {
  36. Host string `yaml:"host"`
  37. Port int `yaml:"port"`
  38. HTTPConfig HTTPConfig `yaml:"http_config"`
  39. }
  40. // parseConfig unmarshals a buffer into a Config with default HTTPConfig values.
  41. func parseClusterConfig(conf []byte) (ClusterConfig, error) {
  42. config := defaultClusterConfig
  43. if err := yaml.Unmarshal(conf, &config); err != nil {
  44. return ClusterConfig{}, err
  45. }
  46. return config, nil
  47. }
  48. func NewClusterStorage(conf []byte) (*ClusterStorage, error) {
  49. config, err := parseClusterConfig(conf)
  50. if err != nil {
  51. return nil, err
  52. }
  53. return NewClusterStorageWith(config)
  54. }
  55. // NewBucketWithConfig returns a new Bucket using the provided s3 config values.
  56. func NewClusterStorageWith(config ClusterConfig) (*ClusterStorage, error) {
  57. dt, err := config.HTTPConfig.GetHTTPTransport()
  58. if err != nil {
  59. return nil, fmt.Errorf("error creating transport: %w", err)
  60. }
  61. cs := &ClusterStorage{
  62. host: config.Host,
  63. port: config.Port,
  64. client: &http.Client{Transport: dt},
  65. }
  66. // Wait on cluster storage to respond before returning
  67. defaultWait := 5 * time.Second
  68. retry := 0
  69. maxTries := 5
  70. for {
  71. err := cs.check()
  72. if err == nil {
  73. break
  74. }
  75. log.Debugf("ClusterStorage: error connecting to cluster storage: %s", err.Error())
  76. if retry >= maxTries {
  77. return nil, fmt.Errorf("ClusterStorage: failed to connect to cluster storage after %d trys", maxTries)
  78. }
  79. waitTime := httputil.ExponentialBackoffWaitFor(defaultWait, retry)
  80. log.Infof("ClusterStorage: failed to connecting cluster storage. retry in %s", waitTime.String())
  81. time.Sleep(waitTime)
  82. retry++
  83. }
  84. return cs, nil
  85. }
  86. func (c *ClusterStorage) makeRequest(method, url string, body io.Reader, fn func(*http.Response) error) error {
  87. request, err := http.NewRequest(method, url, body)
  88. if err != nil {
  89. return fmt.Errorf("failed to build request: %w", err)
  90. }
  91. resp, err := c.client.Do(request)
  92. if err != nil {
  93. return fmt.Errorf("request failed: %w", err)
  94. }
  95. if resp.Body != nil {
  96. defer resp.Body.Close()
  97. }
  98. if !(resp.StatusCode >= 200 && resp.StatusCode <= 299) {
  99. if resp.Body != nil {
  100. var errResp Response[any]
  101. err = json.NewDecoder(resp.Body).Decode(&errResp)
  102. if err == nil {
  103. return fmt.Errorf("invalid response %d: %s", resp.StatusCode, errResp.Message)
  104. }
  105. }
  106. return fmt.Errorf("invalid response %d", resp.StatusCode)
  107. }
  108. if fn != nil {
  109. err = fn(resp)
  110. if err != nil {
  111. return fmt.Errorf("failed to handle response: %w", err)
  112. }
  113. }
  114. return nil
  115. }
  116. func (c *ClusterStorage) getURL(subpath string, args map[string]string) string {
  117. pathElems := strings.Split(subpath, "/")
  118. u := new(url.URL)
  119. u.Scheme = c.scheme()
  120. u.Host = net.JoinHostPort(c.host, fmt.Sprintf("%d", c.port))
  121. u = u.JoinPath(pathElems...)
  122. q := make(url.Values)
  123. for k, v := range args {
  124. q.Set(k, v)
  125. }
  126. rawQuery, _ := url.QueryUnescape(q.Encode())
  127. u.RawQuery = rawQuery
  128. return u.String() // <-- full URL string
  129. }
  130. func (c *ClusterStorage) check() error {
  131. err := c.makeRequest(
  132. http.MethodGet,
  133. c.getURL("healthz", nil),
  134. nil,
  135. nil,
  136. )
  137. if err != nil {
  138. return fmt.Errorf("ClusterStorage: failed health check: %w", err)
  139. }
  140. return nil
  141. }
  142. func (c *ClusterStorage) StorageType() StorageType {
  143. return StorageTypeCluster
  144. }
  145. func (c *ClusterStorage) scheme() string {
  146. if c.client.Transport != nil {
  147. if transport, ok := c.client.Transport.(*http.Transport); ok {
  148. if transport.TLSClientConfig != nil && !transport.TLSClientConfig.InsecureSkipVerify {
  149. return "https"
  150. }
  151. }
  152. }
  153. return "http"
  154. }
  155. func (c *ClusterStorage) FullPath(path string) string {
  156. var jsonResp Response[string]
  157. fn := func(resp *http.Response) error {
  158. err := json.NewDecoder(resp.Body).Decode(&jsonResp)
  159. if err != nil {
  160. return fmt.Errorf("failed to decode json: %w", err)
  161. }
  162. return nil
  163. }
  164. args := map[string]string{
  165. "path": path,
  166. }
  167. err := c.makeRequest(
  168. http.MethodGet,
  169. c.getURL("clusterStorage/fullPath", args),
  170. nil,
  171. fn,
  172. )
  173. if err != nil {
  174. log.Errorf("ClusterStorage: FullPath: %s", err.Error())
  175. }
  176. return jsonResp.Data
  177. }
  178. type Response[T any] struct {
  179. Code int `json:"code"`
  180. Data T `json:"data"`
  181. Message string `json:"message"`
  182. }
  183. func (c *ClusterStorage) Stat(path string) (*StorageInfo, error) {
  184. var jsonResp Response[*StorageInfo]
  185. fn := func(resp *http.Response) error {
  186. err := json.NewDecoder(resp.Body).Decode(&jsonResp)
  187. if err != nil {
  188. return fmt.Errorf("failed to decode json: %w", err)
  189. }
  190. return nil
  191. }
  192. args := map[string]string{
  193. "path": path,
  194. }
  195. err := c.makeRequest(
  196. http.MethodGet,
  197. c.getURL("clusterStorage/stat", args),
  198. nil,
  199. fn,
  200. )
  201. if err != nil {
  202. return nil, fmt.Errorf("ClusterStorage: Stat: %w", err)
  203. }
  204. return jsonResp.Data, nil
  205. }
  206. func (c *ClusterStorage) Read(path string) ([]byte, error) {
  207. var jsonResp Response[[]byte]
  208. fn := func(resp *http.Response) error {
  209. err := json.NewDecoder(resp.Body).Decode(&jsonResp)
  210. if err != nil {
  211. return fmt.Errorf("failed to decode json: %w", err)
  212. }
  213. return nil
  214. }
  215. args := map[string]string{
  216. "path": path,
  217. }
  218. err := c.makeRequest(
  219. http.MethodGet,
  220. c.getURL("clusterStorage/read", args),
  221. nil,
  222. fn,
  223. )
  224. if err != nil {
  225. return nil, fmt.Errorf("ClusterStorage: Read: %w", err)
  226. }
  227. return jsonResp.Data, nil
  228. }
  229. func (c *ClusterStorage) Write(path string, data []byte) error {
  230. fn := func(resp *http.Response) error {
  231. return nil
  232. }
  233. args := map[string]string{
  234. "path": path,
  235. }
  236. err := c.makeRequest(
  237. http.MethodPut,
  238. c.getURL("clusterStorage/write", args),
  239. bytes.NewReader(data),
  240. fn,
  241. )
  242. if err != nil {
  243. return fmt.Errorf("ClusterStorage: Write: %w", err)
  244. }
  245. return nil
  246. }
  247. func (c *ClusterStorage) Remove(path string) error {
  248. fn := func(resp *http.Response) error {
  249. return nil
  250. }
  251. args := map[string]string{
  252. "path": path,
  253. }
  254. err := c.makeRequest(
  255. http.MethodDelete,
  256. c.getURL("clusterStorage/remove", args),
  257. nil,
  258. fn,
  259. )
  260. if err != nil {
  261. return fmt.Errorf("ClusterStorage: Write: %w", err)
  262. }
  263. return nil
  264. }
  265. func (c *ClusterStorage) Exists(path string) (bool, error) {
  266. var jsonResp Response[bool]
  267. fn := func(resp *http.Response) error {
  268. err := json.NewDecoder(resp.Body).Decode(&jsonResp)
  269. if err != nil {
  270. return fmt.Errorf("failed to decode json: %w", err)
  271. }
  272. return nil
  273. }
  274. args := map[string]string{
  275. "path": path,
  276. }
  277. err := c.makeRequest(
  278. http.MethodGet,
  279. c.getURL("clusterStorage/exists", args),
  280. nil,
  281. fn,
  282. )
  283. if err != nil {
  284. return false, fmt.Errorf("ClusterStorage: Exists: %w", err)
  285. }
  286. return jsonResp.Data, nil
  287. }
  288. func (c *ClusterStorage) List(path string) ([]*StorageInfo, error) {
  289. var jsonResp Response[[]*StorageInfo]
  290. fn := func(resp *http.Response) error {
  291. err := json.NewDecoder(resp.Body).Decode(&jsonResp)
  292. if err != nil {
  293. return fmt.Errorf("failed to decode json: %w", err)
  294. }
  295. return nil
  296. }
  297. args := map[string]string{
  298. "path": path,
  299. }
  300. err := c.makeRequest(
  301. http.MethodGet,
  302. c.getURL("clusterStorage/list", args),
  303. nil,
  304. fn,
  305. )
  306. if err != nil {
  307. return nil, fmt.Errorf("ClusterStorage: List: %w", err)
  308. }
  309. return jsonResp.Data, nil
  310. }
  311. func (c *ClusterStorage) ListDirectories(path string) ([]*StorageInfo, error) {
  312. var jsonResp Response[[]*StorageInfo]
  313. fn := func(resp *http.Response) error {
  314. err := json.NewDecoder(resp.Body).Decode(&jsonResp)
  315. if err != nil {
  316. return fmt.Errorf("failed to decode json: %w", err)
  317. }
  318. return nil
  319. }
  320. args := map[string]string{
  321. "path": path,
  322. }
  323. err := c.makeRequest(
  324. http.MethodGet,
  325. c.getURL("clusterStorage/listDirectories", args),
  326. nil,
  327. fn,
  328. )
  329. if err != nil {
  330. return nil, fmt.Errorf("ClusterStorage: List Directories: %w", err)
  331. }
  332. // add '/' to the end of directory names to match other bucket storage types
  333. for _, info := range jsonResp.Data {
  334. info.Name = strings.TrimSuffix(info.Name, DirDelim) + DirDelim
  335. }
  336. return jsonResp.Data, nil
  337. }