allocation_csv.go 7.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297
  1. package costmodel
  2. import (
  3. "context"
  4. "encoding/csv"
  5. "io"
  6. "os"
  7. "strconv"
  8. "time"
  9. "github.com/pkg/errors"
  10. "github.com/opencost/opencost/pkg/kubecost"
  11. "github.com/opencost/opencost/pkg/log"
  12. )
  13. type CloudStorage interface {
  14. FileReplace(ctx context.Context, f *os.File, path string) error
  15. FileDownload(ctx context.Context, path string) (*os.File, error)
  16. FileExists(ctx context.Context, path string) (bool, error)
  17. }
  18. type AllocationModel interface {
  19. ComputeAllocation(start, end time.Time, resolution time.Duration) (*kubecost.AllocationSet, error)
  20. DateRange(ctx context.Context) (time.Time, time.Time, error)
  21. }
  22. // UpdateCSVWorker launches a worker that updates CSV file in cloud storage with allocation data
  23. // It updates data immediately on launch and then runs every day at 00:10 UTC
  24. // It expected to run a goroutine
  25. func UpdateCSVWorker(ctx context.Context, storage CloudStorage, model AllocationModel, path string) error {
  26. // perform first update immediately
  27. nextRunAt := time.Now()
  28. for {
  29. select {
  30. case <-ctx.Done():
  31. return ctx.Err()
  32. case <-time.After(nextRunAt.Sub(time.Now())):
  33. dayBefore := time.Date(nextRunAt.Year(), nextRunAt.Month(), nextRunAt.Day(), 0, 0, 0, 0, time.UTC).AddDate(0, 0, -1)
  34. err := UpdateCSV(ctx, storage, model, path, dayBefore)
  35. if err != nil {
  36. // it's background worker, log error and carry on, maybe next time it will work
  37. log.Errorf("Error updating CSV: %s", err)
  38. }
  39. now := time.Now()
  40. // next launch is at 00:10 UTC tomorrow
  41. // extra 10 minutes is to let prometheus to collect all the data for the previous day
  42. nextRunAt = time.Date(now.Year(), now.Month(), now.Day(), 0, 10, 0, 0, time.UTC).AddDate(0, 0, 1)
  43. }
  44. }
  45. }
  46. func UpdateCSV(ctx context.Context, storage CloudStorage, model AllocationModel, path string, date time.Time) error {
  47. exporter := &csvExporter{
  48. Storage: storage,
  49. Model: model,
  50. FilePath: path,
  51. }
  52. return exporter.Update(ctx, date)
  53. }
  54. type csvExporter struct {
  55. Storage CloudStorage
  56. Model AllocationModel
  57. FilePath string
  58. }
  59. // TODO: logging
  60. func (e *csvExporter) Update(ctx context.Context, date time.Time) error {
  61. exist, err := e.Storage.FileExists(ctx, e.FilePath)
  62. if err != nil {
  63. return err
  64. }
  65. dateExport, err := e.writeCSVToFile(ctx, date)
  66. if err != nil {
  67. return err
  68. }
  69. defer dateExport.Close()
  70. var result *os.File
  71. if exist {
  72. // merge existing file with new data
  73. previousExport, err := e.Storage.FileDownload(ctx, e.FilePath)
  74. if err != nil {
  75. return err
  76. }
  77. defer previousExport.Close()
  78. result, err = os.CreateTemp("", "cost-model-*.csv")
  79. if err != nil {
  80. return errors.Wrap(err, "creating temp file")
  81. }
  82. err = mergeCSV([]*os.File{previousExport, dateExport}, result)
  83. if err != nil {
  84. return err
  85. }
  86. } else {
  87. // no existing file, create a new one
  88. result = dateExport
  89. }
  90. // we just finished writing to the file, so we need to seek to the beginning, so we can read from it
  91. _, err = result.Seek(0, io.SeekStart)
  92. if err != nil {
  93. return errors.Wrap(err, "seeking to the beginning of the csv file")
  94. }
  95. err = e.Storage.FileReplace(ctx, result, e.FilePath)
  96. if err != nil {
  97. return err
  98. }
  99. return nil
  100. }
  101. func (e *csvExporter) writeCSVToFile(ctx context.Context, date time.Time) (*os.File, error) {
  102. f, err := os.CreateTemp("", "cost-model-*.csv")
  103. if err != nil {
  104. return nil, errors.Wrap(err, "creating temp file")
  105. }
  106. err = e.writeCSVToWriter(ctx, f, date)
  107. if err != nil {
  108. return nil, err
  109. }
  110. return f, nil
  111. }
  112. func (e *csvExporter) writeCSVToWriter(ctx context.Context, w io.Writer, date time.Time) error {
  113. start := time.Date(date.Year(), date.Month(), date.Day(), 0, 0, 0, 0, time.UTC)
  114. end := start.AddDate(0, 1, 0)
  115. data, err := e.Model.ComputeAllocation(start, end, 5*time.Minute)
  116. if err != nil {
  117. return err
  118. }
  119. log.Infof("data: %d", len(data.Allocations))
  120. csvWriter := csv.NewWriter(w)
  121. // TODO: confirm columns we want to export
  122. err = csvWriter.Write([]string{
  123. "Date",
  124. "Name",
  125. "CPUCoreUsageAverage",
  126. "CPUCoreRequestAverage",
  127. "CPUCost",
  128. "RAMBytesUsageAverage",
  129. "RAMBytesRequestAverage",
  130. "RAMCost",
  131. })
  132. if err != nil {
  133. return err
  134. }
  135. for _, alloc := range data.Allocations {
  136. if err := ctx.Err(); err != nil {
  137. return err
  138. }
  139. err := csvWriter.Write([]string{
  140. date.Format("2006-01-02"),
  141. alloc.Name,
  142. fmtFloat(alloc.CPUCoreUsageAverage),
  143. fmtFloat(alloc.CPUCoreRequestAverage),
  144. fmtFloat(alloc.CPUCost),
  145. fmtFloat(alloc.RAMBytesUsageAverage),
  146. fmtFloat(alloc.RAMBytesRequestAverage),
  147. fmtFloat(alloc.RAMCost),
  148. })
  149. if err != nil {
  150. return err
  151. }
  152. }
  153. csvWriter.Flush()
  154. if err := csvWriter.Error(); err != nil {
  155. return err
  156. }
  157. return nil
  158. }
  159. func fmtFloat(f float64) string {
  160. return strconv.FormatFloat(f, 'f', -1, 64)
  161. }
  162. // mergeCSV merges multiple csv files into one.
  163. // Files may have different headers, but the result will have a header that is a union of all headers.
  164. // The main goal here is to allow changing CSV format without breaking or loosing existing data.
  165. func mergeCSV(files []*os.File, output *os.File) error {
  166. var err error
  167. headers := make([][]string, 0, len(files))
  168. csvReaders := make([]*csv.Reader, 0, len(files))
  169. // first, get information about the result header
  170. for _, file := range files {
  171. if _, err := file.Seek(0, io.SeekStart); err != nil {
  172. return errors.Wrapf(err, "seeking to start of %s", file.Name())
  173. }
  174. csvReader := csv.NewReader(file)
  175. header, err := csvReader.Read()
  176. if errors.Is(err, io.EOF) {
  177. // ignore empty files
  178. continue
  179. }
  180. if err != nil {
  181. return errors.Wrapf(err, "reading header of %s", file.Name())
  182. }
  183. headers = append(headers, header)
  184. csvReaders = append(csvReaders, csvReader)
  185. }
  186. mapping, header := combineHeaders(headers)
  187. csvWriter := csv.NewWriter(output)
  188. err = csvWriter.Write(mergeHeaders(headers))
  189. if err != nil {
  190. return errors.Wrapf(err, "writing header to %s", output.Name())
  191. }
  192. for csvIndex, csvReader := range csvReaders {
  193. for {
  194. inputLine, err := csvReader.Read()
  195. if errors.Is(err, io.EOF) {
  196. break
  197. }
  198. if err != nil {
  199. return errors.Wrap(err, "reading csv file line")
  200. }
  201. outputLine := make([]string, len(header))
  202. for colIndex := range header {
  203. destColIndex, ok := mapping[csvIndex][colIndex]
  204. if !ok {
  205. continue
  206. }
  207. outputLine[destColIndex] = inputLine[colIndex]
  208. }
  209. err = csvWriter.Write(outputLine)
  210. if err != nil {
  211. return errors.Wrapf(err, "writing line to %s", output.Name())
  212. }
  213. }
  214. }
  215. csvWriter.Flush()
  216. _, err = output.Seek(0, io.SeekStart)
  217. if err != nil {
  218. return errors.Wrapf(err, "seeking to start of %s", output.Name())
  219. }
  220. return nil
  221. }
  222. func combineHeaders(headers [][]string) ([]map[int]int, []string) {
  223. result := make([]string, 0)
  224. indices := make([]map[int]int, len(headers))
  225. for i, header := range headers {
  226. indices[i] = make(map[int]int)
  227. for j, column := range header {
  228. if !contains(result, column) {
  229. result = append(result, column)
  230. indices[i][j] = len(result) - 1
  231. } else {
  232. indices[i][j] = indexOf(result, column)
  233. }
  234. }
  235. }
  236. return indices, result
  237. }
  238. func mergeHeaders(headers [][]string) []string {
  239. result := make([]string, 0)
  240. for _, header := range headers {
  241. for _, column := range header {
  242. if !contains(result, column) {
  243. result = append(result, column)
  244. }
  245. }
  246. }
  247. return result
  248. }
  249. func contains(slice []string, item string) bool {
  250. for _, element := range slice {
  251. if element == item {
  252. return true
  253. }
  254. }
  255. return false
  256. }
  257. func indexOf(slice []string, element string) int {
  258. for i, e := range slice {
  259. if e == element {
  260. return i
  261. }
  262. }
  263. return -1
  264. }