Преглед на файлове

change repo API

Signed-off-by: Alex Meijer <ameijer@kubecost.com>
Alex Meijer преди 2 години
родител
ревизия
0c63d03197
променени са 4 файла, в които са добавени 37 реда и са изтрити 40 реда
  1. 15 13
      pkg/customcost/ingestor.go
  2. 18 24
      pkg/customcost/memoryrepository.go
  3. 2 1
      pkg/customcost/pipelineservice.go
  4. 2 2
      pkg/customcost/repository.go

+ 15 - 13
pkg/customcost/ingestor.go

@@ -107,12 +107,12 @@ func (ing *CustomCostIngestor) LoadWindow(start, end time.Time) {
 }
 
 func (ing *CustomCostIngestor) BuildWindow(start, end time.Time) {
-	// log.Infof("CloudCost[%s]: ingestor: building window %s", ing.key, opencost.NewWindow(&start, &end))
-	// ccsr, err := ing.integration.GetCloudCost(start, end)
-	// if err != nil {
-	// 	log.Errorf("CloudCost[%s]: ingestor: build failed for window %s: %s", ing.key, opencost.NewWindow(&start, &end), err.Error())
-	// 	return
-	// }
+	log.Infof("ingestor: building window %s", opencost.NewWindow(&start, &end))
+
+	// // build customCostRequest
+	// // make RPC call via plugin
+
+	// // loop through each customCostResponse, adding
 	// for _, ccs := range ccsr.CloudCostSets {
 	// 	log.Debugf("BuildWindow[%s]: GetCloudCost: writing cloud costs for window %s: %d", ccs.Integration, ccs.Window, len(ccs.CloudCosts))
 	// 	err2 := ing.repo.Put(ccs)
@@ -148,9 +148,12 @@ func (ing *CustomCostIngestor) Start(rebuild bool) {
 	if err != nil {
 		panic(err)
 	}
-	err = ing.repo.Put(resps)
-	if err != nil {
-		panic(err)
+
+	for _, resp := range resps {
+		err = ing.repo.Put(resp)
+		if err != nil {
+			panic(err)
+		}
 	}
 	//2024-02-27T01:00:00
 	target := time.Date(2024, 2, 27, 1, 0, 0, 0, time.UTC)
@@ -159,10 +162,9 @@ func (ing *CustomCostIngestor) Start(rebuild bool) {
 		panic(err)
 	}
 
-	for _, storedResp := range stored {
-		log.Debug("got stored object: ")
-		spew.Dump(storedResp)
-	}
+	log.Debug("got stored object: ")
+	spew.Dump(stored)
+
 }
 
 func (ing *CustomCostIngestor) Stop() {

+ 18 - 24
pkg/customcost/memoryrepository.go

@@ -13,12 +13,12 @@ import (
 // RWMutex to make it threadsafe
 type MemoryRepository struct {
 	rwLock sync.RWMutex
-	data   map[string]map[time.Time][]*model.CustomCostResponse
+	data   map[string]map[time.Time]*model.CustomCostResponse
 }
 
 func NewMemoryRepository() *MemoryRepository {
 	return &MemoryRepository{
-		data: make(map[string]map[time.Time][]*model.CustomCostResponse),
+		data: make(map[string]map[time.Time]*model.CustomCostResponse),
 	}
 }
 
@@ -35,7 +35,7 @@ func (m *MemoryRepository) Has(startTime time.Time, domain string) (bool, error)
 	return ook, nil
 }
 
-func (m *MemoryRepository) Get(startTime time.Time, domain string) ([]*model.CustomCostResponse, error) {
+func (m *MemoryRepository) Get(startTime time.Time, domain string) (*model.CustomCostResponse, error) {
 	m.rwLock.RLock()
 	defer m.rwLock.RUnlock()
 
@@ -44,18 +44,13 @@ func (m *MemoryRepository) Get(startTime time.Time, domain string) ([]*model.Cus
 		return nil, nil
 	}
 
-	ccr, ook := domainData[startTime.UTC()]
+	cc, ook := domainData[startTime.UTC()]
 	if !ook {
 		return nil, nil
 	}
-	clones := []*model.CustomCostResponse{}
 
-	for _, cc := range ccr {
-		clone := cc.Clone()
-		clones = append(clones, &clone)
-	}
-
-	return clones, nil
+	clone := cc.Clone()
+	return &clone, nil
 }
 
 func (m *MemoryRepository) Keys() ([]string, error) {
@@ -66,7 +61,7 @@ func (m *MemoryRepository) Keys() ([]string, error) {
 	return keys, nil
 }
 
-func (m *MemoryRepository) Put(ccr []*model.CustomCostResponse) error {
+func (m *MemoryRepository) Put(ccr *model.CustomCostResponse) error {
 	m.rwLock.Lock()
 	defer m.rwLock.Unlock()
 
@@ -74,21 +69,20 @@ func (m *MemoryRepository) Put(ccr []*model.CustomCostResponse) error {
 		return fmt.Errorf("MemoryRepository: Put: cannot save nil")
 	}
 
-	for _, cc := range ccr {
-		if cc.Window.IsOpen() {
-			return fmt.Errorf("MemoryRepository: Put: custom cost response has invalid window %s", cc.Window.String())
-		}
-
-		if cc.GetDomain() == "" {
-			return fmt.Errorf("MemoryRepository: Put: custom cost response does not have a domain value")
-		}
+	if ccr.Window.IsOpen() {
+		return fmt.Errorf("MemoryRepository: Put: custom cost response has invalid window %s", ccr.Window.String())
+	}
 
-		if _, ok := m.data[cc.GetDomain()]; !ok {
-			m.data[cc.GetDomain()] = make(map[time.Time][]*model.CustomCostResponse)
-		}
+	if ccr.GetDomain() == "" {
+		return fmt.Errorf("MemoryRepository: Put: custom cost response does not have a domain value")
+	}
 
-		m.data[cc.GetDomain()][cc.Window.Start().UTC()] = append(m.data[cc.GetDomain()][cc.Window.Start().UTC()], cc)
+	if _, ok := m.data[ccr.GetDomain()]; !ok {
+		m.data[ccr.GetDomain()] = make(map[time.Time]*model.CustomCostResponse)
 	}
+
+	m.data[ccr.GetDomain()][ccr.Window.Start().UTC()] = ccr
+
 	return nil
 }
 

+ 2 - 1
pkg/customcost/pipelineservice.go

@@ -106,7 +106,8 @@ func getRegisteredPlugins(configDir string, execDir string) (map[string]*plugin.
 // NewPipelineService is a constructor for a PipelineService
 func NewPipelineService(hourlyrepo, dailyrepo Repository, ingConf CustomCostIngestorConfig) (*PipelineService, error) {
 
-	registeredPlugins, err := getRegisteredPlugins(ingConf.PluginConfigDir, ingConf.PluginExecutableDir)
+	var registeredPlugins map[string]*plugin.ClientProtocol
+	var err error //getRegisteredPlugins(ingConf.PluginConfigDir, ingConf.PluginExecutableDir)
 	if err != nil {
 		log.Errorf("error getting registered plugins: %v", err)
 		return nil, fmt.Errorf("error getting registered plugins: %v", err)

+ 2 - 2
pkg/customcost/repository.go

@@ -9,8 +9,8 @@ import (
 // Repository is an interface for storing and retrieving CloudCost data
 type Repository interface {
 	Has(time.Time, string) (bool, error)
-	Get(time.Time, string) ([]*model.CustomCostResponse, error)
+	Get(time.Time, string) (*model.CustomCostResponse, error)
 	Keys() ([]string, error)
-	Put([]*model.CustomCostResponse) error
+	Put(*model.CustomCostResponse) error
 	Expire(time.Time) error
 }