Просмотр исходного кода

Merge branch 'develop' into sth/cc-quierier-arg-update

Sean Holcomb 2 лет назад
Родитель
Сommit
ddb18cc970

+ 2 - 0
pkg/costmodel/router.go

@@ -1204,6 +1204,7 @@ type InstallInfo struct {
 type ContainerInfo struct {
 	ContainerName string `json:"containerName"`
 	Image         string `json:"image"`
+	StartTime     string `json:"startTime"`
 }
 
 func (a *Accesses) GetInstallInfo(w http.ResponseWriter, r *http.Request, _ httprouter.Params) {
@@ -1233,6 +1234,7 @@ func (a *Accesses) GetInstallInfo(w http.ResponseWriter, r *http.Request, _ http
 				c := ContainerInfo{
 					ContainerName: container.Name,
 					Image:         container.Image,
+					StartTime:     pod.Status.StartTime.String(),
 				}
 				info.Containers = append(info.Containers, c)
 			}

+ 17 - 3
pkg/customcost/pipelineservice.go

@@ -5,6 +5,7 @@ import (
 	"net/http"
 	"os"
 	"os/exec"
+	"path"
 	"runtime"
 	"strings"
 	"time"
@@ -26,6 +27,7 @@ const execFmt = `%s/%s.ocplugin.%s.%s`
 type PipelineService struct {
 	hourlyIngestor, dailyIngestor *CustomCostIngestor
 	hourlyStore, dailyStore       Repository
+	domains                       []string
 }
 
 func getRegisteredPlugins(configDir string, execDir string) (map[string]*plugin.Client, error) {
@@ -39,6 +41,11 @@ func getRegisteredPlugins(configDir string, execDir string) (map[string]*plugin.
 
 	// list of plugins that we must run are the strings before _
 	for _, file := range configFiles {
+		// skip hidden files and directories
+		if strings.HasPrefix(file.Name(), ".") || file.IsDir() {
+			continue
+		}
+
 		log.Tracef("parsing config file name: %s", file.Name())
 		fileParts := strings.Split(file.Name(), "_")
 
@@ -46,7 +53,7 @@ func getRegisteredPlugins(configDir string, execDir string) (map[string]*plugin.
 			return nil, fmt.Errorf("plugin config file name %s invalid. Config files must have the form <plugin name>_config.json", file.Name())
 		}
 
-		pluginNames[fileParts[0]] = configDir + "/" + file.Name()
+		pluginNames[fileParts[0]] = path.Join(configDir, file.Name())
 	}
 
 	if len(pluginNames) == 0 {
@@ -105,7 +112,6 @@ func getRegisteredPlugins(configDir string, execDir string) (map[string]*plugin.
 
 // NewPipelineService is a constructor for a PipelineService
 func NewPipelineService(hourlyrepo, dailyrepo Repository, ingConf CustomCostIngestorConfig) (*PipelineService, error) {
-
 	registeredPlugins, err := getRegisteredPlugins(ingConf.PluginConfigDir, ingConf.PluginExecutableDir)
 	if err != nil {
 		log.Errorf("error getting registered plugins: %v", err)
@@ -125,15 +131,22 @@ func NewPipelineService(hourlyrepo, dailyrepo Repository, ingConf CustomCostInge
 	}
 
 	dailyIngestor.Start(false)
+
+	var domains []string
+	for domain, _ := range registeredPlugins {
+		domains = append(domains, domain)
+	}
+
 	return &PipelineService{
 		hourlyIngestor: hourlyIngestor,
 		hourlyStore:    hourlyrepo,
 		dailyStore:     dailyrepo,
 		dailyIngestor:  dailyIngestor,
+		domains:        domains,
 	}, nil
 }
 
-// Status gives a combined view of the state of configs and the ingestior status
+// Status gives a combined view of the state of configs and the ingestor status
 func (dp *PipelineService) Status() Status {
 
 	// Pull config status from the config controller
@@ -148,6 +161,7 @@ func (dp *PipelineService) Status() Status {
 		CoverageHourly:    ingstatusHourly.Coverage,
 		RefreshRateHourly: ingstatusHourly.RefreshRate.String(),
 		RefreshRateDaily:  ingstatusDaily.RefreshRate.String(),
+		Domains:           dp.domains,
 	}
 
 }

+ 86 - 0
pkg/customcost/querier.go

@@ -2,9 +2,95 @@ package customcost
 
 import (
 	"context"
+	"fmt"
+	"time"
+
+	"github.com/opencost/opencost/core/pkg/opencost"
+	"github.com/opencost/opencost/core/pkg/util/timeutil"
+	"github.com/opencost/opencost/pkg/env"
 )
 
 type Querier interface {
 	QueryTotal(ctx context.Context, request CostTotalRequest) (*CostResponse, error)
 	QueryTimeseries(ctx context.Context, request CostTimeseriesRequest) (*CostTimeseriesResponse, error)
 }
+
+func GetCustomCostWindowAccumulation(window opencost.Window, accumulate opencost.AccumulateOption) (opencost.Window, opencost.AccumulateOption, error) {
+	var err error
+	if accumulate == opencost.AccumulateOptionNone {
+		accumulate, err = getCustomCostAccumulateOption(window, nil)
+		if err != nil {
+			return opencost.Window{}, opencost.AccumulateOptionNone, fmt.Errorf("failed to determine custom cost accumulation option: %v", err)
+		}
+	}
+	window, err = window.GetAccumulateWindow(accumulate)
+	if err != nil {
+		return opencost.Window{}, opencost.AccumulateOptionNone, fmt.Errorf("failed to determine custom cost accumulation option: %v", err)
+	}
+
+	return window, accumulate, nil
+}
+
+// getCustomCostAccumulateOption determines defaults in a way that matches options presented in the UI
+func getCustomCostAccumulateOption(window opencost.Window, from []opencost.AccumulateOption) (opencost.AccumulateOption, error) {
+	if window.IsOpen() || window.IsNegative() {
+		return opencost.AccumulateOptionNone, fmt.Errorf("invalid window '%s'", window.String())
+	}
+
+	if len(from) == 0 {
+		from = allSteppedAccumulateOptions
+	}
+
+	hourlyStoreHours := env.GetDataRetentionHourlyResolutionHours()
+	hourlySteps := time.Duration(hourlyStoreHours) * time.Hour
+	oldestHourly := time.Now().Add(-1 * hourlySteps)
+
+	// Use hourly if...
+	//  (1) hourly is an option;
+	//  (2) we have hourly store coverage; and
+	//  (3) the window duration is less than the hourly break point.
+	if hasHourly(from) && oldestHourly.Before(*window.Start()) && window.Duration() <= hourlySteps {
+		return opencost.AccumulateOptionHour, nil
+	}
+
+	dailyStoreDays := env.GetDataRetentionDailyResolutionDays()
+	dailySteps := time.Duration(dailyStoreDays) * timeutil.Day
+	oldestDaily := time.Now().Add(-1 * dailySteps)
+	// Use daily if...
+	//  (1) daily is an option
+	// It is acceptable to query a range for which we only have partial data
+	if hasDaily(from) {
+		return opencost.AccumulateOptionDay, nil
+	}
+
+	if oldestDaily.After(*window.Start()) {
+		return opencost.AccumulateOptionDay, fmt.Errorf("data store does not have coverage for %v", window)
+	}
+
+	return opencost.AccumulateOptionNone, fmt.Errorf("no valid accumulate option in %v for %s", from, window)
+}
+
+var allSteppedAccumulateOptions = []opencost.AccumulateOption{
+	opencost.AccumulateOptionHour,
+	opencost.AccumulateOptionDay,
+}
+
+func hasHourly(opts []opencost.AccumulateOption) bool {
+	for _, opt := range opts {
+		if opt == opencost.AccumulateOptionHour {
+			return true
+		}
+	}
+
+	return false
+}
+
+func hasDaily(opts []opencost.AccumulateOption) bool {
+	for _, opt := range opts {
+		if opt == opencost.AccumulateOptionDay {
+			return true
+		}
+	}
+
+	return false
+}

+ 16 - 79
pkg/customcost/repositoryquerier.go

@@ -8,7 +8,6 @@ import (
 
 	"github.com/opencost/opencost/core/pkg/opencost"
 	"github.com/opencost/opencost/core/pkg/util/timeutil"
-	"github.com/opencost/opencost/pkg/env"
 )
 
 type RepositoryQuerier struct {
@@ -28,9 +27,15 @@ func NewRepositoryQuerier(hourlyRepo, dailyRepo Repository, hourlyDuration, dail
 }
 
 func (rq *RepositoryQuerier) QueryTotal(ctx context.Context, request CostTotalRequest) (*CostResponse, error) {
+	window := opencost.NewClosedWindow(request.Start, request.End)
+	window, accumulate, err := GetCustomCostWindowAccumulation(window, request.Accumulate)
+	if err != nil {
+		return nil, fmt.Errorf("error getting custom cost total window accumulation: %w", err)
+	}
+
 	repo := rq.dailyRepo
 	step := timeutil.Day
-	if request.Accumulate == opencost.AccumulateOptionHour {
+	if accumulate == opencost.AccumulateOptionHour {
 		repo = rq.hourlyRepo
 		step = time.Hour
 	}
@@ -45,10 +50,9 @@ func (rq *RepositoryQuerier) QueryTotal(ctx context.Context, request CostTotalRe
 		return nil, fmt.Errorf("RepositoryQuerier: Query: failed to compile filters: %w", err)
 	}
 
-	requestWindow := opencost.NewClosedWindow(request.Start, request.End)
-	ccs := NewCustomCostSet(requestWindow)
-	queryStart := request.Start
-	for queryStart.Before(request.End) {
+	ccs := NewCustomCostSet(window)
+	queryStart := *window.Start()
+	for queryStart.Before(*window.End()) {
 		queryEnd := queryStart.Add(step)
 
 		for _, domain := range domains {
@@ -78,81 +82,14 @@ func (rq *RepositoryQuerier) QueryTotal(ctx context.Context, request CostTotalRe
 	return NewCostResponse(ccs), nil
 }
 
-var allSteppedAccumulateOptions = []opencost.AccumulateOption{
-	opencost.AccumulateOptionHour,
-	opencost.AccumulateOptionDay,
-}
-
-func hasHourly(opts []opencost.AccumulateOption) bool {
-	for _, opt := range opts {
-		if opt == opencost.AccumulateOptionHour {
-			return true
-		}
-	}
-
-	return false
-}
-
-func hasDaily(opts []opencost.AccumulateOption) bool {
-	for _, opt := range opts {
-		if opt == opencost.AccumulateOptionDay {
-			return true
-		}
-	}
-
-	return false
-}
-
-// GetCustomCostAccumulateOption determines defaults in a way that matches options presented in the UI
-func GetCustomCostAccumulateOption(window opencost.Window, from []opencost.AccumulateOption) (opencost.AccumulateOption, error) {
-	if window.IsOpen() || window.IsNegative() {
-		return opencost.AccumulateOptionNone, fmt.Errorf("invalid window '%s'", window.String())
-	}
-
-	if len(from) == 0 {
-		from = allSteppedAccumulateOptions
-	}
-
-	hourlyStoreHours := env.GetDataRetentionHourlyResolutionHours()
-	hourlySteps := time.Duration(hourlyStoreHours) * time.Hour
-	oldestHourly := time.Now().Add(-1 * hourlySteps)
-
-	// Use hourly if...
-	//  (1) hourly is an option;
-	//  (2) we have hourly store coverage; and
-	//  (3) the window duration is less than the hourly break point.
-	if hasHourly(from) && oldestHourly.Before(*window.Start()) && window.Duration() <= hourlySteps {
-		return opencost.AccumulateOptionHour, nil
-	}
-
-	dailyStoreDays := env.GetDataRetentionDailyResolutionDays()
-	dailySteps := time.Duration(dailyStoreDays) * timeutil.Day
-	oldestDaily := time.Now().Add(-1 * dailySteps)
-	// Use daily if...
-	//  (1) daily is an option
-	// It is acceptable to query a range for which we only have partial data
-	if hasDaily(from) {
-		return opencost.AccumulateOptionDay, nil
-	}
-
-	if oldestDaily.After(*window.Start()) {
-		return opencost.AccumulateOptionNone, fmt.Errorf("data store does not have coverage for %v", window)
-	}
-
-	return opencost.AccumulateOptionNone, fmt.Errorf("no valid accumulate option in %v for %s", from, window)
-}
-
 func (rq *RepositoryQuerier) QueryTimeseries(ctx context.Context, request CostTimeseriesRequest) (*CostTimeseriesResponse, error) {
-	window, _ := opencost.NewClosedWindow(request.Start, request.End).GetAccumulateWindow(request.Accumulate)
-	var err error
-	if request.Accumulate == opencost.AccumulateOptionNone {
-		request.Accumulate, err = GetCustomCostAccumulateOption(window, nil)
-		if err != nil {
-			return nil, fmt.Errorf("error determining accumulation option: %v", err)
-		}
+	window := opencost.NewClosedWindow(request.Start, request.End)
+	window, accumulate, err := GetCustomCostWindowAccumulation(window, request.Accumulate)
+	if err != nil {
+		return nil, fmt.Errorf("error getting custom cost timeseries window accumulation: %w", err)
 	}
 
-	windows, err := window.GetAccumulateWindows(request.Accumulate)
+	windows, err := window.GetAccumulateWindows(accumulate)
 	if err != nil {
 		return nil, fmt.Errorf("error getting timeseries windows: %w", err)
 	}
@@ -172,7 +109,7 @@ func (rq *RepositoryQuerier) QueryTimeseries(ctx context.Context, request CostTi
 				End:         *window.End(),
 				AggregateBy: request.AggregateBy,
 				Filter:      request.Filter,
-				Accumulate:  request.Accumulate,
+				Accumulate:  accumulate,
 			})
 		}(i, w, totals)
 	}

+ 1 - 1
pkg/customcost/repositoryquerier_test.go

@@ -67,7 +67,7 @@ func TestGetCustomCostAccumulateOption(t *testing.T) {
 	}
 	for name, tt := range tests {
 		t.Run(name, func(t *testing.T) {
-			got, err := GetCustomCostAccumulateOption(tt.window, tt.from)
+			got, err := getCustomCostAccumulateOption(tt.window, tt.from)
 			if (err != nil) != tt.wantErr {
 				t.Errorf("GetAccumulateOption() error = %v, wantErr %v", err, tt.wantErr)
 				return

+ 1 - 0
pkg/customcost/status.go

@@ -9,6 +9,7 @@ import (
 // Status gives the details and metadata of a CustomCost integration
 type Status struct {
 	Enabled           bool                       `json:"enabled"`
+	Domains           []string                   `json:"domains"`
 	Key               string                     `json:"key,omitempty"`
 	Source            string                     `json:"source,omitempty"`
 	Provider          string                     `json:"provider,omitempty"`