clusterexporter.go 3.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110
  1. package clustercache
  2. import (
  3. "time"
  4. cc "github.com/opencost/opencost/core/pkg/clustercache"
  5. "github.com/opencost/opencost/core/pkg/log"
  6. "github.com/opencost/opencost/core/pkg/util/atomic"
  7. "github.com/opencost/opencost/core/pkg/util/json"
  8. "github.com/opencost/opencost/pkg/config"
  9. )
  10. // clusterEncoding is used to represent the cluster objects in the encoded states.
  11. type clusterEncoding struct {
  12. Namespaces []*cc.Namespace `json:"namespaces,omitempty"`
  13. Nodes []*cc.Node `json:"nodes,omitempty"`
  14. Pods []*cc.Pod `json:"pods,omitempty"`
  15. Services []*cc.Service `json:"services,omitempty"`
  16. DaemonSets []*cc.DaemonSet `json:"daemonSets,omitempty"`
  17. Deployments []*cc.Deployment `json:"deployments,omitempty"`
  18. StatefulSets []*cc.StatefulSet `json:"statefulSets,omitempty"`
  19. ReplicaSets []*cc.ReplicaSet `json:"replicaSets,omitempty"`
  20. PersistentVolumes []*cc.PersistentVolume `json:"persistentVolumes,omitempty"`
  21. PersistentVolumeClaims []*cc.PersistentVolumeClaim `json:"persistentVolumeClaims,omitempty"`
  22. StorageClasses []*cc.StorageClass `json:"storageClasses,omitempty"`
  23. Jobs []*cc.Job `json:"jobs,omitempty"`
  24. PodDisruptionBudgets []*cc.PodDisruptionBudget `json:"podDisruptionBudgets,omitempty"`
  25. ReplicationControllers []*cc.ReplicationController `json:"replicationController,omitempty"`
  26. ResourceQuotas []*cc.ResourceQuota `json:"resourceQuotas,omitempty"`
  27. }
  28. // ClusterExporter manages and runs an file export process which dumps the local kubernetes cluster to a target location.
  29. type ClusterExporter struct {
  30. cluster cc.ClusterCache
  31. target *config.ConfigFile
  32. interval time.Duration
  33. runState atomic.AtomicRunState
  34. }
  35. // NewClusterExporter creates a new ClusterExporter instance for exporting the kubernetes cluster.
  36. func NewClusterExporter(cluster cc.ClusterCache, target *config.ConfigFile, interval time.Duration) *ClusterExporter {
  37. return &ClusterExporter{
  38. cluster: cluster,
  39. target: target,
  40. interval: interval,
  41. }
  42. }
  43. // Run starts the automated process of running Export on a specific interval.
  44. func (ce *ClusterExporter) Run() {
  45. // in the event there is a race that occurs between Run() and Stop(), we
  46. // ensure that we wait for the reset to occur before starting again
  47. ce.runState.WaitForReset()
  48. if !ce.runState.Start() {
  49. log.Warnf("ClusterExporter already running")
  50. return
  51. }
  52. go func() {
  53. for {
  54. err := ce.Export()
  55. if err != nil {
  56. log.Warnf("Failed to export cluster: %s", err)
  57. }
  58. select {
  59. case <-time.After(ce.interval):
  60. case <-ce.runState.OnStop():
  61. ce.runState.Reset()
  62. return
  63. }
  64. }
  65. }()
  66. }
  67. // Stop halts the Cluster export on an interval
  68. func (ce *ClusterExporter) Stop() {
  69. ce.runState.Stop()
  70. }
  71. // Export stores the cluster cache data into a PODO, marshals as JSON, and saves it to the
  72. // target location.
  73. func (ce *ClusterExporter) Export() error {
  74. c := ce.cluster
  75. encoding := &clusterEncoding{
  76. Namespaces: c.GetAllNamespaces(),
  77. Nodes: c.GetAllNodes(),
  78. Pods: c.GetAllPods(),
  79. Services: c.GetAllServices(),
  80. DaemonSets: c.GetAllDaemonSets(),
  81. Deployments: c.GetAllDeployments(),
  82. StatefulSets: c.GetAllStatefulSets(),
  83. ReplicaSets: c.GetAllReplicaSets(),
  84. PersistentVolumes: c.GetAllPersistentVolumes(),
  85. PersistentVolumeClaims: c.GetAllPersistentVolumeClaims(),
  86. StorageClasses: c.GetAllStorageClasses(),
  87. Jobs: c.GetAllJobs(),
  88. PodDisruptionBudgets: c.GetAllPodDisruptionBudgets(),
  89. ReplicationControllers: c.GetAllReplicationControllers(),
  90. ResourceQuotas: c.GetAllResourceQuotas(),
  91. }
  92. data, err := json.Marshal(encoding)
  93. if err != nil {
  94. return err
  95. }
  96. return ce.target.Write(data)
  97. }