eventpath.go 2.5 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394
  1. package pathing
  2. import (
  3. "fmt"
  4. "path"
  5. "time"
  6. )
  7. // 2006-01-02T15:04:05Z07:00
  8. // EventStorageTimeFormat is YYYYMMDDHHmmss
  9. const EventStorageTimeFormat = "20060102150405"
  10. // EventStoragePathFormatter is an implementation of the StoragePathFormatter interface for
  11. // a cluster separated storage path of the format:
  12. //
  13. // <root>/federated/<cluster>/<event>/<sub-paths...>/YYYYMMDDHHmmss
  14. type EventStoragePathFormatter struct {
  15. rootDir string
  16. clusterId string
  17. event string
  18. subPaths []string
  19. }
  20. // NewBingenStoragePathFormatter creates a StoragePathFormatter for a cluster separated storage path
  21. // with the given root directory, cluster id, pipeline, and resolution. To omit the resolution directory
  22. // structure, provide a `nil` resolution.
  23. func NewEventStoragePathFormatter(rootDir, clusterId, event string, subPaths ...string) (StoragePathFormatter[time.Time], error) {
  24. if clusterId == "" {
  25. return nil, fmt.Errorf("cluster id cannot be empty")
  26. }
  27. if event == "" {
  28. return nil, fmt.Errorf("event cannot be empty")
  29. }
  30. for _, subPath := range subPaths {
  31. if subPath == "" {
  32. return nil, fmt.Errorf("subpaths cannot be empty")
  33. }
  34. }
  35. return &EventStoragePathFormatter{
  36. rootDir: rootDir,
  37. clusterId: clusterId,
  38. event: event,
  39. subPaths: subPaths,
  40. }, nil
  41. }
  42. // RootDir returns the root directory of the storage path formatter.
  43. func (espf *EventStoragePathFormatter) RootDir() string {
  44. return espf.rootDir
  45. }
  46. // Dir returns the director that files will be placed in
  47. func (espf *EventStoragePathFormatter) Dir() string {
  48. return path.Join(
  49. espf.rootDir,
  50. espf.clusterId,
  51. espf.event,
  52. path.Join(espf.subPaths...),
  53. )
  54. }
  55. // ToFullPath returns the full path to a file name within the storage directory using the format:
  56. //
  57. // <root>/federated/<cluster>/<event>/YYYYMMDDHHmm.json
  58. func (espf *EventStoragePathFormatter) ToFullPath(prefix string, timestamp time.Time, fileExt string) string {
  59. fileName := toEventFileName(prefix, timestamp, fileExt)
  60. return path.Join(
  61. espf.rootDir,
  62. espf.clusterId,
  63. espf.event,
  64. path.Join(espf.subPaths...),
  65. fileName,
  66. )
  67. }
  68. // toEventFileName formats the file name as <prefix>.<timestamp>. if a non-empty fileExt is provided,
  69. // then the file extension is appended to the file name.
  70. func toEventFileName(prefix string, timestamp time.Time, fileExt string) string {
  71. suffix := timestamp.Format(EventStorageTimeFormat)
  72. if fileExt != "" {
  73. suffix = fmt.Sprintf("%s.%s", suffix, fileExt)
  74. }
  75. if prefix == "" {
  76. return suffix
  77. }
  78. return fmt.Sprintf("%s.%s", prefix, suffix)
  79. }