|
|
@@ -37,8 +37,6 @@ type csvExporter struct {
|
|
|
}
|
|
|
|
|
|
// Update updates CSV file in cloud storage with new allocation data
|
|
|
-// TODO: currently everything is processed in memory, it might be a problem for large clusters
|
|
|
-// it's possible to switch to temporary files, but it will require upgrading all storage provider to work with files
|
|
|
func (e *csvExporter) Update(ctx context.Context) error {
|
|
|
allocationDates, err := e.availableAllocationDates()
|
|
|
if err != nil {
|
|
|
@@ -49,83 +47,84 @@ func (e *csvExporter) Update(ctx context.Context) error {
|
|
|
return errors.New("no data to export from prometheus")
|
|
|
}
|
|
|
|
|
|
- previousExportTmp, err := os.CreateTemp("", "export-*.csv")
|
|
|
+ resultTmp, err := os.CreateTemp("opencost", "export-*.csv")
|
|
|
if err != nil {
|
|
|
return err
|
|
|
}
|
|
|
- defer closeAndDelete(previousExportTmp)
|
|
|
- err = e.FileManager.Download(ctx, previousExportTmp)
|
|
|
- var exist bool
|
|
|
- if err != nil {
|
|
|
- if !errors.Is(err, storagev2.ErrNotFound) {
|
|
|
- return err
|
|
|
- }
|
|
|
- exist = false
|
|
|
- } else {
|
|
|
- exist = true
|
|
|
- }
|
|
|
+ defer closeAndDelete(resultTmp)
|
|
|
|
|
|
- resultTmp, err := os.CreateTemp("", "export-*.csv")
|
|
|
+ previousExportTmp, err := os.CreateTemp("opencost", "previous-export-*.csv")
|
|
|
if err != nil {
|
|
|
return err
|
|
|
}
|
|
|
- defer closeAndDelete(resultTmp)
|
|
|
- // cloud storage doesn't have an existing file
|
|
|
- // dump all the data exist to the file
|
|
|
- if !exist {
|
|
|
+ defer closeAndDelete(previousExportTmp)
|
|
|
+
|
|
|
+ err = e.FileManager.Download(ctx, previousExportTmp)
|
|
|
+ switch {
|
|
|
+ case errors.Is(err, storagev2.ErrNotFound):
|
|
|
+ // there is no previous file, so we need to create it
|
|
|
err := e.writeCSVToWriter(ctx, resultTmp, mapTimeToSlice(allocationDates))
|
|
|
if err != nil {
|
|
|
return err
|
|
|
}
|
|
|
- }
|
|
|
-
|
|
|
- // existing export file exists
|
|
|
- // scan through it and ignore all dates that are already in the file
|
|
|
- // avoid modifying existing data or producing duplicates
|
|
|
- if exist {
|
|
|
- csvDates, err := e.loadDates(previousExportTmp)
|
|
|
+ case err != nil:
|
|
|
+ return err
|
|
|
+ default:
|
|
|
+ // existing export file exists
|
|
|
+ // scan through it and ignore all dates that are already in the file
|
|
|
+ // avoid modifying existing data or producing duplicates
|
|
|
+ err := e.updateExportCSV(ctx, previousExportTmp, allocationDates, resultTmp)
|
|
|
if err != nil {
|
|
|
return err
|
|
|
}
|
|
|
+ }
|
|
|
|
|
|
- for date := range csvDates {
|
|
|
- delete(allocationDates, date)
|
|
|
- }
|
|
|
+ // we just wrote to the file, so we need to seek to the beginning, so we can read from it
|
|
|
+ _, err = resultTmp.Seek(0, io.SeekStart)
|
|
|
+ if err != nil {
|
|
|
+ return err
|
|
|
+ }
|
|
|
|
|
|
- if len(allocationDates) == 0 {
|
|
|
- log.Info("export file in cloud storage already contain data for all dates, skipping update")
|
|
|
- return nil
|
|
|
- }
|
|
|
+ err = e.FileManager.Upload(ctx, resultTmp)
|
|
|
+ if err != nil {
|
|
|
+ return err
|
|
|
+ }
|
|
|
|
|
|
- newExportTmp, err := os.CreateTemp("", "new-export-*.csv")
|
|
|
- if err != nil {
|
|
|
- return err
|
|
|
- }
|
|
|
- defer closeAndDelete(newExportTmp)
|
|
|
+ log.Info("CSV export updated")
|
|
|
|
|
|
- err = e.writeCSVToWriter(ctx, newExportTmp, mapTimeToSlice(allocationDates))
|
|
|
- if err != nil {
|
|
|
- return err
|
|
|
- }
|
|
|
+ return nil
|
|
|
+}
|
|
|
|
|
|
- err = mergeCSV([]*os.File{previousExportTmp, newExportTmp}, resultTmp)
|
|
|
- if err != nil {
|
|
|
- return err
|
|
|
- }
|
|
|
+func (e *csvExporter) updateExportCSV(ctx context.Context, previousExportTmp *os.File, allocationDates map[time.Time]struct{}, result *os.File) error {
|
|
|
+ csvDates, err := e.loadDates(previousExportTmp)
|
|
|
+ if err != nil {
|
|
|
+ return err
|
|
|
}
|
|
|
|
|
|
- _, err = resultTmp.Seek(0, io.SeekStart)
|
|
|
+ for date := range csvDates {
|
|
|
+ delete(allocationDates, date)
|
|
|
+ }
|
|
|
+
|
|
|
+ if len(allocationDates) == 0 {
|
|
|
+ log.Info("export file in cloud storage already contain data for all dates, skipping update")
|
|
|
+ return nil
|
|
|
+ }
|
|
|
+
|
|
|
+ newExportTmp, err := os.CreateTemp("opencost", "new-export-*.csv")
|
|
|
if err != nil {
|
|
|
return err
|
|
|
}
|
|
|
+ defer closeAndDelete(newExportTmp)
|
|
|
|
|
|
- err = e.FileManager.Upload(ctx, resultTmp)
|
|
|
+ err = e.writeCSVToWriter(ctx, newExportTmp, mapTimeToSlice(allocationDates))
|
|
|
if err != nil {
|
|
|
return err
|
|
|
}
|
|
|
|
|
|
- log.Info("CSV export updated")
|
|
|
-
|
|
|
+ err = mergeCSV([]*os.File{previousExportTmp, newExportTmp}, result)
|
|
|
+ if err != nil {
|
|
|
+ return err
|
|
|
+ }
|
|
|
return nil
|
|
|
}
|
|
|
|
|
|
@@ -155,7 +154,6 @@ func (e *csvExporter) writeCSVToWriter(ctx context.Context, w io.Writer, dates [
|
|
|
return strconv.FormatFloat(f, 'f', -1, 64)
|
|
|
}
|
|
|
csvWriter := csv.NewWriter(w)
|
|
|
- // TODO: confirm columns we want to export
|
|
|
err := csvWriter.Write([]string{
|
|
|
"Date",
|
|
|
"Namespace",
|
|
|
@@ -192,8 +190,6 @@ func (e *csvExporter) writeCSVToWriter(ctx context.Context, w io.Writer, dates [
|
|
|
return err
|
|
|
}
|
|
|
log.Infof("fetched %d records for %s", len(data.Allocations), date.Format("2006-01-02"))
|
|
|
- // TODO: does it need to be aggregated by namespace+controller?
|
|
|
- // container-level information can be too noisy for most users
|
|
|
for _, alloc := range data.Allocations {
|
|
|
if err := ctx.Err(); err != nil {
|
|
|
return err
|
|
|
@@ -340,6 +336,7 @@ func mergeCSV(input []*os.File, output *os.File) error {
|
|
|
|
|
|
}
|
|
|
csvWriter.Flush()
|
|
|
+ // check for errors from the Flush
|
|
|
if csvWriter.Error() != nil {
|
|
|
return fmt.Errorf("flushing csv file: %w", csvWriter.Error())
|
|
|
}
|