clusterexporter.go 3.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107
  1. package clustercache
  2. import (
  3. "time"
  4. "github.com/opencost/opencost/core/pkg/log"
  5. "github.com/opencost/opencost/core/pkg/util/atomic"
  6. "github.com/opencost/opencost/core/pkg/util/json"
  7. "github.com/opencost/opencost/pkg/config"
  8. )
  9. // clusterEncoding is used to represent the cluster objects in the encoded states.
  10. type clusterEncoding struct {
  11. Namespaces []*Namespace `json:"namespaces,omitempty"`
  12. Nodes []*Node `json:"nodes,omitempty"`
  13. Pods []*Pod `json:"pods,omitempty"`
  14. Services []*Service `json:"services,omitempty"`
  15. DaemonSets []*DaemonSet `json:"daemonSets,omitempty"`
  16. Deployments []*Deployment `json:"deployments,omitempty"`
  17. StatefulSets []*StatefulSet `json:"statefulSets,omitempty"`
  18. ReplicaSets []*ReplicaSet `json:"replicaSets,omitempty"`
  19. PersistentVolumes []*PersistentVolume `json:"persistentVolumes,omitempty"`
  20. PersistentVolumeClaims []*PersistentVolumeClaim `json:"persistentVolumeClaims,omitempty"`
  21. StorageClasses []*StorageClass `json:"storageClasses,omitempty"`
  22. Jobs []*Job `json:"jobs,omitempty"`
  23. PodDisruptionBudgets []*PodDisruptionBudget `json:"podDisruptionBudgets,omitempty"`
  24. ReplicationControllers []*ReplicationController `json:"replicationController,omitempty"`
  25. }
  26. // ClusterExporter manages and runs an file export process which dumps the local kubernetes cluster to a target location.
  27. type ClusterExporter struct {
  28. cluster ClusterCache
  29. target *config.ConfigFile
  30. interval time.Duration
  31. runState atomic.AtomicRunState
  32. }
  33. // NewClusterExporter creates a new ClusterExporter instance for exporting the kubernetes cluster.
  34. func NewClusterExporter(cluster ClusterCache, target *config.ConfigFile, interval time.Duration) *ClusterExporter {
  35. return &ClusterExporter{
  36. cluster: cluster,
  37. target: target,
  38. interval: interval,
  39. }
  40. }
  41. // Run starts the automated process of running Export on a specific interval.
  42. func (ce *ClusterExporter) Run() {
  43. // in the event there is a race that occurs between Run() and Stop(), we
  44. // ensure that we wait for the reset to occur before starting again
  45. ce.runState.WaitForReset()
  46. if !ce.runState.Start() {
  47. log.Warnf("ClusterExporter already running")
  48. return
  49. }
  50. go func() {
  51. for {
  52. err := ce.Export()
  53. if err != nil {
  54. log.Warnf("Failed to export cluster: %s", err)
  55. }
  56. select {
  57. case <-time.After(ce.interval):
  58. case <-ce.runState.OnStop():
  59. ce.runState.Reset()
  60. return
  61. }
  62. }
  63. }()
  64. }
  65. // Stop halts the Cluster export on an interval
  66. func (ce *ClusterExporter) Stop() {
  67. ce.runState.Stop()
  68. }
  69. // Export stores the cluster cache data into a PODO, marshals as JSON, and saves it to the
  70. // target location.
  71. func (ce *ClusterExporter) Export() error {
  72. c := ce.cluster
  73. encoding := &clusterEncoding{
  74. Namespaces: c.GetAllNamespaces(),
  75. Nodes: c.GetAllNodes(),
  76. Pods: c.GetAllPods(),
  77. Services: c.GetAllServices(),
  78. DaemonSets: c.GetAllDaemonSets(),
  79. Deployments: c.GetAllDeployments(),
  80. StatefulSets: c.GetAllStatefulSets(),
  81. ReplicaSets: c.GetAllReplicaSets(),
  82. PersistentVolumes: c.GetAllPersistentVolumes(),
  83. PersistentVolumeClaims: c.GetAllPersistentVolumeClaims(),
  84. StorageClasses: c.GetAllStorageClasses(),
  85. Jobs: c.GetAllJobs(),
  86. PodDisruptionBudgets: c.GetAllPodDisruptionBudgets(),
  87. ReplicationControllers: c.GetAllReplicationControllers(),
  88. }
  89. data, err := json.Marshal(encoding)
  90. if err != nil {
  91. return err
  92. }
  93. return ce.target.Write(data)
  94. }