Просмотр исходного кода

fix export dates

Signed-off-by: r2k1 <yokree@gmail.com>
r2k1 3 лет назад
Родитель
Сommit
1cebf0db3a
3 измененных файлов с 48 добавлено и 7 удалено
  1. 4 4
      pkg/costmodel/allocation.go
  2. 22 3
      pkg/costmodel/allocation_csv.go
  3. 22 0
      pkg/costmodel/allocation_csv_test.go

+ 4 - 4
pkg/costmodel/allocation.go

@@ -59,8 +59,8 @@ const (
 	queryFmtReplicaSetsWithoutOwners = `avg(avg_over_time(kube_replicaset_owner{owner_kind="<none>", owner_name="<none>"}[%s])) by (replicaset, namespace, %s)`
 	queryFmtLBCostPerHr              = `avg(avg_over_time(kubecost_load_balancer_cost[%s])) by (namespace, service_name, %s)`
 	queryFmtLBActiveMins             = `count(kubecost_load_balancer_cost) by (namespace, service_name, %s)[%s:%s]`
-	queryFmtOldestSample             = `max_over_time(timestamp(up{job="opencost"})[%s:%s])`
-	queryFmtNewestSample             = `min_over_time(timestamp(up{job="opencost"})[%s:%s])`
+	queryFmtOldestSample             = `min_over_time(timestamp(up{job="opencost"})[%s:%s])`
+	queryFmtNewestSample             = `max_over_time(timestamp(up{job="opencost"})[%s:%s])`
 )
 
 // Constants for Network Cost Subtype
@@ -266,13 +266,13 @@ func (cm *CostModel) DateRange() (time.Time, time.Time, error) {
 	if err != nil {
 		return time.Time{}, time.Time{}, errors.Wrap(err, "querying oldest sample")
 	}
-	oldest := time.Unix(int64(resOldest[0].Values[0].Timestamp), 0)
+	oldest := time.Unix(int64(resOldest[0].Values[0].Value), 0)
 
 	resNewest, _, err := ctx.QuerySync(fmt.Sprintf(queryFmtNewestSample, "90d", "1h"))
 	if err != nil {
 		return time.Time{}, time.Time{}, errors.Wrap(err, "querying oldest sample")
 	}
-	newest := time.Unix(int64(resNewest[0].Values[0].Timestamp), 0)
+	newest := time.Unix(int64(resNewest[0].Values[0].Value), 0)
 
 	return oldest, newest, nil
 }

+ 22 - 3
pkg/costmodel/allocation_csv.go

