bingenpath.go 2.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899
  1. package pathing
  2. import (
  3. "fmt"
  4. "path"
  5. "time"
  6. "github.com/opencost/opencost/core/pkg/exporter/pathing/pathutils"
  7. "github.com/opencost/opencost/core/pkg/opencost"
  8. "github.com/opencost/opencost/core/pkg/util/timeutil"
  9. )
  10. const (
  11. baseStorageDir string = "etl/bingen"
  12. )
  13. // BingenStoragePathFormatter is an implementation of the StoragePathFormatter interface for
  14. // a cluster separated storage path of the format:
  15. //
  16. // <root>/federated/<cluster>/etl/bingen/<pipeline>/<resolution>/<epoch-start>-<epoch-end>
  17. type BingenStoragePathFormatter struct {
  18. rootDir string
  19. clusterId string
  20. pipeline string
  21. resolution string
  22. }
  23. // NewBingenStoragePathFormatter creates a StoragePathFormatter for a cluster separated storage path
  24. // with the given root directory, cluster id, pipeline, and resolution. To omit the resolution directory
  25. // structure, provide a `nil` resolution.
  26. func NewBingenStoragePathFormatter(rootDir, clusterId, pipeline string, resolution *time.Duration) (StoragePathFormatter[opencost.Window], error) {
  27. res := "."
  28. if resolution != nil {
  29. res = timeutil.FormatStoreResolution(*resolution)
  30. }
  31. if clusterId == "" {
  32. return nil, fmt.Errorf("cluster id cannot be empty")
  33. }
  34. if pipeline == "" {
  35. return nil, fmt.Errorf("pipeline cannot be empty")
  36. }
  37. return &BingenStoragePathFormatter{
  38. rootDir: rootDir,
  39. clusterId: clusterId,
  40. pipeline: pipeline,
  41. resolution: res,
  42. }, nil
  43. }
  44. // RootDir returns the root directory of the storage path formatter.
  45. func (bsf *BingenStoragePathFormatter) RootDir() string {
  46. return bsf.rootDir
  47. }
  48. // ToFullPath returns the full path to a file name within the storage directory using the format:
  49. //
  50. // <root>/federated/<cluster>/etl/bingen/<pipeline>/<resolution>/<prefix>.<start-epoch>-<end-epoch>
  51. func (bsf *BingenStoragePathFormatter) ToFullPath(prefix string, window opencost.Window, fileExt string) string {
  52. fileName := toBingenFileName(prefix, window, fileExt)
  53. return path.Join(
  54. bsf.rootDir,
  55. bsf.clusterId,
  56. baseStorageDir,
  57. bsf.pipeline,
  58. bsf.resolution,
  59. fileName,
  60. )
  61. }
  62. // toBingenFileName formats the file name as <prefix>.<start-epoch>-<end-epoch> if a prefix is non-empty.
  63. // If prefix is an empty string, then just the format <start-epoch>-<end-epoch> is returned.
  64. func toBingenFileName(prefix string, window opencost.Window, fileExt string) string {
  65. start, end := derefTimeOrZero(window.Start()), derefTimeOrZero(window.End())
  66. suffix := pathutils.FormatEpochRange(start, end)
  67. if fileExt != "" {
  68. suffix = fmt.Sprintf("%s.%s", suffix, fileExt)
  69. }
  70. if prefix == "" {
  71. return suffix
  72. }
  73. return fmt.Sprintf("%s.%s", prefix, suffix)
  74. }
  75. // derefTimeOrZero dereferences a time.Time pointer and returns the zero value if the pointer is nil.
  76. // This prevents nil pointer dereference errors when using windows. This is mostly an assertion, as
  77. // generally windows for pathing will be pre-validated.
  78. func derefTimeOrZero(t *time.Time) time.Time {
  79. if t == nil {
  80. return time.Time{}
  81. }
  82. return *t
  83. }