clusterexporter.go 4.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114
  1. package clustercache
  2. import (
  3. "time"
  4. "github.com/kubecost/cost-model/pkg/config"
  5. "github.com/kubecost/cost-model/pkg/log"
  6. "github.com/kubecost/cost-model/pkg/util/atomic"
  7. "github.com/kubecost/cost-model/pkg/util/json"
  8. appsv1 "k8s.io/api/apps/v1"
  9. autoscaling "k8s.io/api/autoscaling/v2beta1"
  10. batchv1 "k8s.io/api/batch/v1"
  11. v1 "k8s.io/api/core/v1"
  12. "k8s.io/api/policy/v1beta1"
  13. stv1 "k8s.io/api/storage/v1"
  14. )
  15. // clusterEncoding is used to represent the cluster objects in the encoded states.
  16. type clusterEncoding struct {
  17. Namespaces []*v1.Namespace `json:"namespaces,omitempty"`
  18. Nodes []*v1.Node `json:"nodes,omitempty"`
  19. Pods []*v1.Pod `json:"pods,omitempty"`
  20. Services []*v1.Service `json:"services,omitempty"`
  21. DaemonSets []*appsv1.DaemonSet `json:"daemonSets,omitempty"`
  22. Deployments []*appsv1.Deployment `json:"deployments,omitempty"`
  23. StatefulSets []*appsv1.StatefulSet `json:"statefulSets,omitempty"`
  24. ReplicaSets []*appsv1.ReplicaSet `json:"replicaSets,omitempty"`
  25. PersistentVolumes []*v1.PersistentVolume `json:"persistentVolumes,omitempty"`
  26. PersistentVolumeClaims []*v1.PersistentVolumeClaim `json:"persistentVolumeClaims,omitempty"`
  27. StorageClasses []*stv1.StorageClass `json:"storageClasses,omitempty"`
  28. Jobs []*batchv1.Job `json:"jobs,omitempty"`
  29. HorizontalPodAutoscalers []*autoscaling.HorizontalPodAutoscaler `json:"horizontalPodAutoscalers,omitempty"`
  30. PodDisruptionBudgets []*v1beta1.PodDisruptionBudget `json:"podDisruptionBudgets,omitEmpty"`
  31. }
  32. // ClusterExporter manages and runs an file export process which dumps the local kubernetes cluster to a target location.
  33. type ClusterExporter struct {
  34. cluster ClusterCache
  35. target *config.ConfigFile
  36. interval time.Duration
  37. runState atomic.AtomicRunState
  38. }
  39. // NewClusterExporter creates a new ClusterExporter instance for exporting the kubernetes cluster.
  40. func NewClusterExporter(cluster ClusterCache, target *config.ConfigFile, interval time.Duration) *ClusterExporter {
  41. return &ClusterExporter{
  42. cluster: cluster,
  43. target: target,
  44. interval: interval,
  45. }
  46. }
  47. // Run starts the automated process of running Export on a specific interval.
  48. func (ce *ClusterExporter) Run() {
  49. // in the event there is a race that occurs between Run() and Stop(), we
  50. // ensure that we wait for the reset to occur before starting again
  51. ce.runState.WaitForReset()
  52. if !ce.runState.Start() {
  53. log.Warningf("ClusterExporter already running")
  54. return
  55. }
  56. go func() {
  57. for {
  58. err := ce.Export()
  59. if err != nil {
  60. log.Warningf("Failed to export cluster: %s", err)
  61. }
  62. select {
  63. case <-time.After(ce.interval):
  64. case <-ce.runState.OnStop():
  65. ce.runState.Reset()
  66. return
  67. }
  68. }
  69. }()
  70. }
  71. // Stop halts the Cluster export on an interval
  72. func (ce *ClusterExporter) Stop() {
  73. ce.runState.Stop()
  74. }
  75. // Export stores the cluster cache data into a PODO, marshals as JSON, and saves it to the
  76. // target location.
  77. func (ce *ClusterExporter) Export() error {
  78. c := ce.cluster
  79. encoding := &clusterEncoding{
  80. Namespaces: c.GetAllNamespaces(),
  81. Nodes: c.GetAllNodes(),
  82. Pods: c.GetAllPods(),
  83. Services: c.GetAllServices(),
  84. DaemonSets: c.GetAllDaemonSets(),
  85. Deployments: c.GetAllDeployments(),
  86. StatefulSets: c.GetAllStatefulSets(),
  87. ReplicaSets: c.GetAllReplicaSets(),
  88. PersistentVolumes: c.GetAllPersistentVolumes(),
  89. PersistentVolumeClaims: c.GetAllPersistentVolumeClaims(),
  90. StorageClasses: c.GetAllStorageClasses(),
  91. Jobs: c.GetAllJobs(),
  92. HorizontalPodAutoscalers: c.GetAllHorizontalPodAutoscalers(),
  93. PodDisruptionBudgets: c.GetAllPodDisruptionBudgets(),
  94. }
  95. data, err := json.Marshal(encoding)
  96. if err != nil {
  97. return err
  98. }
  99. return ce.target.Write(data)
  100. }