| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114 |
- package customcost
- import (
- "fmt"
- "sync"
- "time"
- "github.com/opencost/opencost/core/pkg/model/pb"
- "golang.org/x/exp/maps"
- "google.golang.org/protobuf/proto"
- )
- // MemoryRepository is an implementation of Repository that uses a map keyed on config key and window start along with a
- // RWMutex to make it threadsafe
- type MemoryRepository struct {
- rwLock sync.RWMutex
- data map[string]map[time.Time][]byte
- }
- func NewMemoryRepository() *MemoryRepository {
- return &MemoryRepository{
- data: make(map[string]map[time.Time][]byte),
- }
- }
- func (m *MemoryRepository) Has(startTime time.Time, domain string) (bool, error) {
- m.rwLock.RLock()
- defer m.rwLock.RUnlock()
- domainData, ok := m.data[domain]
- if !ok {
- return false, nil
- }
- _, ook := domainData[startTime.UTC()]
- return ook, nil
- }
- func (m *MemoryRepository) Get(startTime time.Time, domain string) (*pb.CustomCostResponse, error) {
- m.rwLock.RLock()
- defer m.rwLock.RUnlock()
- domainData, ok := m.data[domain]
- if !ok {
- return &pb.CustomCostResponse{}, nil
- }
- b, ook := domainData[startTime.UTC()]
- if !ook {
- return &pb.CustomCostResponse{}, nil
- }
- ccr := &pb.CustomCostResponse{}
- err := proto.Unmarshal(b, ccr)
- if err != nil {
- return nil, fmt.Errorf("error unmarshalling data: %w", err)
- }
- return ccr, nil
- }
- func (m *MemoryRepository) Keys() ([]string, error) {
- m.rwLock.RLock()
- defer m.rwLock.RUnlock()
- keys := maps.Keys(m.data)
- return keys, nil
- }
- func (m *MemoryRepository) Put(ccr *pb.CustomCostResponse) error {
- m.rwLock.Lock()
- defer m.rwLock.Unlock()
- if ccr == nil {
- return fmt.Errorf("MemoryRepository: Put: cannot save nil")
- }
- if ccr.Start == nil || ccr.End == nil {
- return fmt.Errorf("MemoryRepository: Put: custom cost response has invalid window")
- }
- if ccr.GetDomain() == "" {
- return fmt.Errorf("MemoryRepository: Put: custom cost response does not have a domain value")
- }
- if _, ok := m.data[ccr.GetDomain()]; !ok {
- m.data[ccr.GetDomain()] = make(map[time.Time][]byte)
- }
- b, err := proto.Marshal(ccr)
- if err != nil {
- return fmt.Errorf("MemoryRepository: Put: custom cost could not be marshalled")
- }
- m.data[ccr.GetDomain()][ccr.Start.AsTime().UTC()] = b
- return nil
- }
- // Expire deletes all items in the map with a start time before the given limit
- func (m *MemoryRepository) Expire(limit time.Time) error {
- m.rwLock.Lock()
- defer m.rwLock.Unlock()
- for key, integration := range m.data {
- for startTime := range integration {
- if startTime.Before(limit) {
- delete(integration, startTime)
- }
- }
- // remove integration if it is now empty
- if len(integration) == 0 {
- delete(m.data, key)
- }
- }
- return nil
- }
|