clusterstorage.go 9.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419
  1. package storage
  2. import (
  3. "bytes"
  4. "fmt"
  5. "io"
  6. "net"
  7. "net/http"
  8. "net/url"
  9. "strings"
  10. "time"
  11. "github.com/opencost/opencost/core/pkg/log"
  12. "github.com/opencost/opencost/core/pkg/util/httputil"
  13. "github.com/opencost/opencost/core/pkg/util/json"
  14. "gopkg.in/yaml.v2"
  15. )
  16. var defaultClusterConfig = ClusterConfig{
  17. Host: "localhost",
  18. Port: 9006,
  19. HTTPConfig: HTTPConfig{
  20. IdleConnTimeout: 90 * time.Second,
  21. ResponseHeaderTimeout: 2 * time.Minute,
  22. TLSHandshakeTimeout: 10 * time.Second,
  23. ExpectContinueTimeout: 1 * time.Second,
  24. MaxIdleConns: 100,
  25. MaxIdleConnsPerHost: 100,
  26. MaxConnsPerHost: 0,
  27. },
  28. }
  29. // ClusterStorage is a Storage implementation which connects to a remote file storage over http
  30. type ClusterStorage struct {
  31. client *http.Client
  32. host string
  33. port int
  34. }
  35. type ClusterConfig struct {
  36. Host string `yaml:"host"`
  37. Port int `yaml:"port"`
  38. HTTPConfig HTTPConfig `yaml:"http_config"`
  39. }
  40. // parseConfig unmarshals a buffer into a Config with default HTTPConfig values.
  41. func parseClusterConfig(conf []byte) (ClusterConfig, error) {
  42. config := defaultClusterConfig
  43. if err := yaml.Unmarshal(conf, &config); err != nil {
  44. return ClusterConfig{}, err
  45. }
  46. return config, nil
  47. }
  48. func NewClusterStorage(conf []byte) (*ClusterStorage, error) {
  49. config, err := parseClusterConfig(conf)
  50. if err != nil {
  51. return nil, err
  52. }
  53. return NewClusterStorageWith(config)
  54. }
  55. // NewBucketWithConfig returns a new Bucket using the provided s3 config values.
  56. func NewClusterStorageWith(config ClusterConfig) (*ClusterStorage, error) {
  57. dt, err := config.HTTPConfig.GetHTTPTransport()
  58. if err != nil {
  59. return nil, fmt.Errorf("error creating transport: %w", err)
  60. }
  61. cs := &ClusterStorage{
  62. host: config.Host,
  63. port: config.Port,
  64. client: &http.Client{Transport: dt},
  65. }
  66. // Wait on cluster storage to respond before returning
  67. defaultWait := 5 * time.Second
  68. retry := 0
  69. maxTries := 5
  70. for {
  71. err := cs.check()
  72. if err == nil {
  73. break
  74. }
  75. log.Debugf("ClusterStorage: error connecting to cluster storage: %s", err.Error())
  76. if retry >= maxTries {
  77. return nil, fmt.Errorf("ClusterStorage: failed to connect to cluster storage after %d trys", maxTries)
  78. }
  79. waitTime := httputil.ExponentialBackoffWaitFor(defaultWait, retry)
  80. log.Infof("ClusterStorage: failed to connecting cluster storage. retry in %s", waitTime.String())
  81. time.Sleep(waitTime)
  82. retry++
  83. }
  84. log.Debugf("ClusterStorage: New cluster storage client initialized with '%s://%s:%d'", cs.scheme(), config.Host, config.Port)
  85. return cs, nil
  86. }
  87. func (c *ClusterStorage) makeRequest(method, url string, body io.Reader, fn func(*http.Response) error) error {
  88. request, err := http.NewRequest(method, url, body)
  89. if err != nil {
  90. return fmt.Errorf("failed to build request: %w", err)
  91. }
  92. resp, err := c.client.Do(request)
  93. if err != nil {
  94. return fmt.Errorf("request failed: %w", err)
  95. }
  96. if resp.Body != nil {
  97. defer resp.Body.Close()
  98. }
  99. if !(resp.StatusCode >= 200 && resp.StatusCode <= 299) {
  100. if resp.Body != nil {
  101. var errResp Response[any]
  102. err = json.NewDecoder(resp.Body).Decode(&errResp)
  103. if err == nil {
  104. return fmt.Errorf("invalid response %d: %s", resp.StatusCode, errResp.Message)
  105. }
  106. }
  107. return fmt.Errorf("invalid response %d", resp.StatusCode)
  108. }
  109. if fn != nil {
  110. err = fn(resp)
  111. if err != nil {
  112. return fmt.Errorf("failed to handle response: %w", err)
  113. }
  114. }
  115. return nil
  116. }
  117. func (c *ClusterStorage) getURL(subpath string, args map[string]string) string {
  118. pathElems := strings.Split(subpath, "/")
  119. u := new(url.URL)
  120. u.Scheme = c.scheme()
  121. u.Host = net.JoinHostPort(c.host, fmt.Sprintf("%d", c.port))
  122. u = u.JoinPath(pathElems...)
  123. q := make(url.Values)
  124. for k, v := range args {
  125. q.Set(k, v)
  126. }
  127. rawQuery, _ := url.QueryUnescape(q.Encode())
  128. u.RawQuery = rawQuery
  129. return u.String() // <-- full URL string
  130. }
  131. func (c *ClusterStorage) check() error {
  132. err := c.makeRequest(
  133. http.MethodGet,
  134. c.getURL("healthz", nil),
  135. nil,
  136. nil,
  137. )
  138. if err != nil {
  139. return fmt.Errorf("ClusterStorage: failed health check: %w", err)
  140. }
  141. return nil
  142. }
  143. // String returns the host and port for the cluster storage.
  144. func (c *ClusterStorage) String() string {
  145. return fmt.Sprintf("%s:%d", c.host, c.port)
  146. }
  147. func (c *ClusterStorage) StorageType() StorageType {
  148. return StorageTypeCluster
  149. }
  150. // scheme returns the protocol scheme (http or https) based on TLS configuration
  151. func (c *ClusterStorage) scheme() string {
  152. if c.client.Transport != nil {
  153. if transport, ok := c.client.Transport.(*http.Transport); ok {
  154. if transport.TLSClientConfig != nil && !transport.TLSClientConfig.InsecureSkipVerify {
  155. return "https"
  156. }
  157. }
  158. }
  159. return "http"
  160. }
  161. func (c *ClusterStorage) FullPath(path string) string {
  162. var jsonResp Response[string]
  163. fn := func(resp *http.Response) error {
  164. err := json.NewDecoder(resp.Body).Decode(&jsonResp)
  165. if err != nil {
  166. return fmt.Errorf("failed to decode json: %w", err)
  167. }
  168. return nil
  169. }
  170. args := map[string]string{
  171. "path": path,
  172. }
  173. err := c.makeRequest(
  174. http.MethodGet,
  175. c.getURL("clusterStorage/fullPath", args),
  176. nil,
  177. fn,
  178. )
  179. if err != nil {
  180. log.Errorf("ClusterStorage: FullPath: %s", err.Error())
  181. }
  182. return jsonResp.Data
  183. }
  184. type Response[T any] struct {
  185. Code int `json:"code"`
  186. Data T `json:"data"`
  187. Message string `json:"message"`
  188. }
  189. func (c *ClusterStorage) Stat(path string) (*StorageInfo, error) {
  190. log.Debugf("ClusterStorage::Stat::%s(%s)", strings.ToUpper(c.scheme()), path)
  191. var jsonResp Response[*StorageInfo]
  192. fn := func(resp *http.Response) error {
  193. err := json.NewDecoder(resp.Body).Decode(&jsonResp)
  194. if err != nil {
  195. return fmt.Errorf("failed to decode json: %w", err)
  196. }
  197. return nil
  198. }
  199. args := map[string]string{
  200. "path": path,
  201. }
  202. err := c.makeRequest(
  203. http.MethodGet,
  204. c.getURL("clusterStorage/stat", args),
  205. nil,
  206. fn,
  207. )
  208. if err != nil {
  209. return nil, fmt.Errorf("ClusterStorage: Stat: %w", err)
  210. }
  211. return jsonResp.Data, nil
  212. }
  213. func (c *ClusterStorage) Read(path string) ([]byte, error) {
  214. log.Debugf("ClusterStorage::Read::%s(%s)", strings.ToUpper(c.scheme()), path)
  215. var jsonResp Response[[]byte]
  216. fn := func(resp *http.Response) error {
  217. err := json.NewDecoder(resp.Body).Decode(&jsonResp)
  218. if err != nil {
  219. return fmt.Errorf("failed to decode json: %w", err)
  220. }
  221. return nil
  222. }
  223. args := map[string]string{
  224. "path": path,
  225. }
  226. err := c.makeRequest(
  227. http.MethodGet,
  228. c.getURL("clusterStorage/read", args),
  229. nil,
  230. fn,
  231. )
  232. if err != nil {
  233. return nil, fmt.Errorf("ClusterStorage: Read: %w", err)
  234. }
  235. return jsonResp.Data, nil
  236. }
  237. func (c *ClusterStorage) Write(path string, data []byte) error {
  238. log.Debugf("ClusterStorage::Write::%s(%s)", strings.ToUpper(c.scheme()), path)
  239. fn := func(resp *http.Response) error {
  240. return nil
  241. }
  242. args := map[string]string{
  243. "path": path,
  244. }
  245. err := c.makeRequest(
  246. http.MethodPut,
  247. c.getURL("clusterStorage/write", args),
  248. bytes.NewReader(data),
  249. fn,
  250. )
  251. if err != nil {
  252. return fmt.Errorf("ClusterStorage: Write: %w", err)
  253. }
  254. return nil
  255. }
  256. func (c *ClusterStorage) Remove(path string) error {
  257. log.Debugf("ClusterStorage::Remove::%s(%s)", strings.ToUpper(c.scheme()), path)
  258. fn := func(resp *http.Response) error {
  259. return nil
  260. }
  261. args := map[string]string{
  262. "path": path,
  263. }
  264. err := c.makeRequest(
  265. http.MethodDelete,
  266. c.getURL("clusterStorage/remove", args),
  267. nil,
  268. fn,
  269. )
  270. if err != nil {
  271. return fmt.Errorf("ClusterStorage: Write: %w", err)
  272. }
  273. return nil
  274. }
  275. func (c *ClusterStorage) Exists(path string) (bool, error) {
  276. log.Debugf("ClusterStorage::Exists::%s(%s)", strings.ToUpper(c.scheme()), path)
  277. var jsonResp Response[bool]
  278. fn := func(resp *http.Response) error {
  279. err := json.NewDecoder(resp.Body).Decode(&jsonResp)
  280. if err != nil {
  281. return fmt.Errorf("failed to decode json: %w", err)
  282. }
  283. return nil
  284. }
  285. args := map[string]string{
  286. "path": path,
  287. }
  288. err := c.makeRequest(
  289. http.MethodGet,
  290. c.getURL("clusterStorage/exists", args),
  291. nil,
  292. fn,
  293. )
  294. if err != nil {
  295. return false, fmt.Errorf("ClusterStorage: Exists: %w", err)
  296. }
  297. return jsonResp.Data, nil
  298. }
  299. func (c *ClusterStorage) List(path string) ([]*StorageInfo, error) {
  300. log.Debugf("ClusterStorage::List::%s(%s)", strings.ToUpper(c.scheme()), path)
  301. var jsonResp Response[[]*StorageInfo]
  302. fn := func(resp *http.Response) error {
  303. err := json.NewDecoder(resp.Body).Decode(&jsonResp)
  304. if err != nil {
  305. return fmt.Errorf("failed to decode json: %w", err)
  306. }
  307. return nil
  308. }
  309. args := map[string]string{
  310. "path": path,
  311. }
  312. err := c.makeRequest(
  313. http.MethodGet,
  314. c.getURL("clusterStorage/list", args),
  315. nil,
  316. fn,
  317. )
  318. if err != nil {
  319. return nil, fmt.Errorf("ClusterStorage: List: %w", err)
  320. }
  321. return jsonResp.Data, nil
  322. }
  323. func (c *ClusterStorage) ListDirectories(path string) ([]*StorageInfo, error) {
  324. log.Debugf("ClusterStorage::ListDirectories::%s(%s)", strings.ToUpper(c.scheme()), path)
  325. var jsonResp Response[[]*StorageInfo]
  326. fn := func(resp *http.Response) error {
  327. err := json.NewDecoder(resp.Body).Decode(&jsonResp)
  328. if err != nil {
  329. return fmt.Errorf("failed to decode json: %w", err)
  330. }
  331. return nil
  332. }
  333. args := map[string]string{
  334. "path": path,
  335. }
  336. err := c.makeRequest(
  337. http.MethodGet,
  338. c.getURL("clusterStorage/listDirectories", args),
  339. nil,
  340. fn,
  341. )
  342. if err != nil {
  343. return nil, fmt.Errorf("ClusterStorage: List Directories: %w", err)
  344. }
  345. // add '/' to the end of directory names to match other bucket storage types
  346. for _, info := range jsonResp.Data {
  347. info.Name = strings.TrimSuffix(info.Name, DirDelim) + DirDelim
  348. }
  349. return jsonResp.Data, nil
  350. }