|
|
@@ -1,10 +1,11 @@
|
|
|
package costmodel
|
|
|
|
|
|
import (
|
|
|
+ "bytes"
|
|
|
"context"
|
|
|
"encoding/csv"
|
|
|
"io"
|
|
|
- "os"
|
|
|
+ "sort"
|
|
|
"strconv"
|
|
|
"time"
|
|
|
|
|
|
@@ -15,16 +16,18 @@ import (
|
|
|
)
|
|
|
|
|
|
type CloudStorage interface {
|
|
|
- FileReplace(ctx context.Context, f *os.File, path string) error
|
|
|
- FileDownload(ctx context.Context, path string) (*os.File, error)
|
|
|
- FileExists(ctx context.Context, path string) (bool, error)
|
|
|
+ Write(name string, data []byte) error
|
|
|
+ Read(name string) ([]byte, error)
|
|
|
+ Exists(name string) (bool, error)
|
|
|
}
|
|
|
|
|
|
type AllocationModel interface {
|
|
|
ComputeAllocation(start, end time.Time, resolution time.Duration) (*kubecost.AllocationSet, error)
|
|
|
- DateRange(ctx context.Context) (time.Time, time.Time, error)
|
|
|
+ DateRange() (time.Time, time.Time, error)
|
|
|
}
|
|
|
|
|
|
+var errNoData = errors.New("no data")
|
|
|
+
|
|
|
// UpdateCSVWorker launches a worker that updates CSV file in cloud storage with allocation data
|
|
|
// It updates data immediately on launch and then runs every day at 00:10 UTC
|
|
|
// It expected to run a goroutine
|
|
|
@@ -36,8 +39,7 @@ func UpdateCSVWorker(ctx context.Context, storage CloudStorage, model Allocation
|
|
|
case <-ctx.Done():
|
|
|
return ctx.Err()
|
|
|
case <-time.After(nextRunAt.Sub(time.Now())):
|
|
|
- dayBefore := time.Date(nextRunAt.Year(), nextRunAt.Month(), nextRunAt.Day(), 0, 0, 0, 0, time.UTC).AddDate(0, 0, -1)
|
|
|
- err := UpdateCSV(ctx, storage, model, path, dayBefore)
|
|
|
+ err := UpdateCSV(ctx, storage, model, path)
|
|
|
if err != nil {
|
|
|
// it's background worker, log error and carry on, maybe next time it will work
|
|
|
log.Errorf("Error updating CSV: %s", err)
|
|
|
@@ -50,13 +52,13 @@ func UpdateCSVWorker(ctx context.Context, storage CloudStorage, model Allocation
|
|
|
}
|
|
|
}
|
|
|
|
|
|
-func UpdateCSV(ctx context.Context, storage CloudStorage, model AllocationModel, path string, date time.Time) error {
|
|
|
+func UpdateCSV(ctx context.Context, storage CloudStorage, model AllocationModel, path string) error {
|
|
|
exporter := &csvExporter{
|
|
|
Storage: storage,
|
|
|
Model: model,
|
|
|
FilePath: path,
|
|
|
}
|
|
|
- return exporter.Update(ctx, date)
|
|
|
+ return exporter.Update(ctx)
|
|
|
}
|
|
|
|
|
|
type csvExporter struct {
|
|
|
@@ -65,48 +67,59 @@ type csvExporter struct {
|
|
|
FilePath string
|
|
|
}
|
|
|
|
|
|
-// TODO: logging
|
|
|
-func (e *csvExporter) Update(ctx context.Context, date time.Time) error {
|
|
|
- exist, err := e.Storage.FileExists(ctx, e.FilePath)
|
|
|
+// Update updates CSV file in cloud storage with new allocation data
|
|
|
+func (e *csvExporter) Update(ctx context.Context) error {
|
|
|
+ allocationDates, err := e.availableAllocationDates()
|
|
|
if err != nil {
|
|
|
return err
|
|
|
}
|
|
|
|
|
|
- dateExport, err := e.writeCSVToFile(ctx, date)
|
|
|
+ exist, err := e.Storage.Exists(e.FilePath)
|
|
|
if err != nil {
|
|
|
return err
|
|
|
}
|
|
|
- defer dateExport.Close()
|
|
|
|
|
|
- var result *os.File
|
|
|
+ var result []byte
|
|
|
+
|
|
|
+ // cloud storage doesn't have an existing file
|
|
|
+ // dump all the data exist to the file
|
|
|
+ if !exist {
|
|
|
+ result, err = e.allocationsToCSV(ctx, mapTimeToSlice(allocationDates))
|
|
|
+ if err != nil {
|
|
|
+ return err
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ // existing export file exists
|
|
|
+ // scan through it and ignore all dates that are already in the file
|
|
|
+ // avoid modifying existing data or producing duplicates
|
|
|
if exist {
|
|
|
- // merge existing file with new data
|
|
|
- previousExport, err := e.Storage.FileDownload(ctx, e.FilePath)
|
|
|
+ previousExport, err := e.Storage.Read(e.FilePath)
|
|
|
if err != nil {
|
|
|
return err
|
|
|
}
|
|
|
- defer previousExport.Close()
|
|
|
|
|
|
- result, err = os.CreateTemp("", "cost-model-*.csv")
|
|
|
+ csvDates, err := e.loadDates(previousExport)
|
|
|
if err != nil {
|
|
|
- return errors.Wrap(err, "creating temp file")
|
|
|
+ return err
|
|
|
+ }
|
|
|
+
|
|
|
+ for date := range csvDates {
|
|
|
+ delete(allocationDates, date)
|
|
|
}
|
|
|
- err = mergeCSV([]*os.File{previousExport, dateExport}, result)
|
|
|
+
|
|
|
+ dateExport, err := e.allocationsToCSV(ctx, mapTimeToSlice(allocationDates))
|
|
|
if err != nil {
|
|
|
return err
|
|
|
}
|
|
|
- } else {
|
|
|
- // no existing file, create a new one
|
|
|
- result = dateExport
|
|
|
- }
|
|
|
|
|
|
- // we just finished writing to the file, so we need to seek to the beginning, so we can read from it
|
|
|
- _, err = result.Seek(0, io.SeekStart)
|
|
|
- if err != nil {
|
|
|
- return errors.Wrap(err, "seeking to the beginning of the csv file")
|
|
|
+ result, err = mergeCSV([][]byte{previousExport, dateExport})
|
|
|
+ if err != nil {
|
|
|
+ return err
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
- err = e.Storage.FileReplace(ctx, result, e.FilePath)
|
|
|
+ err = e.Storage.Write(e.FilePath, result)
|
|
|
if err != nil {
|
|
|
return err
|
|
|
}
|
|
|
@@ -114,30 +127,43 @@ func (e *csvExporter) Update(ctx context.Context, date time.Time) error {
|
|
|
return nil
|
|
|
}
|
|
|
|
|
|
-func (e *csvExporter) writeCSVToFile(ctx context.Context, date time.Time) (*os.File, error) {
|
|
|
- f, err := os.CreateTemp("", "cost-model-*.csv")
|
|
|
+func (e *csvExporter) availableAllocationDates() (map[time.Time]struct{}, error) {
|
|
|
+ start, end, err := e.Model.DateRange()
|
|
|
if err != nil {
|
|
|
- return nil, errors.Wrap(err, "creating temp file")
|
|
|
+ return nil, err
|
|
|
+ }
|
|
|
+ if start != time.Date(start.Year(), start.Month(), start.Day(), 0, 0, 0, 0, time.UTC) {
|
|
|
+ // start doesn't start from 00:00 UTC, it could be truncated by prometheus retention policy
|
|
|
+ // skip incomplete data and begin from the day after, otherwise it may corrupt existing data
|
|
|
+ start = time.Date(start.Year(), start.Month(), start.Day(), 0, 0, 0, 0, time.UTC).AddDate(0, 0, 1)
|
|
|
}
|
|
|
+ end = time.Date(end.Year(), end.Month(), end.Day(), 0, 0, 0, 0, time.UTC)
|
|
|
+ dates := make(map[time.Time]struct{})
|
|
|
+ for date := start; date.Before(end); date = date.AddDate(0, 0, 1) {
|
|
|
+ dates[date] = struct{}{}
|
|
|
+ }
|
|
|
+ if len(dates) == 0 {
|
|
|
+ return nil, errors.New("no allocation data available")
|
|
|
+ }
|
|
|
+ return dates, nil
|
|
|
+}
|
|
|
|
|
|
- err = e.writeCSVToWriter(ctx, f, date)
|
|
|
+func (e *csvExporter) allocationsToCSV(ctx context.Context, dates []time.Time) ([]byte, error) {
|
|
|
+ buf := new(bytes.Buffer)
|
|
|
+ err := e.writeCSVToWriter(ctx, buf, dates)
|
|
|
if err != nil {
|
|
|
return nil, err
|
|
|
}
|
|
|
- return f, nil
|
|
|
+ return buf.Bytes(), nil
|
|
|
}
|
|
|
|
|
|
-func (e *csvExporter) writeCSVToWriter(ctx context.Context, w io.Writer, date time.Time) error {
|
|
|
- start := time.Date(date.Year(), date.Month(), date.Day(), 0, 0, 0, 0, time.UTC)
|
|
|
- end := start.AddDate(0, 1, 0)
|
|
|
- data, err := e.Model.ComputeAllocation(start, end, 5*time.Minute)
|
|
|
- if err != nil {
|
|
|
- return err
|
|
|
+func (e *csvExporter) writeCSVToWriter(ctx context.Context, w io.Writer, dates []time.Time) error {
|
|
|
+ fmtFloat := func(f float64) string {
|
|
|
+ return strconv.FormatFloat(f, 'f', -1, 64)
|
|
|
}
|
|
|
- log.Infof("data: %d", len(data.Allocations))
|
|
|
csvWriter := csv.NewWriter(w)
|
|
|
// TODO: confirm columns we want to export
|
|
|
- err = csvWriter.Write([]string{
|
|
|
+ err := csvWriter.Write([]string{
|
|
|
"Date",
|
|
|
"Name",
|
|
|
"CPUCoreUsageAverage",
|
|
|
@@ -146,73 +172,126 @@ func (e *csvExporter) writeCSVToWriter(ctx context.Context, w io.Writer, date ti
|
|
|
"RAMBytesUsageAverage",
|
|
|
"RAMBytesRequestAverage",
|
|
|
"RAMCost",
|
|
|
+ "GPUs",
|
|
|
+ "GPUCost",
|
|
|
+ "NetworkCost",
|
|
|
+ "PVBytes",
|
|
|
+ "PVCost",
|
|
|
+ "TotalCost",
|
|
|
})
|
|
|
if err != nil {
|
|
|
return err
|
|
|
}
|
|
|
- for _, alloc := range data.Allocations {
|
|
|
- if err := ctx.Err(); err != nil {
|
|
|
+ lines := 0
|
|
|
+ for _, date := range dates {
|
|
|
+ start := time.Date(date.Year(), date.Month(), date.Day(), 0, 0, 0, 0, time.UTC)
|
|
|
+ end := start.AddDate(0, 0, 1)
|
|
|
+ data, err := e.Model.ComputeAllocation(start, end, 5*time.Minute)
|
|
|
+ if err != nil {
|
|
|
return err
|
|
|
}
|
|
|
+ log.Infof("fetched %d records for %s", len(data.Allocations), date.Format("2006-01-02"))
|
|
|
+ for _, alloc := range data.Allocations {
|
|
|
+ if err := ctx.Err(); err != nil {
|
|
|
+ return err
|
|
|
+ }
|
|
|
|
|
|
- err := csvWriter.Write([]string{
|
|
|
- date.Format("2006-01-02"),
|
|
|
- alloc.Name,
|
|
|
- fmtFloat(alloc.CPUCoreUsageAverage),
|
|
|
- fmtFloat(alloc.CPUCoreRequestAverage),
|
|
|
- fmtFloat(alloc.CPUCost),
|
|
|
- fmtFloat(alloc.RAMBytesUsageAverage),
|
|
|
- fmtFloat(alloc.RAMBytesRequestAverage),
|
|
|
- fmtFloat(alloc.RAMCost),
|
|
|
- })
|
|
|
- if err != nil {
|
|
|
- return err
|
|
|
+ err := csvWriter.Write([]string{
|
|
|
+ date.Format("2006-01-02"),
|
|
|
+ alloc.Name,
|
|
|
+ fmtFloat(alloc.CPUCoreUsageAverage),
|
|
|
+ fmtFloat(alloc.CPUCoreRequestAverage),
|
|
|
+ fmtFloat(alloc.CPUTotalCost()),
|
|
|
+ fmtFloat(alloc.RAMBytesUsageAverage),
|
|
|
+ fmtFloat(alloc.RAMBytesRequestAverage),
|
|
|
+ fmtFloat(alloc.RAMTotalCost()),
|
|
|
+ fmtFloat(alloc.GPUs()),
|
|
|
+ fmtFloat(alloc.GPUCost),
|
|
|
+ fmtFloat(alloc.NetworkTotalCost()),
|
|
|
+ fmtFloat(alloc.PVBytes()),
|
|
|
+ fmtFloat(alloc.PVCost()),
|
|
|
+ fmtFloat(alloc.TotalCost()),
|
|
|
+ })
|
|
|
+ if err != nil {
|
|
|
+ return err
|
|
|
+ }
|
|
|
+ lines++
|
|
|
}
|
|
|
}
|
|
|
+
|
|
|
+ if lines == 0 {
|
|
|
+ return errNoData
|
|
|
+ }
|
|
|
+
|
|
|
csvWriter.Flush()
|
|
|
if err := csvWriter.Error(); err != nil {
|
|
|
return err
|
|
|
}
|
|
|
-
|
|
|
+ log.Infof("exported %d lines", lines)
|
|
|
return nil
|
|
|
}
|
|
|
|
|
|
-func fmtFloat(f float64) string {
|
|
|
- return strconv.FormatFloat(f, 'f', -1, 64)
|
|
|
+// loadDate scans through CSV export file and extract all dates from "Date" column
|
|
|
+func (e *csvExporter) loadDates(csvFile []byte) (map[time.Time]struct{}, error) {
|
|
|
+ csvReader := csv.NewReader(bytes.NewReader(csvFile))
|
|
|
+ header, err := csvReader.Read()
|
|
|
+ if err != nil {
|
|
|
+ return nil, errors.Wrap(err, "reading csv header")
|
|
|
+ }
|
|
|
+ dateColIndex := 0
|
|
|
+ for i, col := range header {
|
|
|
+ if col == "Date" {
|
|
|
+ dateColIndex = i
|
|
|
+ break
|
|
|
+ }
|
|
|
+ }
|
|
|
+ dates := make(map[time.Time]struct{})
|
|
|
+ for {
|
|
|
+ row, err := csvReader.Read()
|
|
|
+ if err == io.EOF {
|
|
|
+ break
|
|
|
+ }
|
|
|
+ if err != nil {
|
|
|
+ return nil, errors.Wrap(err, "reading csv row")
|
|
|
+ }
|
|
|
+ date, err := time.Parse("2006-01-02", row[dateColIndex])
|
|
|
+ if err != nil {
|
|
|
+ return nil, errors.Wrap(err, "parsing date")
|
|
|
+ }
|
|
|
+ dates[date] = struct{}{}
|
|
|
+ }
|
|
|
+ return dates, nil
|
|
|
}
|
|
|
|
|
|
// mergeCSV merges multiple csv files into one.
|
|
|
// Files may have different headers, but the result will have a header that is a union of all headers.
|
|
|
// The main goal here is to allow changing CSV format without breaking or loosing existing data.
|
|
|
-func mergeCSV(files []*os.File, output *os.File) error {
|
|
|
+func mergeCSV(files [][]byte) ([]byte, error) {
|
|
|
var err error
|
|
|
headers := make([][]string, 0, len(files))
|
|
|
csvReaders := make([]*csv.Reader, 0, len(files))
|
|
|
|
|
|
// first, get information about the result header
|
|
|
for _, file := range files {
|
|
|
- if _, err := file.Seek(0, io.SeekStart); err != nil {
|
|
|
- return errors.Wrapf(err, "seeking to start of %s", file.Name())
|
|
|
- }
|
|
|
- csvReader := csv.NewReader(file)
|
|
|
+ csvReader := csv.NewReader(bytes.NewReader(file))
|
|
|
header, err := csvReader.Read()
|
|
|
if errors.Is(err, io.EOF) {
|
|
|
// ignore empty files
|
|
|
continue
|
|
|
}
|
|
|
if err != nil {
|
|
|
- return errors.Wrapf(err, "reading header of %s", file.Name())
|
|
|
+ return nil, errors.Wrap(err, "reading header of csv file")
|
|
|
}
|
|
|
headers = append(headers, header)
|
|
|
csvReaders = append(csvReaders, csvReader)
|
|
|
}
|
|
|
|
|
|
mapping, header := combineHeaders(headers)
|
|
|
-
|
|
|
+ output := new(bytes.Buffer)
|
|
|
csvWriter := csv.NewWriter(output)
|
|
|
err = csvWriter.Write(mergeHeaders(headers))
|
|
|
if err != nil {
|
|
|
- return errors.Wrapf(err, "writing header to %s", output.Name())
|
|
|
+ return nil, errors.Wrap(err, "writing header to csv file")
|
|
|
}
|
|
|
|
|
|
for csvIndex, csvReader := range csvReaders {
|
|
|
@@ -222,7 +301,7 @@ func mergeCSV(files []*os.File, output *os.File) error {
|
|
|
break
|
|
|
}
|
|
|
if err != nil {
|
|
|
- return errors.Wrap(err, "reading csv file line")
|
|
|
+ return nil, errors.Wrap(err, "reading csv file line")
|
|
|
}
|
|
|
|
|
|
outputLine := make([]string, len(header))
|
|
|
@@ -235,18 +314,17 @@ func mergeCSV(files []*os.File, output *os.File) error {
|
|
|
}
|
|
|
err = csvWriter.Write(outputLine)
|
|
|
if err != nil {
|
|
|
- return errors.Wrapf(err, "writing line to %s", output.Name())
|
|
|
+ return nil, errors.Wrap(err, "writing line to csv file")
|
|
|
}
|
|
|
}
|
|
|
|
|
|
}
|
|
|
csvWriter.Flush()
|
|
|
- _, err = output.Seek(0, io.SeekStart)
|
|
|
- if err != nil {
|
|
|
- return errors.Wrapf(err, "seeking to start of %s", output.Name())
|
|
|
+ if csvWriter.Error() != nil {
|
|
|
+ return nil, errors.Wrapf(csvWriter.Error(), "flushing csv file")
|
|
|
}
|
|
|
|
|
|
- return nil
|
|
|
+ return output.Bytes(), nil
|
|
|
}
|
|
|
|
|
|
func combineHeaders(headers [][]string) ([]map[int]int, []string) {
|
|
|
@@ -295,3 +373,14 @@ func indexOf(slice []string, element string) int {
|
|
|
}
|
|
|
return -1
|
|
|
}
|
|
|
+
|
|
|
+func mapTimeToSlice(data map[time.Time]struct{}) []time.Time {
|
|
|
+ result := make([]time.Time, 0, len(data))
|
|
|
+ for key := range data {
|
|
|
+ result = append(result, key)
|
|
|
+ }
|
|
|
+ sort.Slice(result, func(i, j int) bool {
|
|
|
+ return result[i].Before(result[j])
|
|
|
+ })
|
|
|
+ return result
|
|
|
+}
|