2
0

clusterstorage_test.go 6.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285
  1. package storage
  2. import (
  3. "bytes"
  4. "crypto/tls"
  5. "encoding/json"
  6. "io"
  7. "net/http"
  8. "net/http/httptest"
  9. "net/url"
  10. "os"
  11. "path/filepath"
  12. "strconv"
  13. "strings"
  14. "testing"
  15. )
  16. // TestClusterStorage_scheme tests the scheme() method returns correct values based on TLS configuration
  17. func TestClusterStorage_scheme(t *testing.T) {
  18. tests := []struct {
  19. name string
  20. transport http.RoundTripper
  21. want string
  22. }{
  23. {
  24. name: "nil transport returns http",
  25. transport: nil,
  26. want: "http",
  27. },
  28. {
  29. name: "transport without TLS config returns http",
  30. transport: &http.Transport{},
  31. want: "http",
  32. },
  33. {
  34. name: "transport with TLS config returns https",
  35. transport: &http.Transport{
  36. TLSClientConfig: &tls.Config{},
  37. },
  38. want: "https",
  39. },
  40. {
  41. name: "transport with InsecureSkipVerify returns http",
  42. transport: &http.Transport{
  43. TLSClientConfig: &tls.Config{
  44. InsecureSkipVerify: true,
  45. },
  46. },
  47. want: "http",
  48. },
  49. }
  50. for _, tt := range tests {
  51. t.Run(tt.name, func(t *testing.T) {
  52. cs := &ClusterStorage{
  53. client: &http.Client{
  54. Transport: tt.transport,
  55. },
  56. }
  57. got := cs.scheme()
  58. if got != tt.want {
  59. t.Errorf("ClusterStorage.scheme() = %v, want %v", got, tt.want)
  60. }
  61. // Also test that strings.ToUpper(scheme()) works as expected in log statements
  62. gotUpper := strings.ToUpper(cs.scheme())
  63. wantUpper := strings.ToUpper(tt.want)
  64. if gotUpper != wantUpper {
  65. t.Errorf("strings.ToUpper(ClusterStorage.scheme()) = %v, want %v", gotUpper, wantUpper)
  66. }
  67. })
  68. }
  69. }
  70. func TestClusterStorage_ReadToLocalFile(t *testing.T) {
  71. expected := []byte("cluster-storage-contents")
  72. srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
  73. if r.URL.Path != "/clusterStorage/read" {
  74. w.WriteHeader(http.StatusNotFound)
  75. return
  76. }
  77. resp := Response[[]byte]{
  78. Code: 0,
  79. Data: expected,
  80. }
  81. w.Header().Set("Content-Type", "application/json")
  82. _ = json.NewEncoder(w).Encode(resp)
  83. }))
  84. defer srv.Close()
  85. u, err := url.Parse(srv.URL)
  86. if err != nil {
  87. t.Fatalf("parsing test server URL: %s", err)
  88. }
  89. port, err := strconv.Atoi(u.Port())
  90. if err != nil {
  91. t.Fatalf("parsing test server port: %s", err)
  92. }
  93. cs := &ClusterStorage{
  94. client: &http.Client{},
  95. host: u.Hostname(),
  96. port: port,
  97. }
  98. destPath := filepath.Join(t.TempDir(), "out.bin")
  99. if err := cs.ReadToLocalFile("some/path", destPath); err != nil {
  100. t.Fatalf("ReadToLocalFile failed: %s", err)
  101. }
  102. data, err := os.ReadFile(destPath)
  103. if err != nil {
  104. t.Fatalf("reading destination file: %s", err)
  105. }
  106. if string(data) != string(expected) {
  107. t.Fatalf("destination file contents mismatch: got %q want %q", string(data), string(expected))
  108. }
  109. }
  110. func TestClusterStorage_ReadStream(t *testing.T) {
  111. expected := []byte("cluster-storage-stream-contents")
  112. srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
  113. if r.URL.Path != "/clusterStorage/read" {
  114. w.WriteHeader(http.StatusNotFound)
  115. return
  116. }
  117. resp := Response[[]byte]{
  118. Code: 0,
  119. Data: expected,
  120. }
  121. w.Header().Set("Content-Type", "application/json")
  122. _ = json.NewEncoder(w).Encode(resp)
  123. }))
  124. defer srv.Close()
  125. u, err := url.Parse(srv.URL)
  126. if err != nil {
  127. t.Fatalf("parsing test server URL: %s", err)
  128. }
  129. port, err := strconv.Atoi(u.Port())
  130. if err != nil {
  131. t.Fatalf("parsing test server port: %s", err)
  132. }
  133. cs := &ClusterStorage{
  134. client: &http.Client{},
  135. host: u.Hostname(),
  136. port: port,
  137. }
  138. r, err := cs.ReadStream("some/path")
  139. if err != nil {
  140. t.Fatalf("ReadStream failed: %s", err)
  141. }
  142. defer r.Close()
  143. data, err := io.ReadAll(r)
  144. if err != nil {
  145. t.Fatalf("reading stream failed: %s", err)
  146. }
  147. if string(data) != string(expected) {
  148. t.Fatalf("stream contents mismatch: got %q want %q", string(data), string(expected))
  149. }
  150. }
  151. func TestClusterStorage_WriteStream(t *testing.T) {
  152. writeHandler := func(captured *[]byte) http.HandlerFunc {
  153. return func(w http.ResponseWriter, r *http.Request) {
  154. if r.Method != http.MethodPut || r.URL.Path != "/clusterStorage/write" {
  155. w.WriteHeader(http.StatusNotFound)
  156. return
  157. }
  158. var err error
  159. *captured, err = io.ReadAll(r.Body)
  160. if err != nil {
  161. w.WriteHeader(http.StatusInternalServerError)
  162. return
  163. }
  164. w.WriteHeader(http.StatusOK)
  165. }
  166. }
  167. t.Run("single chunk reaches server and Close returns nil", func(t *testing.T) {
  168. want := []byte("hello from stream")
  169. var received []byte
  170. srv := httptest.NewServer(writeHandler(&received))
  171. defer srv.Close()
  172. cs := newClusterStorageFromURL(t, srv.URL)
  173. w, err := cs.WriteStream("some/path")
  174. if err != nil {
  175. t.Fatalf("WriteStream: %s", err)
  176. }
  177. if _, err = w.Write(want); err != nil {
  178. _ = w.Close()
  179. t.Fatalf("Write: %s", err)
  180. }
  181. if err = w.Close(); err != nil {
  182. t.Fatalf("Close: %s", err)
  183. }
  184. // Checking received immediately after Close() proves it is synchronous.
  185. if !bytes.Equal(received, want) {
  186. t.Errorf("body mismatch: got %q, want %q", received, want)
  187. }
  188. })
  189. t.Run("multi-chunk write concatenates correctly", func(t *testing.T) {
  190. chunks := [][]byte{[]byte("alpha"), []byte("beta"), []byte("gamma")}
  191. want := bytes.Join(chunks, nil)
  192. var received []byte
  193. srv := httptest.NewServer(writeHandler(&received))
  194. defer srv.Close()
  195. cs := newClusterStorageFromURL(t, srv.URL)
  196. w, err := cs.WriteStream("some/path")
  197. if err != nil {
  198. t.Fatalf("WriteStream: %s", err)
  199. }
  200. for _, chunk := range chunks {
  201. if _, err = w.Write(chunk); err != nil {
  202. _ = w.Close()
  203. t.Fatalf("Write: %s", err)
  204. }
  205. }
  206. if err = w.Close(); err != nil {
  207. t.Fatalf("Close: %s", err)
  208. }
  209. if !bytes.Equal(received, want) {
  210. t.Errorf("body mismatch: got %q, want %q", received, want)
  211. }
  212. })
  213. t.Run("server error propagates through Close", func(t *testing.T) {
  214. srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
  215. _, _ = io.Copy(io.Discard, r.Body)
  216. w.WriteHeader(http.StatusInternalServerError)
  217. }))
  218. defer srv.Close()
  219. cs := newClusterStorageFromURL(t, srv.URL)
  220. w, err := cs.WriteStream("some/path")
  221. if err != nil {
  222. t.Fatalf("WriteStream: %s", err)
  223. }
  224. _, _ = w.Write([]byte("data"))
  225. if err = w.Close(); err == nil {
  226. t.Fatalf("expected error from Close on server error, got nil")
  227. }
  228. })
  229. }
  230. // newClusterStorageFromURL constructs a ClusterStorage pointed at the given test server URL.
  231. func newClusterStorageFromURL(t *testing.T, rawURL string) *ClusterStorage {
  232. t.Helper()
  233. u, err := url.Parse(rawURL)
  234. if err != nil {
  235. t.Fatalf("parsing test server URL: %s", err)
  236. }
  237. port, err := strconv.Atoi(u.Port())
  238. if err != nil {
  239. t.Fatalf("parsing test server port: %s", err)
  240. }
  241. return &ClusterStorage{
  242. client: &http.Client{},
  243. host: u.Hostname(),
  244. port: port,
  245. }
  246. }