|
@@ -139,14 +139,7 @@ func (cm *CostModel) ComputeCostData(start, end time.Time) (map[string]*CostData
|
|
|
ds := cm.DataSource
|
|
ds := cm.DataSource
|
|
|
mq := ds.Metrics()
|
|
mq := ds.Metrics()
|
|
|
|
|
|
|
|
- grp := source.NewQueryGroup()
|
|
|
|
|
-
|
|
|
|
|
- resChRAMUsage := source.WithGroup(grp, mq.QueryRAMUsageAvg(start, end))
|
|
|
|
|
- resChCPUUsage := source.WithGroup(grp, mq.QueryCPUUsageAvg(start, end))
|
|
|
|
|
- resChNetZoneRequests := source.WithGroup(grp, mq.QueryNetZoneGiB(start, end))
|
|
|
|
|
- resChNetRegionRequests := source.WithGroup(grp, mq.QueryNetRegionGiB(start, end))
|
|
|
|
|
- resChNetInternetRequests := source.WithGroup(grp, mq.QueryNetInternetGiB(start, end))
|
|
|
|
|
-
|
|
|
|
|
|
|
+ // Get Kubernetes data
|
|
|
// Pull pod information from k8s API
|
|
// Pull pod information from k8s API
|
|
|
podlist := cm.Cache.GetAllPods()
|
|
podlist := cm.Cache.GetAllPods()
|
|
|
|
|
|
|
@@ -170,30 +163,10 @@ func (cm *CostModel) ComputeCostData(start, end time.Time) (map[string]*CostData
|
|
|
return nil, err
|
|
return nil, err
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
- // Process Prometheus query results. Handle errors using ctx.Errors.
|
|
|
|
|
- resRAMUsage, _ := resChRAMUsage.Await()
|
|
|
|
|
- resCPUUsage, _ := resChCPUUsage.Await()
|
|
|
|
|
- resNetZoneRequests, _ := resChNetZoneRequests.Await()
|
|
|
|
|
- resNetRegionRequests, _ := resChNetRegionRequests.Await()
|
|
|
|
|
- resNetInternetRequests, _ := resChNetInternetRequests.Await()
|
|
|
|
|
-
|
|
|
|
|
- // NOTE: The way we currently handle errors and warnings only early returns if there is an error. Warnings
|
|
|
|
|
- // NOTE: will not propagate unless coupled with errors.
|
|
|
|
|
- if grp.HasErrors() {
|
|
|
|
|
- // To keep the context of where the errors are occurring, we log the errors here and pass them the error
|
|
|
|
|
- // back to the caller. The caller should handle the specific case where error is an ErrorCollection
|
|
|
|
|
- for _, queryErr := range grp.Errors() {
|
|
|
|
|
- if queryErr.Error != nil {
|
|
|
|
|
- log.Errorf("ComputeCostData: Request Error: %s", queryErr.Error)
|
|
|
|
|
- }
|
|
|
|
|
- if queryErr.ParseError != nil {
|
|
|
|
|
- log.Errorf("ComputeCostData: Parsing Error: %s", queryErr.ParseError)
|
|
|
|
|
- }
|
|
|
|
|
- }
|
|
|
|
|
-
|
|
|
|
|
- // ErrorCollection is an collection of errors wrapped in a single error implementation
|
|
|
|
|
- // We opt to not return an error for the sake of running as a pure exporter.
|
|
|
|
|
- log.Warnf("ComputeCostData: continuing despite prometheus errors: %s", grp.Error())
|
|
|
|
|
|
|
+ // Get metrics data
|
|
|
|
|
+ resRAMUsage, resCPUUsage, resNetZoneRequests, resNetRegionRequests, resNetInternetRequests, err := queryMetrics(mq, start, end)
|
|
|
|
|
+ if err != nil {
|
|
|
|
|
+ log.Warnf("ComputeCostData: continuing despite metrics errors: %s", err)
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
defer measureTime(time.Now(), profileThreshold, "ComputeCostData: Processing Query Data")
|
|
defer measureTime(time.Now(), profileThreshold, "ComputeCostData: Processing Query Data")
|
|
@@ -254,7 +227,7 @@ func (cm *CostModel) ComputeCostData(start, end time.Time) (map[string]*CostData
|
|
|
return nil, err
|
|
return nil, err
|
|
|
}
|
|
}
|
|
|
for _, c := range cs {
|
|
for _, c := range cs {
|
|
|
- containers[c.Key()] = true // captures any containers that existed for a time < a prometheus scrape interval. We currently charge 0 for this but should charge something.
|
|
|
|
|
|
|
+ containers[c.Key()] = true // captures any containers that existed for a time < a metrics scrape interval. We currently charge 0 for this but should charge something.
|
|
|
currentContainers[c.Key()] = *pod
|
|
currentContainers[c.Key()] = *pod
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
@@ -268,6 +241,7 @@ func (cm *CostModel) ComputeCostData(start, end time.Time) (map[string]*CostData
|
|
|
// deleted so we have usage information but not request information. In that case,
|
|
// deleted so we have usage information but not request information. In that case,
|
|
|
// we return partial data for CPU and RAM: only usage and not requests.
|
|
// we return partial data for CPU and RAM: only usage and not requests.
|
|
|
if pod, ok := currentContainers[key]; ok {
|
|
if pod, ok := currentContainers[key]; ok {
|
|
|
|
|
+
|
|
|
podName := pod.Name
|
|
podName := pod.Name
|
|
|
ns := pod.Namespace
|
|
ns := pod.Namespace
|
|
|
|
|
|
|
@@ -361,7 +335,7 @@ func (cm *CostModel) ComputeCostData(start, end time.Time) (map[string]*CostData
|
|
|
ramRequestBytes := container.Resources.Requests.Memory().Value()
|
|
ramRequestBytes := container.Resources.Requests.Memory().Value()
|
|
|
|
|
|
|
|
// Because information on container RAM & CPU requests isn't
|
|
// Because information on container RAM & CPU requests isn't
|
|
|
- // coming from Prometheus, it won't have a timestamp associated
|
|
|
|
|
|
|
+ // coming from metrics, it won't have a timestamp associated
|
|
|
// with it. We need to provide a timestamp.
|
|
// with it. We need to provide a timestamp.
|
|
|
RAMReqV := []*util.Vector{
|
|
RAMReqV := []*util.Vector{
|
|
|
{
|
|
{
|
|
@@ -461,7 +435,7 @@ func (cm *CostModel) ComputeCostData(start, end time.Time) (map[string]*CostData
|
|
|
containerNameCost[newKey] = costs
|
|
containerNameCost[newKey] = costs
|
|
|
}
|
|
}
|
|
|
} else {
|
|
} else {
|
|
|
- // The container has been deleted. Not all information is sent to prometheus via ksm, so fill out what we can without k8s api
|
|
|
|
|
|
|
+ // The container has been deleted. Not all information is sent to metrics via ksm, so fill out what we can without k8s api
|
|
|
log.Debug("The container " + key + " has been deleted. Calculating allocation but resulting object will be missing data.")
|
|
log.Debug("The container " + key + " has been deleted. Calculating allocation but resulting object will be missing data.")
|
|
|
c, err := NewContainerMetricFromKey(key)
|
|
c, err := NewContainerMetricFromKey(key)
|
|
|
if err != nil {
|
|
if err != nil {
|
|
@@ -543,6 +517,7 @@ func (cm *CostModel) ComputeCostData(start, end time.Time) (map[string]*CostData
|
|
|
missingContainers[key] = costs
|
|
missingContainers[key] = costs
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
|
|
|
+
|
|
|
// Use unmounted pvs to create a mapping of "Unmounted-<Namespace>" containers
|
|
// Use unmounted pvs to create a mapping of "Unmounted-<Namespace>" containers
|
|
|
// to pass along the cost data
|
|
// to pass along the cost data
|
|
|
unmounted := findUnmountedPVCostData(cm.ClusterMap, unmountedPVs, namespaceLabelsMapping, namespaceAnnotationsMapping)
|
|
unmounted := findUnmountedPVCostData(cm.ClusterMap, unmountedPVs, namespaceLabelsMapping, namespaceAnnotationsMapping)
|
|
@@ -564,6 +539,44 @@ func (cm *CostModel) ComputeCostData(start, end time.Time) (map[string]*CostData
|
|
|
return containerNameCost, err
|
|
return containerNameCost, err
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
|
|
+func queryMetrics(mq source.MetricsQuerier, start, end time.Time) ([]*source.ContainerMetricResult, []*source.ContainerMetricResult, []*source.NetZoneGiBResult, []*source.NetRegionGiBResult, []*source.NetInternetGiBResult, error) {
|
|
|
|
|
+ grp := source.NewQueryGroup()
|
|
|
|
|
+
|
|
|
|
|
+ resChRAMUsage := source.WithGroup(grp, mq.QueryRAMUsageAvg(start, end))
|
|
|
|
|
+ resChCPUUsage := source.WithGroup(grp, mq.QueryCPUUsageAvg(start, end))
|
|
|
|
|
+ resChNetZoneRequests := source.WithGroup(grp, mq.QueryNetZoneGiB(start, end))
|
|
|
|
|
+ resChNetRegionRequests := source.WithGroup(grp, mq.QueryNetRegionGiB(start, end))
|
|
|
|
|
+ resChNetInternetRequests := source.WithGroup(grp, mq.QueryNetInternetGiB(start, end))
|
|
|
|
|
+
|
|
|
|
|
+ // Process metrics query results. Handle errors using ctx.Errors.
|
|
|
|
|
+ resRAMUsage, _ := resChRAMUsage.Await()
|
|
|
|
|
+ resCPUUsage, _ := resChCPUUsage.Await()
|
|
|
|
|
+ resNetZoneRequests, _ := resChNetZoneRequests.Await()
|
|
|
|
|
+ resNetRegionRequests, _ := resChNetRegionRequests.Await()
|
|
|
|
|
+ resNetInternetRequests, _ := resChNetInternetRequests.Await()
|
|
|
|
|
+
|
|
|
|
|
+ // NOTE: The way we currently handle errors and warnings only early returns if there is an error. Warnings
|
|
|
|
|
+ // NOTE: will not propagate unless coupled with errors.
|
|
|
|
|
+ if grp.HasErrors() {
|
|
|
|
|
+ // To keep the context of where the errors are occurring, we log the errors here and pass them the error
|
|
|
|
|
+ // back to the caller. The caller should handle the specific case where error is an ErrorCollection
|
|
|
|
|
+ for _, queryErr := range grp.Errors() {
|
|
|
|
|
+ if queryErr.Error != nil {
|
|
|
|
|
+ log.Errorf("ComputeCostData: Request Error: %s", queryErr.Error)
|
|
|
|
|
+ }
|
|
|
|
|
+ if queryErr.ParseError != nil {
|
|
|
|
|
+ log.Errorf("ComputeCostData: Parsing Error: %s", queryErr.ParseError)
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ // ErrorCollection is an collection of errors wrapped in a single error implementation
|
|
|
|
|
+ // We opt to not return an error for the sake of running as a pure exporter.
|
|
|
|
|
+ return resRAMUsage, resCPUUsage, resNetZoneRequests, resNetRegionRequests, resNetInternetRequests, grp.Error()
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ return resRAMUsage, resCPUUsage, resNetZoneRequests, resNetRegionRequests, resNetInternetRequests, nil
|
|
|
|
|
+}
|
|
|
|
|
+
|
|
|
func findUnmountedPVCostData(clusterMap clusters.ClusterMap, unmountedPVs map[string][]*PersistentVolumeClaimData, namespaceLabelsMapping map[string]map[string]string, namespaceAnnotationsMapping map[string]map[string]string) map[string]*CostData {
|
|
func findUnmountedPVCostData(clusterMap clusters.ClusterMap, unmountedPVs map[string][]*PersistentVolumeClaimData, namespaceLabelsMapping map[string]map[string]string, namespaceAnnotationsMapping map[string]map[string]string) map[string]*CostData {
|
|
|
costs := make(map[string]*CostData)
|
|
costs := make(map[string]*CostData)
|
|
|
if len(unmountedPVs) == 0 {
|
|
if len(unmountedPVs) == 0 {
|
|
@@ -673,14 +686,14 @@ func findDeletedNodeInfo(dataSource source.OpenCostDataSource, missingNodes map[
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
if len(cpuCosts) == 0 {
|
|
if len(cpuCosts) == 0 {
|
|
|
- log.Infof("Kubecost prometheus metrics not currently available. Ingest this server's /metrics endpoint to get that data.")
|
|
|
|
|
|
|
+ log.Infof("Opencost metrics not currently available. Ingest this server's /metrics endpoint to get that data.")
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
for node, costv := range cpuCosts {
|
|
for node, costv := range cpuCosts {
|
|
|
if _, ok := missingNodes[node]; ok {
|
|
if _, ok := missingNodes[node]; ok {
|
|
|
missingNodes[node].VCPUCost = fmt.Sprintf("%f", costv[0].Value)
|
|
missingNodes[node].VCPUCost = fmt.Sprintf("%f", costv[0].Value)
|
|
|
} else {
|
|
} else {
|
|
|
- log.DedupedWarningf(5, "Node `%s` in prometheus but not k8s api", node)
|
|
|
|
|
|
|
+ log.DedupedWarningf(5, "Node `%s` in metrics but not k8s api", node)
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
|
for node, costv := range ramCosts {
|
|
for node, costv := range ramCosts {
|