memoryrepository.go 2.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112
  1. package customcost
  2. import (
  3. "fmt"
  4. "sync"
  5. "time"
  6. "github.com/opencost/opencost/core/pkg/model"
  7. "golang.org/x/exp/maps"
  8. )
// MemoryRepository is an implementation of Repository that keeps custom cost
// responses in memory. Responses are held in a two-level map keyed first on
// domain and then on window start (normalized to UTC by the methods that read
// and write it), and an RWMutex guards the map to make it threadsafe.
type MemoryRepository struct {
	// rwLock is held for reading by the lookup methods and for writing by the
	// methods that modify data.
	rwLock sync.RWMutex
	// data maps domain -> window start -> responses stored for that window.
	data map[string]map[time.Time][]*model.CustomCostResponse
}
  15. func NewMemoryRepository() *MemoryRepository {
  16. return &MemoryRepository{
  17. data: make(map[string]map[time.Time][]*model.CustomCostResponse),
  18. }
  19. }
  20. func (m *MemoryRepository) Has(startTime time.Time, domain string) (bool, error) {
  21. m.rwLock.RLock()
  22. defer m.rwLock.RUnlock()
  23. domainData, ok := m.data[domain]
  24. if !ok {
  25. return false, nil
  26. }
  27. _, ook := domainData[startTime.UTC()]
  28. return ook, nil
  29. }
  30. func (m *MemoryRepository) Get(startTime time.Time, domain string) ([]*model.CustomCostResponse, error) {
  31. m.rwLock.RLock()
  32. defer m.rwLock.RUnlock()
  33. domainData, ok := m.data[domain]
  34. if !ok {
  35. return nil, nil
  36. }
  37. ccr, ook := domainData[startTime.UTC()]
  38. if !ook {
  39. return nil, nil
  40. }
  41. clones := []*model.CustomCostResponse{}
  42. for _, cc := range ccr {
  43. clone := cc.Clone()
  44. clones = append(clones, &clone)
  45. }
  46. return clones, nil
  47. }
  48. func (m *MemoryRepository) Keys() ([]string, error) {
  49. m.rwLock.RLock()
  50. defer m.rwLock.RUnlock()
  51. keys := maps.Keys(m.data)
  52. return keys, nil
  53. }
  54. func (m *MemoryRepository) Put(ccr []*model.CustomCostResponse) error {
  55. m.rwLock.Lock()
  56. defer m.rwLock.Unlock()
  57. if ccr == nil {
  58. return fmt.Errorf("MemoryRepository: Put: cannot save nil")
  59. }
  60. for _, cc := range ccr {
  61. if cc.Window.IsOpen() {
  62. return fmt.Errorf("MemoryRepository: Put: custom cost response has invalid window %s", cc.Window.String())
  63. }
  64. if cc.GetDomain() == "" {
  65. return fmt.Errorf("MemoryRepository: Put: custom cost response does not have a domain value")
  66. }
  67. if _, ok := m.data[cc.GetDomain()]; !ok {
  68. m.data[cc.GetDomain()] = make(map[time.Time][]*model.CustomCostResponse)
  69. }
  70. m.data[cc.GetDomain()][cc.Window.Start().UTC()] = append(m.data[cc.GetDomain()][cc.Window.Start().UTC()], cc)
  71. }
  72. return nil
  73. }
  74. // Expire deletes all items in the map with a start time before the given limit
  75. func (m *MemoryRepository) Expire(limit time.Time) error {
  76. m.rwLock.Lock()
  77. defer m.rwLock.Unlock()
  78. for key, integration := range m.data {
  79. for startTime := range integration {
  80. if startTime.Before(limit) {
  81. delete(integration, startTime)
  82. }
  83. }
  84. // remove integration if it is now empty
  85. if len(integration) == 0 {
  86. delete(m.data, key)
  87. }
  88. }
  89. return nil
  90. }