memorystorage.go 7.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264
  1. package storage
  2. import (
  3. "bytes"
  4. "fmt"
  5. "io"
  6. "os"
  7. "path/filepath"
  8. "sync"
  9. "github.com/opencost/opencost/core/pkg/log"
  10. "github.com/opencost/opencost/core/pkg/storage/memfile"
  11. )
  12. // MemoryStorage is a thread-safe in-memory file system storage implementation. It can be used for testing storage.Storage dependents
  13. // or to serve as a lightweight storage implementation within a production system.
  14. type MemoryStorage struct {
  15. lock sync.Mutex
  16. directPaths map[string]*memfile.MemoryFile
  17. fileTree *memfile.MemoryDirectory
  18. }
  19. // NewMemoryStorage creates a new in-memory file system storage implementation.
  20. func NewMemoryStorage() *MemoryStorage {
  21. return &MemoryStorage{
  22. directPaths: make(map[string]*memfile.MemoryFile),
  23. fileTree: memfile.NewMemoryDirectory(""),
  24. }
  25. }
  26. // String returns the storage type as a string for logging purposes.
  27. func (ms *MemoryStorage) String() string {
  28. return string(ms.StorageType())
  29. }
  30. // StorageType returns a string identifier for the type of storage used by the implementation.
  31. func (ms *MemoryStorage) StorageType() StorageType {
  32. return StorageTypeMemory
  33. }
  34. // FullPath returns the storage working path combined with the path provided
  35. func (ms *MemoryStorage) FullPath(path string) string {
  36. return path
  37. }
  38. // Stat returns the StorageStats for the specific path.
  39. func (ms *MemoryStorage) Stat(path string) (*StorageInfo, error) {
  40. ms.lock.Lock()
  41. defer ms.lock.Unlock()
  42. path = filepath.Clean(path)
  43. if file, ok := ms.directPaths[path]; ok {
  44. return &StorageInfo{
  45. Name: file.Name,
  46. Size: file.Size(),
  47. ModTime: file.ModTime,
  48. }, nil
  49. }
  50. return nil, DoesNotExistError
  51. }
  52. // Read uses the relative path of the storage combined with the provided path to
  53. // read the contents.
  54. func (ms *MemoryStorage) Read(path string) ([]byte, error) {
  55. ms.lock.Lock()
  56. defer ms.lock.Unlock()
  57. path = filepath.Clean(path)
  58. if file, ok := ms.directPaths[path]; ok {
  59. return file.Contents, nil
  60. }
  61. return nil, DoesNotExistError
  62. }
  63. // ReadStream returns a streaming reader for the specified in-memory object.
  64. func (ms *MemoryStorage) ReadStream(path string) (io.ReadCloser, error) {
  65. ms.lock.Lock()
  66. defer ms.lock.Unlock()
  67. path = filepath.Clean(path)
  68. if file, ok := ms.directPaths[path]; ok {
  69. data := append([]byte(nil), file.Contents...)
  70. return io.NopCloser(bytes.NewReader(data)), nil
  71. }
  72. return nil, DoesNotExistError
  73. }
  74. // ReadToLocalFile writes the specified object at path to destPath on the local file system.
  75. func (ms *MemoryStorage) ReadToLocalFile(path, destPath string) error {
  76. ms.lock.Lock()
  77. path = filepath.Clean(path)
  78. file, ok := ms.directPaths[path]
  79. if !ok {
  80. ms.lock.Unlock()
  81. return DoesNotExistError
  82. }
  83. // Copy the contents so we can release the lock before doing potentially slow disk IO.
  84. data := append([]byte(nil), file.Contents...)
  85. ms.lock.Unlock()
  86. dir := filepath.Dir(destPath)
  87. if err := os.MkdirAll(dir, os.ModePerm); err != nil {
  88. return fmt.Errorf("MemoryStorage: ReadToLocalFile: creating destination directory: %w", err)
  89. }
  90. if err := os.WriteFile(destPath, data, 0600); err != nil {
  91. return fmt.Errorf("MemoryStorage: ReadToLocalFile: writing destination file: %w", err)
  92. }
  93. return nil
  94. }
  95. // Write uses the relative path of the storage combined with the provided path
  96. // to write a new file or overwrite an existing file.
  97. func (ms *MemoryStorage) Write(path string, data []byte) error {
  98. ms.lock.Lock()
  99. defer ms.lock.Unlock()
  100. paths, pFile := memfile.Split(path)
  101. f := memfile.NewMemoryFile(pFile, data)
  102. currentDir := memfile.CreateSubdirectory(ms.fileTree, paths)
  103. currentDir.AddFile(f)
  104. ms.directPaths[path] = f
  105. return nil
  106. }
  107. // WriteStream creates a new relative path and returns the io.WriteCloser that can be used to
  108. // write into the storage path. Close() blocks until all data has been committed to storage.
  109. func (ms *MemoryStorage) WriteStream(path string) (io.WriteCloser, error) {
  110. r, w := io.Pipe()
  111. var wg sync.WaitGroup
  112. wg.Go(func() {
  113. data, err := io.ReadAll(r)
  114. if err != nil {
  115. r.CloseWithError(err)
  116. return
  117. }
  118. ms.lock.Lock()
  119. defer ms.lock.Unlock()
  120. paths, pFile := memfile.Split(path)
  121. f := memfile.NewMemoryFile(pFile, data)
  122. currentDir := memfile.CreateSubdirectory(ms.fileTree, paths)
  123. currentDir.AddFile(f)
  124. ms.directPaths[path] = f
  125. })
  126. return &memWriteCloser{
  127. PipeWriter: w,
  128. wg: &wg,
  129. }, nil
  130. }
  // memWriteCloser is the write half of a pipe paired with the WaitGroup that tracks the
  // goroutine draining the read half. Write is provided by the embedded *io.PipeWriter;
  // Close is overridden so that it also waits for that goroutine.
  type memWriteCloser struct {
  	*io.PipeWriter

  	// wg tracks the goroutine consuming the read side of the pipe.
  	wg *sync.WaitGroup
  }

  // Close closes the write side of the pipe and then blocks until every goroutine
  // tracked by wg has finished. The error reported is the one returned by closing the
  // pipe writer.
  func (m *memWriteCloser) Close() error {
  	closeErr := m.PipeWriter.Close()

  	// Only return once the consumer of the pipe is done with the data.
  	m.wg.Wait()

  	return closeErr
  }
  142. // Remove uses the relative path of the storage combined with the provided path to
  143. // remove a file from storage permanently.
  144. func (ms *MemoryStorage) Remove(path string) error {
  145. ms.lock.Lock()
  146. defer ms.lock.Unlock()
  147. path = filepath.Clean(path)
  148. paths, pFile := memfile.Split(path)
  149. currentDir, err := memfile.FindSubdirectory(ms.fileTree, paths)
  150. if err != nil {
  151. return fmt.Errorf("file not found: %s - %w", path, DoesNotExistError)
  152. }
  153. currentDir.RemoveFile(pFile)
  154. delete(ms.directPaths, path)
  155. return nil
  156. }
  157. // Exists uses the relative path of the storage combined with the provided path to
  158. // determine if the file exists.
  159. func (ms *MemoryStorage) Exists(path string) (bool, error) {
  160. ms.lock.Lock()
  161. defer ms.lock.Unlock()
  162. path = filepath.Clean(path)
  163. _, ok := ms.directPaths[path]
  164. return ok, nil
  165. }
  166. // List uses the relative path of the storage combined with the provided path to return
  167. // storage information for the files.
  168. func (ms *MemoryStorage) List(path string) ([]*StorageInfo, error) {
  169. ms.lock.Lock()
  170. defer ms.lock.Unlock()
  171. paths := memfile.SplitPaths(path)
  172. currentDir, err := memfile.FindSubdirectory(ms.fileTree, paths)
  173. if err != nil {
  174. // contract for bucket storages returns an empty list in this case
  175. // so just log a warning, and return an empty list
  176. log.Warnf("failed to resolve path: %s - %s", path, err)
  177. return []*StorageInfo{}, nil
  178. }
  179. storageInfos := make([]*StorageInfo, 0, currentDir.FileCount())
  180. for f := range currentDir.Files() {
  181. storageInfos = append(storageInfos, &StorageInfo{
  182. Name: f.Name,
  183. Size: f.Size(),
  184. ModTime: f.ModTime,
  185. })
  186. }
  187. return storageInfos, nil
  188. }
  189. // ListDirectories uses the relative path of the storage combined with the provided path
  190. // to return storage information for only directories contained along the path. This
  191. // functions as List, but returns storage information for only directories.
  192. func (ms *MemoryStorage) ListDirectories(path string) ([]*StorageInfo, error) {
  193. ms.lock.Lock()
  194. defer ms.lock.Unlock()
  195. paths := memfile.SplitPaths(path)
  196. currentDir, err := memfile.FindSubdirectory(ms.fileTree, paths)
  197. if err != nil {
  198. // contract for bucket storages returns an empty list in this case
  199. // so just log a warning, and return an empty list
  200. log.Warnf("failed to resolve path: %s - %s", path, err)
  201. return []*StorageInfo{}, nil
  202. }
  203. storageInfos := make([]*StorageInfo, 0, currentDir.DirCount())
  204. for d := range currentDir.Directories() {
  205. storageInfos = append(storageInfos, &StorageInfo{
  206. Name: filepath.Join(append(paths, d.Name)...) + "/",
  207. Size: d.Size(),
  208. ModTime: d.ModTime,
  209. })
  210. }
  211. return storageInfos, nil
  212. }