job.go 1.9 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071
  1. package kubemodel
  2. import (
  3. "time"
  4. "github.com/opencost/opencost/core/pkg/log"
  5. "github.com/opencost/opencost/core/pkg/model/kubemodel"
  6. "github.com/opencost/opencost/core/pkg/source"
  7. )
  8. func (km *KubeModel) computeJobs(kms *kubemodel.KubeModelSet, start, end time.Time) error {
  9. grp := source.NewQueryGroup()
  10. metrics := km.ds.Metrics()
  11. jobInfoResultFuture := source.WithGroup(grp, metrics.QueryJobInfo(start, end))
  12. jobUptimeResultFuture := source.WithGroup(grp, metrics.QueryJobUptime(start, end))
  13. jobLabelsResultFuture := source.WithGroup(grp, metrics.QueryJobLabels(start, end))
  14. jobAnnotationsResultFuture := source.WithGroup(grp, metrics.QueryJobAnnotations(start, end))
  15. jobMap := make(map[string]*kubemodel.Job)
  16. jobInfoResult, _ := jobInfoResultFuture.Await()
  17. for _, res := range jobInfoResult {
  18. jobMap[res.UID] = &kubemodel.Job{
  19. UID: res.UID,
  20. Name: res.Job,
  21. NamespaceUID: res.NamespaceUID,
  22. }
  23. }
  24. jobUptimeResult, _ := jobUptimeResultFuture.Await()
  25. for _, res := range jobUptimeResult {
  26. job, ok := jobMap[res.UID]
  27. if !ok {
  28. log.Warnf("job with UID '%s' has not been initialized to add uptime", res.UID)
  29. continue
  30. }
  31. s, e := res.GetStartEnd(start, end, km.ds.Resolution())
  32. job.Start = s
  33. job.End = e
  34. }
  35. jobLabelsResult, _ := jobLabelsResultFuture.Await()
  36. for _, res := range jobLabelsResult {
  37. job, ok := jobMap[res.UID]
  38. if !ok {
  39. log.Warnf("job with UID '%s' has not been initialized to add labels", res.UID)
  40. continue
  41. }
  42. job.Labels = res.Labels
  43. }
  44. jobAnnotationsResult, _ := jobAnnotationsResultFuture.Await()
  45. for _, res := range jobAnnotationsResult {
  46. job, ok := jobMap[res.UID]
  47. if !ok {
  48. log.Warnf("job with UID '%s' has not been initialized to add annotations", res.UID)
  49. continue
  50. }
  51. job.Annotations = res.Annotations
  52. }
  53. for _, job := range jobMap {
  54. err := kms.RegisterJob(job)
  55. if err != nil {
  56. log.Warnf("Failed to register job: %s", err.Error())
  57. }
  58. }
  59. return nil
  60. }