|
|
@@ -16,6 +16,7 @@ import (
|
|
|
"github.com/kubecost/cost-model/pkg/metrics"
|
|
|
"github.com/kubecost/cost-model/pkg/prom"
|
|
|
"github.com/kubecost/cost-model/pkg/util"
|
|
|
+ "github.com/kubecost/cost-model/pkg/util/atomic"
|
|
|
|
|
|
promclient "github.com/prometheus/client_golang/api"
|
|
|
"github.com/prometheus/client_golang/prometheus"
|
|
|
@@ -250,9 +251,7 @@ type CostModelMetricsEmitter struct {
|
|
|
NetworkInternetEgressRecorder prometheus.Gauge
|
|
|
|
|
|
// Flow Control
|
|
|
- recordingLock *sync.Mutex
|
|
|
- recordingStopping bool
|
|
|
- recordingStop chan bool
|
|
|
+ runState atomic.AtomicRunState
|
|
|
}
|
|
|
|
|
|
// NewCostModelMetricsEmitter creates a new cost-model metrics emitter. Use Start() to begin metric emission.
|
|
|
@@ -289,33 +288,12 @@ func NewCostModelMetricsEmitter(promClient promclient.Client, clusterCache clust
|
|
|
NetworkInternetEgressRecorder: networkInternetEgressCostG,
|
|
|
ClusterManagementCostRecorder: clusterManagementCostGv,
|
|
|
LBCostRecorder: lbCostGv,
|
|
|
- recordingLock: new(sync.Mutex),
|
|
|
- recordingStopping: false,
|
|
|
- recordingStop: nil,
|
|
|
}
|
|
|
}
|
|
|
|
|
|
-// Checks to see if there is a metric recording stop channel. If it exists, a new
|
|
|
-// channel is not created and false is returned. If it doesn't exist, a new channel
|
|
|
-// is created and true is returned.
|
|
|
-func (cmme *CostModelMetricsEmitter) checkOrCreateRecordingChan() bool {
|
|
|
- cmme.recordingLock.Lock()
|
|
|
- defer cmme.recordingLock.Unlock()
|
|
|
-
|
|
|
- if cmme.recordingStop != nil {
|
|
|
- return false
|
|
|
- }
|
|
|
-
|
|
|
- cmme.recordingStop = make(chan bool, 1)
|
|
|
- return true
|
|
|
-}
|
|
|
-
|
|
|
// IsRunning returns true if metric recording is running.
|
|
|
func (cmme *CostModelMetricsEmitter) IsRunning() bool {
|
|
|
- cmme.recordingLock.Lock()
|
|
|
- defer cmme.recordingLock.Unlock()
|
|
|
-
|
|
|
- return cmme.recordingStop != nil
|
|
|
+ return cmme.runState.IsRunning()
|
|
|
}
|
|
|
|
|
|
// NodeCostAverages tracks a running average of a node's cost attributes.
|
|
|
@@ -330,10 +308,11 @@ type NodeCostAverages struct {
|
|
|
// StartCostModelMetricRecording starts the go routine that emits metrics used to determine
|
|
|
// cluster costs.
|
|
|
func (cmme *CostModelMetricsEmitter) Start() bool {
|
|
|
- // Check to see if we're already recording
|
|
|
- // This function will create the stop recording channel and return true
|
|
|
- // if it doesn't exist.
|
|
|
- if !cmme.checkOrCreateRecordingChan() {
|
|
|
+ // wait for a reset to prevent a race between start and stop calls
|
|
|
+ cmme.runState.WaitForReset()
|
|
|
+
|
|
|
+ // Check to see if we're already recording, and atomically advance the run state to start if we're not
|
|
|
+ if !cmme.runState.Start() {
|
|
|
log.Errorf("Attempted to start cost model metric recording when it's already running.")
|
|
|
return false
|
|
|
}
|
|
|
@@ -695,11 +674,8 @@ func (cmme *CostModelMetricsEmitter) Start() bool {
|
|
|
|
|
|
select {
|
|
|
case <-time.After(time.Minute):
|
|
|
- case <-cmme.recordingStop:
|
|
|
- cmme.recordingLock.Lock()
|
|
|
- cmme.recordingStopping = false
|
|
|
- cmme.recordingStop = nil
|
|
|
- cmme.recordingLock.Unlock()
|
|
|
+ case <-cmme.runState.OnStop():
|
|
|
+ cmme.runState.Reset()
|
|
|
return
|
|
|
}
|
|
|
}
|
|
|
@@ -711,11 +687,5 @@ func (cmme *CostModelMetricsEmitter) Start() bool {
|
|
|
// Stop halts the metrics emission loop after the current emission is completed
|
|
|
// or if the emission is paused.
|
|
|
func (cmme *CostModelMetricsEmitter) Stop() {
|
|
|
- cmme.recordingLock.Lock()
|
|
|
- defer cmme.recordingLock.Unlock()
|
|
|
-
|
|
|
- if !cmme.recordingStopping && cmme.recordingStop != nil {
|
|
|
- cmme.recordingStopping = true
|
|
|
- close(cmme.recordingStop)
|
|
|
- }
|
|
|
+ cmme.runState.Stop()
|
|
|
}
|