|
|
@@ -16,7 +16,6 @@ import (
|
|
|
prometheusClient "github.com/prometheus/client_golang/api"
|
|
|
v1 "k8s.io/api/core/v1"
|
|
|
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
|
|
- "k8s.io/apimachinery/pkg/fields"
|
|
|
"k8s.io/apimachinery/pkg/labels"
|
|
|
"k8s.io/client-go/kubernetes"
|
|
|
"k8s.io/client-go/tools/cache"
|
|
|
@@ -43,20 +42,16 @@ const (
|
|
|
)
|
|
|
|
|
|
type CostModel struct {
|
|
|
- Clientset kubernetes.Interface
|
|
|
Controller *Controller
|
|
|
}
|
|
|
|
|
|
-func NewCostModel(clientset kubernetes.Interface) *CostModel {
|
|
|
- cm := &CostModel{
|
|
|
- Clientset: clientset,
|
|
|
- }
|
|
|
- cm.ContainerWatcher()
|
|
|
+func NewCostModel(podListWatcher cache.ListerWatcher) *CostModel {
|
|
|
+ cm := &CostModel{}
|
|
|
+ cm.ContainerWatcher(podListWatcher)
|
|
|
return cm
|
|
|
}
|
|
|
|
|
|
-func (cm *CostModel) ContainerWatcher() {
|
|
|
- podListWatcher := cache.NewListWatchFromClient(cm.Clientset.CoreV1().RESTClient(), "pods", "", fields.Everything())
|
|
|
+func (cm *CostModel) ContainerWatcher(podListWatcher cache.ListerWatcher) {
|
|
|
|
|
|
// create the workqueue
|
|
|
queue := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())
|
|
|
@@ -339,6 +334,7 @@ func (cm *CostModel) ComputeCostData(cli prometheusClient.Client, clientset kube
|
|
|
podlist := cm.Controller.GetAll()
|
|
|
var k8sErr error
|
|
|
go func() {
|
|
|
+ defer wg.Done()
|
|
|
|
|
|
podDeploymentsMapping, k8sErr = getPodDeployments(clientset, podlist)
|
|
|
if k8sErr != nil {
|
|
|
@@ -354,7 +350,6 @@ func (cm *CostModel) ComputeCostData(cli prometheusClient.Client, clientset kube
|
|
|
return
|
|
|
}
|
|
|
|
|
|
- wg.Done()
|
|
|
}()
|
|
|
|
|
|
wg.Wait()
|
|
|
@@ -1026,32 +1021,32 @@ func (cm *CostModel) ComputeCostDataRange(cli prometheusClient.Client, clientset
|
|
|
var promErr error
|
|
|
var resultRAMRequests interface{}
|
|
|
go func() {
|
|
|
- resultRAMRequests, promErr = queryRange(cli, queryRAMRequests, start, end, window)
|
|
|
+ resultRAMRequests, promErr = QueryRange(cli, queryRAMRequests, start, end, window)
|
|
|
defer wg.Done()
|
|
|
}()
|
|
|
var resultRAMUsage interface{}
|
|
|
go func() {
|
|
|
- resultRAMUsage, promErr = queryRange(cli, queryRAMUsage, start, end, window)
|
|
|
+ resultRAMUsage, promErr = QueryRange(cli, queryRAMUsage, start, end, window)
|
|
|
defer wg.Done()
|
|
|
}()
|
|
|
var resultCPURequests interface{}
|
|
|
go func() {
|
|
|
- resultCPURequests, promErr = queryRange(cli, queryCPURequests, start, end, window)
|
|
|
+ resultCPURequests, promErr = QueryRange(cli, queryCPURequests, start, end, window)
|
|
|
defer wg.Done()
|
|
|
}()
|
|
|
var resultCPUUsage interface{}
|
|
|
go func() {
|
|
|
- resultCPUUsage, promErr = queryRange(cli, queryCPUUsage, start, end, window)
|
|
|
+ resultCPUUsage, promErr = QueryRange(cli, queryCPUUsage, start, end, window)
|
|
|
defer wg.Done()
|
|
|
}()
|
|
|
var resultGPURequests interface{}
|
|
|
go func() {
|
|
|
- resultGPURequests, promErr = queryRange(cli, queryGPURequests, start, end, window)
|
|
|
+ resultGPURequests, promErr = QueryRange(cli, queryGPURequests, start, end, window)
|
|
|
defer wg.Done()
|
|
|
}()
|
|
|
var resultPVRequests interface{}
|
|
|
go func() {
|
|
|
- resultPVRequests, promErr = queryRange(cli, queryPVRequests, start, end, window)
|
|
|
+ resultPVRequests, promErr = QueryRange(cli, queryPVRequests, start, end, window)
|
|
|
defer wg.Done()
|
|
|
}()
|
|
|
var normalizationResult interface{}
|
|
|
@@ -1119,7 +1114,7 @@ func (cm *CostModel) ComputeCostDataRange(cli prometheusClient.Client, clientset
|
|
|
containerNameCost := make(map[string]*CostData)
|
|
|
containers := make(map[string]bool)
|
|
|
|
|
|
- RAMReqMap, err := getContainerMetricVectors(resultRAMRequests, true, normalizationValue)
|
|
|
+ RAMReqMap, err := GetContainerMetricVectors(resultRAMRequests, true, normalizationValue)
|
|
|
if err != nil {
|
|
|
return nil, err
|
|
|
}
|
|
|
@@ -1127,28 +1122,28 @@ func (cm *CostModel) ComputeCostDataRange(cli prometheusClient.Client, clientset
|
|
|
containers[key] = true
|
|
|
}
|
|
|
|
|
|
- RAMUsedMap, err := getContainerMetricVectors(resultRAMUsage, true, normalizationValue)
|
|
|
+ RAMUsedMap, err := GetContainerMetricVectors(resultRAMUsage, true, normalizationValue)
|
|
|
if err != nil {
|
|
|
return nil, err
|
|
|
}
|
|
|
for key := range RAMUsedMap {
|
|
|
containers[key] = true
|
|
|
}
|
|
|
- CPUReqMap, err := getContainerMetricVectors(resultCPURequests, true, normalizationValue)
|
|
|
+ CPUReqMap, err := GetContainerMetricVectors(resultCPURequests, true, normalizationValue)
|
|
|
if err != nil {
|
|
|
return nil, err
|
|
|
}
|
|
|
for key := range CPUReqMap {
|
|
|
containers[key] = true
|
|
|
}
|
|
|
- GPUReqMap, err := getContainerMetricVectors(resultGPURequests, true, normalizationValue)
|
|
|
+ GPUReqMap, err := GetContainerMetricVectors(resultGPURequests, true, normalizationValue)
|
|
|
if err != nil {
|
|
|
return nil, err
|
|
|
}
|
|
|
for key := range GPUReqMap {
|
|
|
containers[key] = true
|
|
|
}
|
|
|
- CPUUsedMap, err := getContainerMetricVectors(resultCPUUsage, false, 0) // No need to normalize here, as this comes from a counter
|
|
|
+ CPUUsedMap, err := GetContainerMetricVectors(resultCPUUsage, false, 0) // No need to normalize here, as this comes from a counter
|
|
|
if err != nil {
|
|
|
return nil, err
|
|
|
}
|
|
|
@@ -1633,7 +1628,7 @@ func getPVInfoVector(qr interface{}) (map[string]*PersistentVolumeClaimData, err
|
|
|
return pvmap, nil
|
|
|
}
|
|
|
|
|
|
-func queryRange(cli prometheusClient.Client, query string, start, end time.Time, step time.Duration) (interface{}, error) {
|
|
|
+func QueryRange(cli prometheusClient.Client, query string, start, end time.Time, step time.Duration) (interface{}, error) {
|
|
|
u := cli.URL(epQueryRange, nil)
|
|
|
q := u.Query()
|
|
|
q.Set("query", query)
|
|
|
@@ -1853,7 +1848,7 @@ func getContainerMetricVector(qr interface{}, normalize bool, normalizationValue
|
|
|
return containerData, nil
|
|
|
}
|
|
|
|
|
|
-func getContainerMetricVectors(qr interface{}, normalize bool, normalizationValue float64) (map[string][]*Vector, error) {
|
|
|
+func GetContainerMetricVectors(qr interface{}, normalize bool, normalizationValue float64) (map[string][]*Vector, error) {
|
|
|
data, ok := qr.(map[string]interface{})["data"]
|
|
|
if !ok {
|
|
|
e, err := wrapPrometheusError(qr)
|