@@ -39,6 +39,7 @@ func UpdateCSVWorker(ctx context.Context, storage CloudStorage, model Allocation
 		case <-ctx.Done():
 			return ctx.Err()
 		case <-time.After(nextRunAt.Sub(time.Now())):
+			log.Infof("Updating CSV file: %s", path)
 			err := UpdateCSV(ctx, storage, model, path)
 			if err != nil {
 				// it's background worker, log error and carry on, maybe next time it will work
@@ -68,12 +69,18 @@ type csvExporter struct {
 }
 
 // Update updates CSV file in cloud storage with new allocation data
+// TODO: currently everything is processed in memory, it might be a problem for large clusters
+// it's possible to switch to temporary files, but it will require upgrading all storage provider to work with files
 func (e *csvExporter) Update(ctx context.Context) error {
 	allocationDates, err := e.availableAllocationDates()
 	if err != nil {
 		return err
 	}
 
+	if len(allocationDates) == 0 {
+		return errors.New("no data to export from prometheus")
+	}
+
 	exist, err := e.Storage.Exists(e.FilePath)
 	if err != nil {
 		return err
@@ -108,6 +115,11 @@ func (e *csvExporter) Update(ctx context.Context) error {
 			delete(allocationDates, date)
 		}
 
+		if len(allocationDates) == 0 {
+			log.Info("export file in cloud storage already contain data for all dates, skipping update")
+			return nil
+		}
+
 		dateExport, err := e.allocationsToCSV(ctx, mapTimeToSlice(allocationDates))
 		if err != nil {
 			return err
@@ -124,6 +136,8 @@ func (e *csvExporter) Update(ctx context.Context) error {
 		return err
 	}
 
+	log.Infof("Updated CSV file %s", e.FilePath)
+
 	return nil
 }
 
@@ -143,7 +157,7 @@ func (e *csvExporter) availableAllocationDates() (map[time.Time]struct{}, error)
 		dates[date] = struct{}{}
 	}
 	if len(dates) == 0 {
-		return nil, errors.New("no allocation data available")
+		return nil, errNoData
 	}
 	return dates, nil
 }
@@ -165,6 +179,8 @@ func (e *csvExporter) writeCSVToWriter(ctx context.Context, w io.Writer, dates [
 	// TODO: confirm columns we want to export
 	err := csvWriter.Write([]string{
 		"Date",
+		"Namespace",
+		"Kind",
 		"Name",
 		"CPUCoreUsageAverage",
 		"CPUCoreRequestAverage",
@@ -191,6 +207,7 @@ func (e *csvExporter) writeCSVToWriter(ctx context.Context, w io.Writer, dates [
 			return err
 		}
 		log.Infof("fetched %d records for %s", len(data.Allocations), date.Format("2006-01-02"))
+		// TODO: does it need to be aggregated by namespace+controller first?
 		for _, alloc := range data.Allocations {
 			if err := ctx.Err(); err != nil {
 				return err
@@ -198,7 +215,9 @@ func (e *csvExporter) writeCSVToWriter(ctx context.Context, w io.Writer, dates [
 
 			err := csvWriter.Write([]string{
 				date.Format("2006-01-02"),
-				alloc.Name,
+				alloc.Properties.Namespace,
+				alloc.Properties.ControllerKind,
+				alloc.Properties.Controller,
 				fmtFloat(alloc.CPUCoreUsageAverage),
 				fmtFloat(alloc.CPUCoreRequestAverage),
 				fmtFloat(alloc.CPUTotalCost()),
@@ -248,7 +267,7 @@ func (e *csvExporter) loadDates(csvFile []byte) (map[time.Time]struct{}, error)
 	dates := make(map[time.Time]struct{})
 	for {
 		row, err := csvReader.Read()
-		if err == io.EOF {
+		if errors.Is(err, io.EOF) {
 			break
 		}
 		if err != nil {

+ 22 - 0
pkg/costmodel/allocation_csv_test.go

@@ -114,6 +114,28 @@ func Test_UpdateCSV(t *testing.T) {
 `, string(storage.WriteCalls()[0].Data))
 	})
 
+	t.Run("data already present in export file, export should be skipped", func(t *testing.T) {
+		storage := &CloudStorageMock{
+			ExistsFunc: func(name string) (bool, error) {
+				return true, nil
+			},
+			ReadFunc: func(name string) ([]byte, error) {
+				return []byte(`Date,Name,CPUCoreUsageAverage,CPUCoreRequestAverage,CPUCost,RAMBytesUsageAverage,RAMBytesRequestAverage,RAMCost
+2021-01-01,test,0.1,0.2,0.3,0.4,0.5,0.6
+`), nil
+			},
+		}
+		model := &AllocationModelMock{
+			DateRangeFunc: func() (time.Time, time.Time, error) {
+				return time.Date(2021, 1, 1, 0, 0, 0, 0, time.UTC), time.Date(2021, 1, 2, 0, 0, 0, 0, time.UTC), nil
+			},
+		}
+		err := UpdateCSV(context.TODO(), storage, model, "/test.csv")
+		require.NoError(t, err)
+		assert.Len(t, storage.WriteCalls(), 0)
+		assert.Len(t, model.ComputeAllocationCalls(), 0)
+	})
+
 	t.Run("allocation data is empty", func(t *testing.T) {
 		model := &AllocationModelMock{
 			ComputeAllocationFunc: func(start time.Time, end time.Time, resolution time.Duration) (*kubecost.AllocationSet, error) {