clusterexporter.go 4.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116
  1. package clustercache
  2. import (
  3. "time"
  4. "github.com/kubecost/cost-model/pkg/config"
  5. "github.com/kubecost/cost-model/pkg/log"
  6. "github.com/kubecost/cost-model/pkg/util/atomic"
  7. "github.com/kubecost/cost-model/pkg/util/json"
  8. appsv1 "k8s.io/api/apps/v1"
  9. autoscaling "k8s.io/api/autoscaling/v2beta1"
  10. batchv1 "k8s.io/api/batch/v1"
  11. v1 "k8s.io/api/core/v1"
  12. "k8s.io/api/policy/v1beta1"
  13. stv1 "k8s.io/api/storage/v1"
  14. )
  15. // clusterEncoding is used to represent the cluster objects in the encoded states.
  16. type clusterEncoding struct {
  17. Namespaces []*v1.Namespace `json:"namespaces,omitempty"`
  18. Nodes []*v1.Node `json:"nodes,omitempty"`
  19. Pods []*v1.Pod `json:"pods,omitempty"`
  20. Services []*v1.Service `json:"services,omitempty"`
  21. DaemonSets []*appsv1.DaemonSet `json:"daemonSets,omitempty"`
  22. Deployments []*appsv1.Deployment `json:"deployments,omitempty"`
  23. StatefulSets []*appsv1.StatefulSet `json:"statefulSets,omitempty"`
  24. ReplicaSets []*appsv1.ReplicaSet `json:"replicaSets,omitempty"`
  25. PersistentVolumes []*v1.PersistentVolume `json:"persistentVolumes,omitempty"`
  26. PersistentVolumeClaims []*v1.PersistentVolumeClaim `json:"persistentVolumeClaims,omitempty"`
  27. StorageClasses []*stv1.StorageClass `json:"storageClasses,omitempty"`
  28. Jobs []*batchv1.Job `json:"jobs,omitempty"`
  29. HorizontalPodAutoscalers []*autoscaling.HorizontalPodAutoscaler `json:"horizontalPodAutoscalers,omitempty"`
  30. PodDisruptionBudgets []*v1beta1.PodDisruptionBudget `json:"podDisruptionBudgets,omitEmpty"`
  31. ReplicationControllers []*v1.ReplicationController `json:"replicationController,omitEmpty"`
  32. }
  33. // ClusterExporter manages and runs an file export process which dumps the local kubernetes cluster to a target location.
  34. type ClusterExporter struct {
  35. cluster ClusterCache
  36. target *config.ConfigFile
  37. interval time.Duration
  38. runState atomic.AtomicRunState
  39. }
  40. // NewClusterExporter creates a new ClusterExporter instance for exporting the kubernetes cluster.
  41. func NewClusterExporter(cluster ClusterCache, target *config.ConfigFile, interval time.Duration) *ClusterExporter {
  42. return &ClusterExporter{
  43. cluster: cluster,
  44. target: target,
  45. interval: interval,
  46. }
  47. }
  48. // Run starts the automated process of running Export on a specific interval.
  49. func (ce *ClusterExporter) Run() {
  50. // in the event there is a race that occurs between Run() and Stop(), we
  51. // ensure that we wait for the reset to occur before starting again
  52. ce.runState.WaitForReset()
  53. if !ce.runState.Start() {
  54. log.Warningf("ClusterExporter already running")
  55. return
  56. }
  57. go func() {
  58. for {
  59. err := ce.Export()
  60. if err != nil {
  61. log.Warningf("Failed to export cluster: %s", err)
  62. }
  63. select {
  64. case <-time.After(ce.interval):
  65. case <-ce.runState.OnStop():
  66. ce.runState.Reset()
  67. return
  68. }
  69. }
  70. }()
  71. }
  72. // Stop halts the Cluster export on an interval
  73. func (ce *ClusterExporter) Stop() {
  74. ce.runState.Stop()
  75. }
  76. // Export stores the cluster cache data into a PODO, marshals as JSON, and saves it to the
  77. // target location.
  78. func (ce *ClusterExporter) Export() error {
  79. c := ce.cluster
  80. encoding := &clusterEncoding{
  81. Namespaces: c.GetAllNamespaces(),
  82. Nodes: c.GetAllNodes(),
  83. Pods: c.GetAllPods(),
  84. Services: c.GetAllServices(),
  85. DaemonSets: c.GetAllDaemonSets(),
  86. Deployments: c.GetAllDeployments(),
  87. StatefulSets: c.GetAllStatefulSets(),
  88. ReplicaSets: c.GetAllReplicaSets(),
  89. PersistentVolumes: c.GetAllPersistentVolumes(),
  90. PersistentVolumeClaims: c.GetAllPersistentVolumeClaims(),
  91. StorageClasses: c.GetAllStorageClasses(),
  92. Jobs: c.GetAllJobs(),
  93. HorizontalPodAutoscalers: c.GetAllHorizontalPodAutoscalers(),
  94. PodDisruptionBudgets: c.GetAllPodDisruptionBudgets(),
  95. ReplicationControllers: c.GetAllReplicationControllers(),
  96. }
  97. data, err := json.Marshal(encoding)
  98. if err != nil {
  99. return err
  100. }
  101. return ce.target.Write(data)
  102. }