|
|
@@ -94,6 +94,42 @@ func (cd *CostData) String() string {
|
|
|
len(cd.RAMReq), len(cd.RAMUsed), len(cd.RAMAllocation))
|
|
|
}
|
|
|
|
|
|
+// Error collection helper
|
|
|
+type ErrorCollector struct {
|
|
|
+ m sync.Mutex
|
|
|
+ errors []error
|
|
|
+}
|
|
|
+
|
|
|
+// Reports an error to the collector. Ignores if the error is nil.
|
|
|
+func (ec *ErrorCollector) Report(e error) {
|
|
|
+ if e == nil {
|
|
|
+ return
|
|
|
+ }
|
|
|
+
|
|
|
+ ec.m.Lock()
|
|
|
+ defer ec.m.Unlock()
|
|
|
+
|
|
|
+ ec.errors = append(ec.errors, e)
|
|
|
+}
|
|
|
+
|
|
|
+// Whether or not the collector caught errors
|
|
|
+func (ec *ErrorCollector) IsError() bool {
|
|
|
+ ec.m.Lock()
|
|
|
+ defer ec.m.Unlock()
|
|
|
+
|
|
|
+ return len(ec.errors) > 0
|
|
|
+}
|
|
|
+
|
|
|
+// Errors caught by the collector
|
|
|
+func (ec *ErrorCollector) Errors() []error {
|
|
|
+ ec.m.Lock()
|
|
|
+ defer ec.m.Unlock()
|
|
|
+
|
|
|
+ errs := make([]error, len(ec.errors))
|
|
|
+ copy(errs, ec.errors)
|
|
|
+ return errs
|
|
|
+}
|
|
|
+
|
|
|
const (
|
|
|
queryRAMRequestsStr = `avg(
|
|
|
label_replace(
|
|
|
@@ -310,56 +346,97 @@ func (cm *CostModel) ComputeCostData(cli prometheusClient.Client, clientset kube
|
|
|
var wg sync.WaitGroup
|
|
|
wg.Add(11)
|
|
|
|
|
|
- var promErr error
|
|
|
+ var ec ErrorCollector
|
|
|
var resultRAMRequests interface{}
|
|
|
go func() {
|
|
|
- resultRAMRequests, promErr = Query(cli, queryRAMRequests)
|
|
|
defer wg.Done()
|
|
|
+
|
|
|
+ var promErr error
|
|
|
+ resultRAMRequests, promErr = Query(cli, queryRAMRequests)
|
|
|
+
|
|
|
+ ec.Report(promErr)
|
|
|
}()
|
|
|
+
|
|
|
var resultRAMUsage interface{}
|
|
|
go func() {
|
|
|
- resultRAMUsage, promErr = Query(cli, queryRAMUsage)
|
|
|
defer wg.Done()
|
|
|
+
|
|
|
+ var promErr error
|
|
|
+ resultRAMUsage, promErr = Query(cli, queryRAMUsage)
|
|
|
+
|
|
|
+ ec.Report(promErr)
|
|
|
}()
|
|
|
var resultCPURequests interface{}
|
|
|
go func() {
|
|
|
- resultCPURequests, promErr = Query(cli, queryCPURequests)
|
|
|
defer wg.Done()
|
|
|
+
|
|
|
+ var promErr error
|
|
|
+ resultCPURequests, promErr = Query(cli, queryCPURequests)
|
|
|
+
|
|
|
+ ec.Report(promErr)
|
|
|
}()
|
|
|
var resultCPUUsage interface{}
|
|
|
go func() {
|
|
|
- resultCPUUsage, promErr = Query(cli, queryCPUUsage)
|
|
|
defer wg.Done()
|
|
|
+
|
|
|
+ var promErr error
|
|
|
+ resultCPUUsage, promErr = Query(cli, queryCPUUsage)
|
|
|
+
|
|
|
+ ec.Report(promErr)
|
|
|
}()
|
|
|
var resultGPURequests interface{}
|
|
|
go func() {
|
|
|
- resultGPURequests, promErr = Query(cli, queryGPURequests)
|
|
|
defer wg.Done()
|
|
|
+
|
|
|
+ var promErr error
|
|
|
+ resultGPURequests, promErr = Query(cli, queryGPURequests)
|
|
|
+
|
|
|
+ ec.Report(promErr)
|
|
|
}()
|
|
|
var resultPVRequests interface{}
|
|
|
go func() {
|
|
|
- resultPVRequests, promErr = Query(cli, queryPVRequests)
|
|
|
defer wg.Done()
|
|
|
+
|
|
|
+ var promErr error
|
|
|
+ resultPVRequests, promErr = Query(cli, queryPVRequests)
|
|
|
+
|
|
|
+ ec.Report(promErr)
|
|
|
}()
|
|
|
var resultNetZoneRequests interface{}
|
|
|
go func() {
|
|
|
- resultNetZoneRequests, promErr = Query(cli, queryNetZoneRequests)
|
|
|
defer wg.Done()
|
|
|
+
|
|
|
+ var promErr error
|
|
|
+ resultNetZoneRequests, promErr = Query(cli, queryNetZoneRequests)
|
|
|
+
|
|
|
+ ec.Report(promErr)
|
|
|
}()
|
|
|
var resultNetRegionRequests interface{}
|
|
|
go func() {
|
|
|
- resultNetRegionRequests, promErr = Query(cli, queryNetRegionRequests)
|
|
|
defer wg.Done()
|
|
|
+
|
|
|
+ var promErr error
|
|
|
+ resultNetRegionRequests, promErr = Query(cli, queryNetRegionRequests)
|
|
|
+
|
|
|
+ ec.Report(promErr)
|
|
|
}()
|
|
|
var resultNetInternetRequests interface{}
|
|
|
go func() {
|
|
|
- resultNetInternetRequests, promErr = Query(cli, queryNetInternetRequests)
|
|
|
defer wg.Done()
|
|
|
+
|
|
|
+ var promErr error
|
|
|
+ resultNetInternetRequests, promErr = Query(cli, queryNetInternetRequests)
|
|
|
+
|
|
|
+ ec.Report(promErr)
|
|
|
}()
|
|
|
var normalizationResult interface{}
|
|
|
go func() {
|
|
|
- normalizationResult, promErr = Query(cli, normalization)
|
|
|
defer wg.Done()
|
|
|
+
|
|
|
+ var promErr error
|
|
|
+ normalizationResult, promErr = Query(cli, normalization)
|
|
|
+
|
|
|
+ ec.Report(promErr)
|
|
|
}()
|
|
|
|
|
|
podDeploymentsMapping := make(map[string]map[string][]string)
|
|
|
@@ -379,6 +456,7 @@ func (cm *CostModel) ComputeCostData(cli prometheusClient.Client, clientset kube
|
|
|
if k8sErr != nil {
|
|
|
return
|
|
|
}
|
|
|
+
|
|
|
namespaceLabelsMapping, k8sErr = getNamespaceLabels(cm.Cache, clusterID)
|
|
|
if k8sErr != nil {
|
|
|
return
|
|
|
@@ -389,8 +467,12 @@ func (cm *CostModel) ComputeCostData(cli prometheusClient.Client, clientset kube
|
|
|
|
|
|
defer measureTime(time.Now(), "ComputeCostData: Processing Query Data")
|
|
|
|
|
|
- if promErr != nil {
|
|
|
- return nil, fmt.Errorf("Error querying prometheus: %s", promErr.Error())
|
|
|
+ if ec.IsError() {
|
|
|
+ for _, promErr := range ec.Errors() {
|
|
|
+ klog.V(1).Infof("[Warning] Query Error: %s", promErr.Error())
|
|
|
+ }
|
|
|
+ // TODO: Categorize fatal prometheus query failures
|
|
|
+ // return nil, fmt.Errorf("Error querying prometheus: %s", promErr.Error())
|
|
|
}
|
|
|
if k8sErr != nil {
|
|
|
return nil, fmt.Errorf("Error querying the kubernetes api: %s", k8sErr.Error())
|
|
|
@@ -1311,6 +1393,11 @@ func costDataPassesFilters(costs *CostData, namespace string, cluster string) bo
|
|
|
return passesNamespace && passesCluster
|
|
|
}
|
|
|
|
|
|
+// Finds the a closest multiple less than value
|
|
|
+func floorMultiple(value int64, multiple int64) int64 {
|
|
|
+ return (value / multiple) * multiple
|
|
|
+}
|
|
|
+
|
|
|
// Attempt to create a key for the request. Reduce the times to minutes in order to more easily group requests based on
|
|
|
// real time ranges. If for any reason, the key generation fails, return a uuid to ensure uniqueness.
|
|
|
func requestKeyFor(startString string, endString string, windowString string, filterNamespace string, filterCluster string, remoteEnabled bool) string {
|
|
|
@@ -1319,13 +1406,26 @@ func requestKeyFor(startString string, endString string, windowString string, fi
|
|
|
|
|
|
sTime, err := time.Parse(fullLayout, startString)
|
|
|
if err != nil {
|
|
|
+ klog.V(1).Infof("[Warning] Start=%s failed to parse when generating request key: %s", startString, err.Error())
|
|
|
return uuid.New().String()
|
|
|
}
|
|
|
- eTime, err := time.Parse(fullLayout, startString)
|
|
|
+ eTime, err := time.Parse(fullLayout, endString)
|
|
|
if err != nil {
|
|
|
+ klog.V(1).Infof("[Warning] End=%s failed to parse when generating request key: %s", endString, err.Error())
|
|
|
return uuid.New().String()
|
|
|
}
|
|
|
|
|
|
+ // We "snap" start time and duration to their closest 5 min multiple less than itself, by
|
|
|
+ // applying a snapped duration to a snapped start time.
|
|
|
+ durMins := int64(eTime.Sub(sTime).Minutes())
|
|
|
+ durMins = floorMultiple(durMins, 5)
|
|
|
+
|
|
|
+ sMins := int64(sTime.Minute())
|
|
|
+ sOffset := sMins - floorMultiple(sMins, 5)
|
|
|
+
|
|
|
+ sTime = sTime.Add(-time.Duration(sOffset) * time.Minute)
|
|
|
+ eTime = sTime.Add(time.Duration(durMins) * time.Minute)
|
|
|
+
|
|
|
startKey := sTime.Format(keyLayout)
|
|
|
endKey := eTime.Format(keyLayout)
|
|
|
|
|
|
@@ -1407,139 +1507,196 @@ func (cm *CostModel) costDataRange(cli prometheusClient.Client, clientset kubern
|
|
|
queryProfileStart := time.Now()
|
|
|
queryProfileCh := make(chan string, numQueries)
|
|
|
|
|
|
- var promErr error
|
|
|
+ var ec ErrorCollector
|
|
|
var resultRAMRequests interface{}
|
|
|
go func() {
|
|
|
defer wg.Done()
|
|
|
defer measureTimeAsync(time.Now(), "RAMRequests", queryProfileCh)
|
|
|
|
|
|
+ var promErr error
|
|
|
resultRAMRequests, promErr = QueryRange(cli, queryRAMRequests, start, end, window)
|
|
|
+
|
|
|
+ ec.Report(promErr)
|
|
|
}()
|
|
|
var resultRAMUsage interface{}
|
|
|
go func() {
|
|
|
defer wg.Done()
|
|
|
defer measureTimeAsync(time.Now(), "RAMUsage", queryProfileCh)
|
|
|
|
|
|
+ var promErr error
|
|
|
resultRAMUsage, promErr = QueryRange(cli, queryRAMUsage, start, end, window)
|
|
|
+
|
|
|
+ ec.Report(promErr)
|
|
|
}()
|
|
|
var resultCPURequests interface{}
|
|
|
go func() {
|
|
|
defer wg.Done()
|
|
|
defer measureTimeAsync(time.Now(), "CPURequests", queryProfileCh)
|
|
|
|
|
|
+ var promErr error
|
|
|
resultCPURequests, promErr = QueryRange(cli, queryCPURequests, start, end, window)
|
|
|
+
|
|
|
+ ec.Report(promErr)
|
|
|
}()
|
|
|
var resultCPUUsage interface{}
|
|
|
go func() {
|
|
|
defer wg.Done()
|
|
|
defer measureTimeAsync(time.Now(), "CPUUsage", queryProfileCh)
|
|
|
|
|
|
+ var promErr error
|
|
|
resultCPUUsage, promErr = QueryRange(cli, queryCPUUsage, start, end, window)
|
|
|
+
|
|
|
+ ec.Report(promErr)
|
|
|
}()
|
|
|
var resultRAMAllocations interface{}
|
|
|
go func() {
|
|
|
defer wg.Done()
|
|
|
defer measureTimeAsync(time.Now(), "RAMAllocations", queryProfileCh)
|
|
|
|
|
|
+ var promErr error
|
|
|
resultRAMAllocations, promErr = QueryRange(cli, queryRAMAlloc, start, end, window)
|
|
|
+
|
|
|
+ ec.Report(promErr)
|
|
|
}()
|
|
|
var resultCPUAllocations interface{}
|
|
|
go func() {
|
|
|
defer wg.Done()
|
|
|
defer measureTimeAsync(time.Now(), "CPUAllocations", queryProfileCh)
|
|
|
|
|
|
+ var promErr error
|
|
|
resultCPUAllocations, promErr = QueryRange(cli, queryCPUAlloc, start, end, window)
|
|
|
+
|
|
|
+ ec.Report(promErr)
|
|
|
}()
|
|
|
var resultGPURequests interface{}
|
|
|
go func() {
|
|
|
defer wg.Done()
|
|
|
defer measureTimeAsync(time.Now(), "GPURequests", queryProfileCh)
|
|
|
|
|
|
+ var promErr error
|
|
|
resultGPURequests, promErr = QueryRange(cli, queryGPURequests, start, end, window)
|
|
|
+
|
|
|
+ ec.Report(promErr)
|
|
|
}()
|
|
|
var resultPVRequests interface{}
|
|
|
go func() {
|
|
|
defer wg.Done()
|
|
|
defer measureTimeAsync(time.Now(), "PVRequests", queryProfileCh)
|
|
|
|
|
|
+ var promErr error
|
|
|
resultPVRequests, promErr = QueryRange(cli, queryPVRequests, start, end, window)
|
|
|
+
|
|
|
+ ec.Report(promErr)
|
|
|
}()
|
|
|
var resultNetZoneRequests interface{}
|
|
|
go func() {
|
|
|
defer wg.Done()
|
|
|
defer measureTimeAsync(time.Now(), "NetZoneRequests", queryProfileCh)
|
|
|
|
|
|
+ var promErr error
|
|
|
resultNetZoneRequests, promErr = QueryRange(cli, queryNetZoneRequests, start, end, window)
|
|
|
+
|
|
|
+ ec.Report(promErr)
|
|
|
}()
|
|
|
var resultNetRegionRequests interface{}
|
|
|
go func() {
|
|
|
defer wg.Done()
|
|
|
defer measureTimeAsync(time.Now(), "NetRegionRequests", queryProfileCh)
|
|
|
|
|
|
+ var promErr error
|
|
|
resultNetRegionRequests, promErr = QueryRange(cli, queryNetRegionRequests, start, end, window)
|
|
|
+
|
|
|
+ ec.Report(promErr)
|
|
|
}()
|
|
|
var resultNetInternetRequests interface{}
|
|
|
go func() {
|
|
|
defer wg.Done()
|
|
|
defer measureTimeAsync(time.Now(), "NetInternetRequests", queryProfileCh)
|
|
|
|
|
|
+ var promErr error
|
|
|
resultNetInternetRequests, promErr = QueryRange(cli, queryNetInternetRequests, start, end, window)
|
|
|
+
|
|
|
+ ec.Report(promErr)
|
|
|
}()
|
|
|
var pvPodAllocationResults interface{}
|
|
|
go func() {
|
|
|
defer wg.Done()
|
|
|
defer measureTimeAsync(time.Now(), "PVPodAllocation", queryProfileCh)
|
|
|
|
|
|
+ var promErr error
|
|
|
pvPodAllocationResults, promErr = QueryRange(cli, fmt.Sprintf(queryPVCAllocation, windowString), start, end, window)
|
|
|
+
|
|
|
+ ec.Report(promErr)
|
|
|
}()
|
|
|
var pvCostResults interface{}
|
|
|
go func() {
|
|
|
defer wg.Done()
|
|
|
defer measureTimeAsync(time.Now(), "PVCost", queryProfileCh)
|
|
|
|
|
|
+ var promErr error
|
|
|
pvCostResults, promErr = QueryRange(cli, fmt.Sprintf(queryPVHourlyCost, windowString), start, end, window)
|
|
|
+
|
|
|
+ ec.Report(promErr)
|
|
|
}()
|
|
|
var nsLabelsResults interface{}
|
|
|
go func() {
|
|
|
defer wg.Done()
|
|
|
defer measureTimeAsync(time.Now(), "NSLabels", queryProfileCh)
|
|
|
|
|
|
+ var promErr error
|
|
|
nsLabelsResults, promErr = QueryRange(cli, fmt.Sprintf(queryNSLabels, windowString), start, end, window)
|
|
|
+
|
|
|
+ ec.Report(promErr)
|
|
|
}()
|
|
|
var podLabelsResults interface{}
|
|
|
go func() {
|
|
|
defer wg.Done()
|
|
|
defer measureTimeAsync(time.Now(), "PodLabels", queryProfileCh)
|
|
|
|
|
|
+ var promErr error
|
|
|
podLabelsResults, promErr = QueryRange(cli, fmt.Sprintf(queryPodLabels, windowString), start, end, window)
|
|
|
+
|
|
|
+ ec.Report(promErr)
|
|
|
}()
|
|
|
var serviceLabelsResults interface{}
|
|
|
go func() {
|
|
|
defer wg.Done()
|
|
|
defer measureTimeAsync(time.Now(), "ServiceLabels", queryProfileCh)
|
|
|
|
|
|
+ var promErr error
|
|
|
serviceLabelsResults, promErr = QueryRange(cli, fmt.Sprintf(queryServiceLabels, windowString), start, end, window)
|
|
|
+
|
|
|
+ ec.Report(promErr)
|
|
|
}()
|
|
|
var deploymentLabelsResults interface{}
|
|
|
go func() {
|
|
|
defer wg.Done()
|
|
|
defer measureTimeAsync(time.Now(), "DeploymentLabels", queryProfileCh)
|
|
|
|
|
|
+ var promErr error
|
|
|
deploymentLabelsResults, promErr = QueryRange(cli, fmt.Sprintf(queryDeploymentLabels, windowString), start, end, window)
|
|
|
+
|
|
|
+ ec.Report(promErr)
|
|
|
}()
|
|
|
var statefulsetLabelsResults interface{}
|
|
|
go func() {
|
|
|
defer wg.Done()
|
|
|
defer measureTimeAsync(time.Now(), "StatefulSetLabels", queryProfileCh)
|
|
|
|
|
|
+ var promErr error
|
|
|
statefulsetLabelsResults, promErr = QueryRange(cli, fmt.Sprintf(queryStatefulsetLabels, windowString), start, end, window)
|
|
|
+
|
|
|
+ ec.Report(promErr)
|
|
|
}()
|
|
|
var normalizationResults interface{}
|
|
|
go func() {
|
|
|
defer wg.Done()
|
|
|
defer measureTimeAsync(time.Now(), "Normalization", queryProfileCh)
|
|
|
|
|
|
+ var promErr error
|
|
|
normalizationResults, promErr = QueryRange(cli, normalization, start, end, window)
|
|
|
+
|
|
|
+ ec.Report(promErr)
|
|
|
}()
|
|
|
|
|
|
podDeploymentsMapping := make(map[string]map[string][]string)
|
|
|
@@ -1583,8 +1740,12 @@ func (cm *CostModel) costDataRange(cli prometheusClient.Client, clientset kubern
|
|
|
|
|
|
defer measureTime(time.Now(), fmt.Sprintf("costDataRange(%fh): Processing Query Data", durHrs))
|
|
|
|
|
|
- if promErr != nil {
|
|
|
- return nil, fmt.Errorf("Error querying prometheus: %s", promErr.Error())
|
|
|
+ if ec.IsError() {
|
|
|
+ for _, promErr := range ec.Errors() {
|
|
|
+ klog.V(1).Infof("[Warning] Query Error: %s", promErr.Error())
|
|
|
+ }
|
|
|
+ // TODO: Categorize fatal prometheus query failures
|
|
|
+ // return nil, fmt.Errorf("Error querying prometheus: %s", promErr.Error())
|
|
|
}
|
|
|
if k8sErr != nil {
|
|
|
return nil, fmt.Errorf("Error querying the kubernetes api: %s", k8sErr.Error())
|