clusterexporter.go 4.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113
  1. package clustercache
  2. import (
  3. "time"
  4. "github.com/opencost/opencost/core/pkg/log"
  5. "github.com/opencost/opencost/core/pkg/util/atomic"
  6. "github.com/opencost/opencost/core/pkg/util/json"
  7. "github.com/opencost/opencost/pkg/config"
  8. appsv1 "k8s.io/api/apps/v1"
  9. batchv1 "k8s.io/api/batch/v1"
  10. v1 "k8s.io/api/core/v1"
  11. policyv1 "k8s.io/api/policy/v1"
  12. stv1 "k8s.io/api/storage/v1"
  13. )
  14. // clusterEncoding is used to represent the cluster objects in the encoded states.
  15. type clusterEncoding struct {
  16. Namespaces []*v1.Namespace `json:"namespaces,omitempty"`
  17. Nodes []*v1.Node `json:"nodes,omitempty"`
  18. Pods []*v1.Pod `json:"pods,omitempty"`
  19. Services []*v1.Service `json:"services,omitempty"`
  20. DaemonSets []*appsv1.DaemonSet `json:"daemonSets,omitempty"`
  21. Deployments []*appsv1.Deployment `json:"deployments,omitempty"`
  22. StatefulSets []*appsv1.StatefulSet `json:"statefulSets,omitempty"`
  23. ReplicaSets []*appsv1.ReplicaSet `json:"replicaSets,omitempty"`
  24. PersistentVolumes []*v1.PersistentVolume `json:"persistentVolumes,omitempty"`
  25. PersistentVolumeClaims []*v1.PersistentVolumeClaim `json:"persistentVolumeClaims,omitempty"`
  26. StorageClasses []*stv1.StorageClass `json:"storageClasses,omitempty"`
  27. Jobs []*batchv1.Job `json:"jobs,omitempty"`
  28. PodDisruptionBudgets []*policyv1.PodDisruptionBudget `json:"podDisruptionBudgets,omitempty"`
  29. ReplicationControllers []*v1.ReplicationController `json:"replicationController,omitempty"`
  30. }
  31. // ClusterExporter manages and runs an file export process which dumps the local kubernetes cluster to a target location.
  32. type ClusterExporter struct {
  33. cluster ClusterCache
  34. target *config.ConfigFile
  35. interval time.Duration
  36. runState atomic.AtomicRunState
  37. }
  38. // NewClusterExporter creates a new ClusterExporter instance for exporting the kubernetes cluster.
  39. func NewClusterExporter(cluster ClusterCache, target *config.ConfigFile, interval time.Duration) *ClusterExporter {
  40. return &ClusterExporter{
  41. cluster: cluster,
  42. target: target,
  43. interval: interval,
  44. }
  45. }
  46. // Run starts the automated process of running Export on a specific interval.
  47. func (ce *ClusterExporter) Run() {
  48. // in the event there is a race that occurs between Run() and Stop(), we
  49. // ensure that we wait for the reset to occur before starting again
  50. ce.runState.WaitForReset()
  51. if !ce.runState.Start() {
  52. log.Warnf("ClusterExporter already running")
  53. return
  54. }
  55. go func() {
  56. for {
  57. err := ce.Export()
  58. if err != nil {
  59. log.Warnf("Failed to export cluster: %s", err)
  60. }
  61. select {
  62. case <-time.After(ce.interval):
  63. case <-ce.runState.OnStop():
  64. ce.runState.Reset()
  65. return
  66. }
  67. }
  68. }()
  69. }
  70. // Stop halts the Cluster export on an interval
  71. func (ce *ClusterExporter) Stop() {
  72. ce.runState.Stop()
  73. }
  74. // Export stores the cluster cache data into a PODO, marshals as JSON, and saves it to the
  75. // target location.
  76. func (ce *ClusterExporter) Export() error {
  77. c := ce.cluster
  78. encoding := &clusterEncoding{
  79. Namespaces: c.GetAllNamespaces(),
  80. Nodes: c.GetAllNodes(),
  81. Pods: c.GetAllPods(),
  82. Services: c.GetAllServices(),
  83. DaemonSets: c.GetAllDaemonSets(),
  84. Deployments: c.GetAllDeployments(),
  85. StatefulSets: c.GetAllStatefulSets(),
  86. ReplicaSets: c.GetAllReplicaSets(),
  87. PersistentVolumes: c.GetAllPersistentVolumes(),
  88. PersistentVolumeClaims: c.GetAllPersistentVolumeClaims(),
  89. StorageClasses: c.GetAllStorageClasses(),
  90. Jobs: c.GetAllJobs(),
  91. PodDisruptionBudgets: c.GetAllPodDisruptionBudgets(),
  92. ReplicationControllers: c.GetAllReplicationControllers(),
  93. }
  94. data, err := json.Marshal(encoding)
  95. if err != nil {
  96. return err
  97. }
  98. return ce.target.Write(data)
  99. }