2
0

memoryrepository.go 2.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114
  1. package customcost
  2. import (
  3. "fmt"
  4. "sync"
  5. "time"
  6. "github.com/opencost/opencost/core/pkg/model/pb"
  7. "golang.org/x/exp/maps"
  8. "google.golang.org/protobuf/proto"
  9. )
  10. // MemoryRepository is an implementation of Repository that uses a map keyed on config key and window start along with a
  11. // RWMutex to make it threadsafe
  12. type MemoryRepository struct {
  13. rwLock sync.RWMutex
  14. data map[string]map[time.Time][]byte
  15. }
  16. func NewMemoryRepository() *MemoryRepository {
  17. return &MemoryRepository{
  18. data: make(map[string]map[time.Time][]byte),
  19. }
  20. }
  21. func (m *MemoryRepository) Has(startTime time.Time, domain string) (bool, error) {
  22. m.rwLock.RLock()
  23. defer m.rwLock.RUnlock()
  24. domainData, ok := m.data[domain]
  25. if !ok {
  26. return false, nil
  27. }
  28. _, ook := domainData[startTime.UTC()]
  29. return ook, nil
  30. }
  31. func (m *MemoryRepository) Get(startTime time.Time, domain string) (*pb.CustomCostResponse, error) {
  32. m.rwLock.RLock()
  33. defer m.rwLock.RUnlock()
  34. domainData, ok := m.data[domain]
  35. if !ok {
  36. return &pb.CustomCostResponse{}, nil
  37. }
  38. b, ook := domainData[startTime.UTC()]
  39. if !ook {
  40. return &pb.CustomCostResponse{}, nil
  41. }
  42. ccr := &pb.CustomCostResponse{}
  43. err := proto.Unmarshal(b, ccr)
  44. if err != nil {
  45. return nil, fmt.Errorf("error unmarshalling data: %w", err)
  46. }
  47. return ccr, nil
  48. }
  49. func (m *MemoryRepository) Keys() ([]string, error) {
  50. m.rwLock.RLock()
  51. defer m.rwLock.RUnlock()
  52. keys := maps.Keys(m.data)
  53. return keys, nil
  54. }
  55. func (m *MemoryRepository) Put(ccr *pb.CustomCostResponse) error {
  56. m.rwLock.Lock()
  57. defer m.rwLock.Unlock()
  58. if ccr == nil {
  59. return fmt.Errorf("MemoryRepository: Put: cannot save nil")
  60. }
  61. if ccr.Start == nil || ccr.End == nil {
  62. return fmt.Errorf("MemoryRepository: Put: custom cost response has invalid window")
  63. }
  64. if ccr.GetDomain() == "" {
  65. return fmt.Errorf("MemoryRepository: Put: custom cost response does not have a domain value")
  66. }
  67. if _, ok := m.data[ccr.GetDomain()]; !ok {
  68. m.data[ccr.GetDomain()] = make(map[time.Time][]byte)
  69. }
  70. b, err := proto.Marshal(ccr)
  71. if err != nil {
  72. return fmt.Errorf("MemoryRepository: Put: custom cost could not be marshalled")
  73. }
  74. m.data[ccr.GetDomain()][ccr.Start.AsTime().UTC()] = b
  75. return nil
  76. }
  77. // Expire deletes all items in the map with a start time before the given limit
  78. func (m *MemoryRepository) Expire(limit time.Time) error {
  79. m.rwLock.Lock()
  80. defer m.rwLock.Unlock()
  81. for key, integration := range m.data {
  82. for startTime := range integration {
  83. if startTime.Before(limit) {
  84. delete(integration, startTime)
  85. }
  86. }
  87. // remove integration if it is now empty
  88. if len(integration) == 0 {
  89. delete(m.data, key)
  90. }
  91. }
  92. return nil
  93. }