| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420 |
- package costmodel
- import (
- "bytes"
- "context"
- "encoding/csv"
- "io"
- "sort"
- "strconv"
- "time"
- "github.com/pkg/errors"
- "github.com/opencost/opencost/pkg/kubecost"
- "github.com/opencost/opencost/pkg/log"
- )
// CloudStorage is the blob-store abstraction used by the CSV exporter:
// whole-object access by name plus an existence probe.
type CloudStorage interface {
	// Exists reports whether an object called name is present.
	Exists(name string) (bool, error)
	// Read returns the full contents of the object called name.
	Read(name string) ([]byte, error)
	// Write stores data as the object called name. The exporter writes the
	// complete merged file each time, so this presumably replaces any previous
	// contents — confirm per implementation.
	Write(name string, data []byte) error
}
- type AllocationModel interface {
- ComputeAllocation(start, end time.Time, resolution time.Duration) (*kubecost.AllocationSet, error)
- DateRange() (time.Time, time.Time, error)
- }
// errNoData signals that there is nothing to export: either the model's date
// range contains no complete day, or the requested days produced no rows.
var (
	errNoData = errors.New("no data")
)
- // UpdateCSVWorker launches a worker that updates CSV file in cloud storage with allocation data
- // It updates data immediately on launch and then runs every day at 00:10 UTC
- // It expected to run a goroutine
- func UpdateCSVWorker(ctx context.Context, storage CloudStorage, model AllocationModel, path string) error {
- // perform first update immediately
- nextRunAt := time.Now()
- for {
- select {
- case <-ctx.Done():
- return ctx.Err()
- case <-time.After(nextRunAt.Sub(time.Now())):
- log.Infof("Updating CSV file: %s", path)
- err := UpdateCSV(ctx, storage, model, path)
- if err != nil {
- // it's background worker, log error and carry on, maybe next time it will work
- log.Errorf("Error updating CSV: %s", err)
- }
- now := time.Now().UTC()
- // next launch is at 00:10 UTC tomorrow
- // extra 10 minutes is to let prometheus to collect all the data for the previous day
- nextRunAt = time.Date(now.Year(), now.Month(), now.Day(), 0, 10, 0, 0, now.Location()).AddDate(0, 0, 1)
- }
- }
- }
- func UpdateCSV(ctx context.Context, storage CloudStorage, model AllocationModel, path string) error {
- exporter := &csvExporter{
- Storage: storage,
- Model: model,
- FilePath: path,
- }
- return exporter.Update(ctx)
- }
- type csvExporter struct {
- Storage CloudStorage
- Model AllocationModel
- FilePath string
- }
- // Update updates CSV file in cloud storage with new allocation data
- // TODO: currently everything is processed in memory, it might be a problem for large clusters
- // it's possible to switch to temporary files, but it will require upgrading all storage provider to work with files
- func (e *csvExporter) Update(ctx context.Context) error {
- allocationDates, err := e.availableAllocationDates()
- if err != nil {
- return err
- }
- if len(allocationDates) == 0 {
- return errors.New("no data to export from prometheus")
- }
- exist, err := e.Storage.Exists(e.FilePath)
- if err != nil {
- return err
- }
- var result []byte
- // cloud storage doesn't have an existing file
- // dump all the data exist to the file
- if !exist {
- result, err = e.allocationsToCSV(ctx, mapTimeToSlice(allocationDates))
- if err != nil {
- return err
- }
- }
- // existing export file exists
- // scan through it and ignore all dates that are already in the file
- // avoid modifying existing data or producing duplicates
- if exist {
- previousExport, err := e.Storage.Read(e.FilePath)
- if err != nil {
- return err
- }
- csvDates, err := e.loadDates(previousExport)
- if err != nil {
- return err
- }
- for date := range csvDates {
- delete(allocationDates, date)
- }
- if len(allocationDates) == 0 {
- log.Info("export file in cloud storage already contain data for all dates, skipping update")
- return nil
- }
- dateExport, err := e.allocationsToCSV(ctx, mapTimeToSlice(allocationDates))
- if err != nil {
- return err
- }
- result, err = mergeCSV([][]byte{previousExport, dateExport})
- if err != nil {
- return err
- }
- }
- err = e.Storage.Write(e.FilePath, result)
- if err != nil {
- return err
- }
- log.Infof("Updated CSV file %s", e.FilePath)
- return nil
- }
- func (e *csvExporter) availableAllocationDates() (map[time.Time]struct{}, error) {
- start, end, err := e.Model.DateRange()
- if err != nil {
- return nil, err
- }
- if start != time.Date(start.Year(), start.Month(), start.Day(), 0, 0, 0, 0, time.UTC) {
- // start doesn't start from 00:00 UTC, it could be truncated by prometheus retention policy
- // skip incomplete data and begin from the day after, otherwise it may corrupt existing data
- start = time.Date(start.Year(), start.Month(), start.Day(), 0, 0, 0, 0, time.UTC).AddDate(0, 0, 1)
- }
- end = time.Date(end.Year(), end.Month(), end.Day(), 0, 0, 0, 0, time.UTC)
- dates := make(map[time.Time]struct{})
- for date := start; date.Before(end); date = date.AddDate(0, 0, 1) {
- dates[date] = struct{}{}
- }
- if len(dates) == 0 {
- return nil, errNoData
- }
- return dates, nil
- }
- func (e *csvExporter) allocationsToCSV(ctx context.Context, dates []time.Time) ([]byte, error) {
- buf := new(bytes.Buffer)
- err := e.writeCSVToWriter(ctx, buf, dates)
- if err != nil {
- return nil, err
- }
- return buf.Bytes(), nil
- }
- func (e *csvExporter) writeCSVToWriter(ctx context.Context, w io.Writer, dates []time.Time) error {
- fmtFloat := func(f float64) string {
- return strconv.FormatFloat(f, 'f', -1, 64)
- }
- csvWriter := csv.NewWriter(w)
- // TODO: confirm columns we want to export
- err := csvWriter.Write([]string{
- "Date",
- "Namespace",
- "ControllerKind",
- "ControllerName",
- "Pod",
- "Container",
- "CPUCoreUsageAverage",
- "CPUCoreRequestAverage",
- "RAMBytesUsageAverage",
- "RAMBytesRequestAverage",
- "NetworkReceiveBytes",
- "NetworkTransferBytes",
- "GPUs",
- "PVBytes",
- "CPUCost",
- "RAMCost",
- "NetworkCost",
- "PVCost",
- "GPUCost",
- "TotalCost",
- })
- if err != nil {
- return err
- }
- lines := 0
- for _, date := range dates {
- start := time.Date(date.Year(), date.Month(), date.Day(), 0, 0, 0, 0, time.UTC)
- end := start.AddDate(0, 0, 1)
- data, err := e.Model.ComputeAllocation(start, end, 5*time.Minute)
- if err != nil {
- return err
- }
- log.Infof("fetched %d records for %s", len(data.Allocations), date.Format("2006-01-02"))
- // TODO: does it need to be aggregated by namespace+controller?
- // container-level information can be too noisy for most users
- for _, alloc := range data.Allocations {
- if err := ctx.Err(); err != nil {
- return err
- }
- log.Infof("%f", alloc.TotalCost())
- err := csvWriter.Write([]string{
- date.Format("2006-01-02"),
- alloc.Properties.Namespace,
- alloc.Properties.ControllerKind,
- alloc.Properties.Controller,
- alloc.Properties.Pod,
- alloc.Properties.Container,
- fmtFloat(alloc.CPUCoreUsageAverage),
- fmtFloat(alloc.CPUCoreRequestAverage),
- fmtFloat(alloc.RAMBytesUsageAverage),
- fmtFloat(alloc.RAMBytesRequestAverage),
- fmtFloat(alloc.NetworkReceiveBytes),
- fmtFloat(alloc.NetworkTransferBytes),
- fmtFloat(alloc.GPUs()),
- fmtFloat(alloc.PVBytes()),
- fmtFloat(alloc.CPUTotalCost()),
- fmtFloat(alloc.RAMTotalCost()),
- fmtFloat(alloc.NetworkTotalCost()),
- fmtFloat(alloc.PVCost()),
- fmtFloat(alloc.GPUCost),
- fmtFloat(alloc.TotalCost()),
- })
- if err != nil {
- return err
- }
- lines++
- }
- }
- if lines == 0 {
- return errNoData
- }
- csvWriter.Flush()
- if err := csvWriter.Error(); err != nil {
- return err
- }
- log.Infof("exported %d lines", lines)
- return nil
- }
- // loadDate scans through CSV export file and extract all dates from "Date" column
- func (e *csvExporter) loadDates(csvFile []byte) (map[time.Time]struct{}, error) {
- csvReader := csv.NewReader(bytes.NewReader(csvFile))
- header, err := csvReader.Read()
- if err != nil {
- return nil, errors.Wrap(err, "reading csv header")
- }
- dateColIndex := 0
- for i, col := range header {
- if col == "Date" {
- dateColIndex = i
- break
- }
- }
- dates := make(map[time.Time]struct{})
- for {
- row, err := csvReader.Read()
- if errors.Is(err, io.EOF) {
- break
- }
- if err != nil {
- return nil, errors.Wrap(err, "reading csv row")
- }
- date, err := time.Parse("2006-01-02", row[dateColIndex])
- if err != nil {
- return nil, errors.Wrap(err, "parsing date")
- }
- dates[date] = struct{}{}
- }
- return dates, nil
- }
- // mergeCSV merges multiple csv files into one.
- // Files may have different headers, but the result will have a header that is a union of all headers.
- // The main goal here is to allow changing CSV format without breaking or loosing existing data.
- func mergeCSV(files [][]byte) ([]byte, error) {
- var err error
- headers := make([][]string, 0, len(files))
- csvReaders := make([]*csv.Reader, 0, len(files))
- // first, get information about the result header
- for _, file := range files {
- csvReader := csv.NewReader(bytes.NewReader(file))
- header, err := csvReader.Read()
- if errors.Is(err, io.EOF) {
- // ignore empty files
- continue
- }
- if err != nil {
- return nil, errors.Wrap(err, "reading header of csv file")
- }
- headers = append(headers, header)
- csvReaders = append(csvReaders, csvReader)
- }
- mapping, header := combineHeaders(headers)
- output := new(bytes.Buffer)
- csvWriter := csv.NewWriter(output)
- err = csvWriter.Write(mergeHeaders(headers))
- if err != nil {
- return nil, errors.Wrap(err, "writing header to csv file")
- }
- for csvIndex, csvReader := range csvReaders {
- for {
- inputLine, err := csvReader.Read()
- if errors.Is(err, io.EOF) {
- break
- }
- if err != nil {
- return nil, errors.Wrap(err, "reading csv file line")
- }
- outputLine := make([]string, len(header))
- for colIndex := range header {
- destColIndex, ok := mapping[csvIndex][colIndex]
- if !ok {
- continue
- }
- outputLine[destColIndex] = inputLine[colIndex]
- }
- err = csvWriter.Write(outputLine)
- if err != nil {
- return nil, errors.Wrap(err, "writing line to csv file")
- }
- }
- }
- csvWriter.Flush()
- if csvWriter.Error() != nil {
- return nil, errors.Wrapf(csvWriter.Error(), "flushing csv file")
- }
- return output.Bytes(), nil
- }
- func combineHeaders(headers [][]string) ([]map[int]int, []string) {
- result := make([]string, 0)
- indices := make([]map[int]int, len(headers))
- for i, header := range headers {
- indices[i] = make(map[int]int)
- for j, column := range header {
- if !contains(result, column) {
- result = append(result, column)
- indices[i][j] = len(result) - 1
- } else {
- indices[i][j] = indexOf(result, column)
- }
- }
- }
- return indices, result
- }
- func mergeHeaders(headers [][]string) []string {
- result := make([]string, 0)
- for _, header := range headers {
- for _, column := range header {
- if !contains(result, column) {
- result = append(result, column)
- }
- }
- }
- return result
- }
// contains reports whether item occurs anywhere in slice.
func contains(slice []string, item string) bool {
	for i := 0; i < len(slice); i++ {
		if slice[i] == item {
			return true
		}
	}
	return false
}
// indexOf returns the position of the first occurrence of element in slice,
// or -1 if it does not occur.
func indexOf(slice []string, element string) int {
	for i := range slice {
		if slice[i] != element {
			continue
		}
		return i
	}
	return -1
}
// mapTimeToSlice returns the members of a set of times as a slice sorted in
// ascending chronological order. The result is never nil.
func mapTimeToSlice(data map[time.Time]struct{}) []time.Time {
	times := make([]time.Time, len(data))
	i := 0
	for t := range data {
		times[i] = t
		i++
	}
	earlier := func(a, b int) bool {
		return times[a].Before(times[b])
	}
	sort.Slice(times, earlier)
	return times
}
|