|
|
@@ -18,48 +18,48 @@ import (
|
|
|
)
|
|
|
|
|
|
const (
|
|
|
- queryFmtPods = `avg(kube_pod_container_status_running{}) by (pod, namespace, %s)[%s:%s]%s`
|
|
|
- queryFmtPodsUID = `avg(kube_pod_container_status_running{}) by (pod, namespace, uid, %s)[%s:%s]%s`
|
|
|
- queryFmtRAMBytesAllocated = `avg(avg_over_time(container_memory_allocation_bytes{container!="", container!="POD", node!=""}[%s]%s)) by (container, pod, namespace, node, %s, provider_id)`
|
|
|
- queryFmtRAMRequests = `avg(avg_over_time(kube_pod_container_resource_requests{resource="memory", unit="byte", container!="", container!="POD", node!=""}[%s]%s)) by (container, pod, namespace, node, %s)`
|
|
|
- queryFmtRAMUsageAvg = `avg(avg_over_time(container_memory_working_set_bytes{container!="", container_name!="POD", container!="POD"}[%s]%s)) by (container_name, container, pod_name, pod, namespace, instance, %s)`
|
|
|
- queryFmtRAMUsageMax = `max(max_over_time(container_memory_working_set_bytes{container!="", container_name!="POD", container!="POD"}[%s]%s)) by (container_name, container, pod_name, pod, namespace, instance, %s)`
|
|
|
- queryFmtCPUCoresAllocated = `avg(avg_over_time(container_cpu_allocation{container!="", container!="POD", node!=""}[%s]%s)) by (container, pod, namespace, node, %s)`
|
|
|
- queryFmtCPURequests = `avg(avg_over_time(kube_pod_container_resource_requests{resource="cpu", unit="core", container!="", container!="POD", node!=""}[%s]%s)) by (container, pod, namespace, node, %s)`
|
|
|
- queryFmtCPUUsageAvg = `avg(rate(container_cpu_usage_seconds_total{container!="", container_name!="POD", container!="POD"}[%s]%s)) by (container_name, container, pod_name, pod, namespace, instance, %s)`
|
|
|
- queryFmtCPUUsageMax = `max(rate(container_cpu_usage_seconds_total{container!="", container_name!="POD", container!="POD"}[%s]%s)) by (container_name, container, pod_name, pod, namespace, instance, %s)`
|
|
|
- queryFmtGPUsRequested = `avg(avg_over_time(kube_pod_container_resource_requests{resource="nvidia_com_gpu", container!="",container!="POD", node!=""}[%s]%s)) by (container, pod, namespace, node, %s)`
|
|
|
- queryFmtGPUsAllocated = `avg(avg_over_time(container_gpu_allocation{container!="", container!="POD", node!=""}[%s]%s)) by (container, pod, namespace, node, %s)`
|
|
|
- queryFmtNodeCostPerCPUHr = `avg(avg_over_time(node_cpu_hourly_cost[%s]%s)) by (node, %s, instance_type, provider_id)`
|
|
|
- queryFmtNodeCostPerRAMGiBHr = `avg(avg_over_time(node_ram_hourly_cost[%s]%s)) by (node, %s, instance_type, provider_id)`
|
|
|
- queryFmtNodeCostPerGPUHr = `avg(avg_over_time(node_gpu_hourly_cost[%s]%s)) by (node, %s, instance_type, provider_id)`
|
|
|
- queryFmtNodeIsSpot = `avg_over_time(kubecost_node_is_spot[%s]%s)`
|
|
|
- queryFmtPVCInfo = `avg(kube_persistentvolumeclaim_info{volumename != ""}) by (persistentvolumeclaim, storageclass, volumename, namespace, %s)[%s:%s]%s`
|
|
|
- queryFmtPVBytes = `avg(avg_over_time(kube_persistentvolume_capacity_bytes[%s]%s)) by (persistentvolume, %s)`
|
|
|
- queryFmtPodPVCAllocation = `avg(avg_over_time(pod_pvc_allocation[%s]%s)) by (persistentvolume, persistentvolumeclaim, pod, namespace, %s)`
|
|
|
- queryFmtPVCBytesRequested = `avg(avg_over_time(kube_persistentvolumeclaim_resource_requests_storage_bytes{}[%s]%s)) by (persistentvolumeclaim, namespace, %s)`
|
|
|
- queryFmtPVCostPerGiBHour = `avg(avg_over_time(pv_hourly_cost[%s]%s)) by (volumename, %s)`
|
|
|
- queryFmtNetZoneGiB = `sum(increase(kubecost_pod_network_egress_bytes_total{internet="false", sameZone="false", sameRegion="true"}[%s]%s)) by (pod_name, namespace, %s) / 1024 / 1024 / 1024`
|
|
|
- queryFmtNetZoneCostPerGiB = `avg(avg_over_time(kubecost_network_zone_egress_cost{}[%s]%s)) by (%s)`
|
|
|
- queryFmtNetRegionGiB = `sum(increase(kubecost_pod_network_egress_bytes_total{internet="false", sameZone="false", sameRegion="false"}[%s]%s)) by (pod_name, namespace, %s) / 1024 / 1024 / 1024`
|
|
|
- queryFmtNetRegionCostPerGiB = `avg(avg_over_time(kubecost_network_region_egress_cost{}[%s]%s)) by (%s)`
|
|
|
- queryFmtNetInternetGiB = `sum(increase(kubecost_pod_network_egress_bytes_total{internet="true"}[%s]%s)) by (pod_name, namespace, %s) / 1024 / 1024 / 1024`
|
|
|
- queryFmtNetInternetCostPerGiB = `avg(avg_over_time(kubecost_network_internet_egress_cost{}[%s]%s)) by (%s)`
|
|
|
- queryFmtNetReceiveBytes = `sum(increase(container_network_receive_bytes_total{pod!="", container="POD"}[%s]%s)) by (pod_name, pod, namespace, %s)`
|
|
|
- queryFmtNetTransferBytes = `sum(increase(container_network_transmit_bytes_total{pod!="", container="POD"}[%s]%s)) by (pod_name, pod, namespace, %s)`
|
|
|
- queryFmtNamespaceLabels = `avg_over_time(kube_namespace_labels[%s]%s)`
|
|
|
- queryFmtNamespaceAnnotations = `avg_over_time(kube_namespace_annotations[%s]%s)`
|
|
|
- queryFmtPodLabels = `avg_over_time(kube_pod_labels[%s]%s)`
|
|
|
- queryFmtPodAnnotations = `avg_over_time(kube_pod_annotations[%s]%s)`
|
|
|
- queryFmtServiceLabels = `avg_over_time(service_selector_labels[%s]%s)`
|
|
|
- queryFmtDeploymentLabels = `avg_over_time(deployment_match_labels[%s]%s)`
|
|
|
- queryFmtStatefulSetLabels = `avg_over_time(statefulSet_match_labels[%s]%s)`
|
|
|
- queryFmtDaemonSetLabels = `sum(avg_over_time(kube_pod_owner{owner_kind="DaemonSet"}[%s]%s)) by (pod, owner_name, namespace, %s)`
|
|
|
- queryFmtJobLabels = `sum(avg_over_time(kube_pod_owner{owner_kind="Job"}[%s]%s)) by (pod, owner_name, namespace ,%s)`
|
|
|
- queryFmtPodsWithReplicaSetOwner = `sum(avg_over_time(kube_pod_owner{owner_kind="ReplicaSet"}[%s]%s)) by (pod, owner_name, namespace ,%s)`
|
|
|
- queryFmtReplicaSetsWithoutOwners = `avg(avg_over_time(kube_replicaset_owner{owner_kind="<none>", owner_name="<none>"}[%s]%s)) by (replicaset, namespace, %s)`
|
|
|
- queryFmtLBCostPerHr = `avg(avg_over_time(kubecost_load_balancer_cost[%s]%s)) by (namespace, service_name, %s)`
|
|
|
- queryFmtLBActiveMins = `count(kubecost_load_balancer_cost) by (namespace, service_name, %s)[%s:%s]%s`
|
|
|
+ queryFmtPods = `avg(kube_pod_container_status_running{}) by (pod, namespace, %s)[%s:%s]`
|
|
|
+ queryFmtPodsUID = `avg(kube_pod_container_status_running{}) by (pod, namespace, uid, %s)[%s:%s]`
|
|
|
+ queryFmtRAMBytesAllocated = `avg(avg_over_time(container_memory_allocation_bytes{container!="", container!="POD", node!=""}[%s])) by (container, pod, namespace, node, %s, provider_id)`
|
|
|
+ queryFmtRAMRequests = `avg(avg_over_time(kube_pod_container_resource_requests{resource="memory", unit="byte", container!="", container!="POD", node!=""}[%s])) by (container, pod, namespace, node, %s)`
|
|
|
+ queryFmtRAMUsageAvg = `avg(avg_over_time(container_memory_working_set_bytes{container!="", container_name!="POD", container!="POD"}[%s])) by (container_name, container, pod_name, pod, namespace, instance, %s)`
|
|
|
+ queryFmtRAMUsageMax = `max(max_over_time(container_memory_working_set_bytes{container!="", container_name!="POD", container!="POD"}[%s])) by (container_name, container, pod_name, pod, namespace, instance, %s)`
|
|
|
+ queryFmtCPUCoresAllocated = `avg(avg_over_time(container_cpu_allocation{container!="", container!="POD", node!=""}[%s])) by (container, pod, namespace, node, %s)`
|
|
|
+ queryFmtCPURequests = `avg(avg_over_time(kube_pod_container_resource_requests{resource="cpu", unit="core", container!="", container!="POD", node!=""}[%s])) by (container, pod, namespace, node, %s)`
|
|
|
+ queryFmtCPUUsageAvg = `avg(rate(container_cpu_usage_seconds_total{container!="", container_name!="POD", container!="POD"}[%s])) by (container_name, container, pod_name, pod, namespace, instance, %s)`
|
|
|
+ queryFmtCPUUsageMax = `max(rate(container_cpu_usage_seconds_total{container!="", container_name!="POD", container!="POD"}[%s])) by (container_name, container, pod_name, pod, namespace, instance, %s)`
|
|
|
+ queryFmtGPUsRequested = `avg(avg_over_time(kube_pod_container_resource_requests{resource="nvidia_com_gpu", container!="",container!="POD", node!=""}[%s])) by (container, pod, namespace, node, %s)`
|
|
|
+ queryFmtGPUsAllocated = `avg(avg_over_time(container_gpu_allocation{container!="", container!="POD", node!=""}[%s])) by (container, pod, namespace, node, %s)`
|
|
|
+ queryFmtNodeCostPerCPUHr = `avg(avg_over_time(node_cpu_hourly_cost[%s])) by (node, %s, instance_type, provider_id)`
|
|
|
+ queryFmtNodeCostPerRAMGiBHr = `avg(avg_over_time(node_ram_hourly_cost[%s])) by (node, %s, instance_type, provider_id)`
|
|
|
+ queryFmtNodeCostPerGPUHr = `avg(avg_over_time(node_gpu_hourly_cost[%s])) by (node, %s, instance_type, provider_id)`
|
|
|
+ queryFmtNodeIsSpot = `avg_over_time(kubecost_node_is_spot[%s])`
|
|
|
+ queryFmtPVCInfo = `avg(kube_persistentvolumeclaim_info{volumename != ""}) by (persistentvolumeclaim, storageclass, volumename, namespace, %s)[%s:%s]`
|
|
|
+ queryFmtPVBytes = `avg(avg_over_time(kube_persistentvolume_capacity_bytes[%s])) by (persistentvolume, %s)`
|
|
|
+ queryFmtPodPVCAllocation = `avg(avg_over_time(pod_pvc_allocation[%s])) by (persistentvolume, persistentvolumeclaim, pod, namespace, %s)`
|
|
|
+ queryFmtPVCBytesRequested = `avg(avg_over_time(kube_persistentvolumeclaim_resource_requests_storage_bytes{}[%s])) by (persistentvolumeclaim, namespace, %s)`
|
|
|
+ queryFmtPVCostPerGiBHour = `avg(avg_over_time(pv_hourly_cost[%s])) by (volumename, %s)`
|
|
|
+ queryFmtNetZoneGiB = `sum(increase(kubecost_pod_network_egress_bytes_total{internet="false", sameZone="false", sameRegion="true"}[%s])) by (pod_name, namespace, %s) / 1024 / 1024 / 1024`
|
|
|
+ queryFmtNetZoneCostPerGiB = `avg(avg_over_time(kubecost_network_zone_egress_cost{}[%s])) by (%s)`
|
|
|
+ queryFmtNetRegionGiB = `sum(increase(kubecost_pod_network_egress_bytes_total{internet="false", sameZone="false", sameRegion="false"}[%s])) by (pod_name, namespace, %s) / 1024 / 1024 / 1024`
|
|
|
+ queryFmtNetRegionCostPerGiB = `avg(avg_over_time(kubecost_network_region_egress_cost{}[%s])) by (%s)`
|
|
|
+ queryFmtNetInternetGiB = `sum(increase(kubecost_pod_network_egress_bytes_total{internet="true"}[%s])) by (pod_name, namespace, %s) / 1024 / 1024 / 1024`
|
|
|
+ queryFmtNetInternetCostPerGiB = `avg(avg_over_time(kubecost_network_internet_egress_cost{}[%s])) by (%s)`
|
|
|
+ queryFmtNetReceiveBytes = `sum(increase(container_network_receive_bytes_total{pod!="", container="POD"}[%s])) by (pod_name, pod, namespace, %s)`
|
|
|
+ queryFmtNetTransferBytes = `sum(increase(container_network_transmit_bytes_total{pod!="", container="POD"}[%s])) by (pod_name, pod, namespace, %s)`
|
|
|
+ queryFmtNamespaceLabels = `avg_over_time(kube_namespace_labels[%s])`
|
|
|
+ queryFmtNamespaceAnnotations = `avg_over_time(kube_namespace_annotations[%s])`
|
|
|
+ queryFmtPodLabels = `avg_over_time(kube_pod_labels[%s])`
|
|
|
+ queryFmtPodAnnotations = `avg_over_time(kube_pod_annotations[%s])`
|
|
|
+ queryFmtServiceLabels = `avg_over_time(service_selector_labels[%s])`
|
|
|
+ queryFmtDeploymentLabels = `avg_over_time(deployment_match_labels[%s])`
|
|
|
+ queryFmtStatefulSetLabels = `avg_over_time(statefulSet_match_labels[%s])`
|
|
|
+ queryFmtDaemonSetLabels = `sum(avg_over_time(kube_pod_owner{owner_kind="DaemonSet"}[%s])) by (pod, owner_name, namespace, %s)`
|
|
|
+	queryFmtJobLabels                = `sum(avg_over_time(kube_pod_owner{owner_kind="Job"}[%s])) by (pod, owner_name, namespace, %s)`
|
|
|
+	queryFmtPodsWithReplicaSetOwner  = `sum(avg_over_time(kube_pod_owner{owner_kind="ReplicaSet"}[%s])) by (pod, owner_name, namespace, %s)`
|
|
|
+ queryFmtReplicaSetsWithoutOwners = `avg(avg_over_time(kube_replicaset_owner{owner_kind="<none>", owner_name="<none>"}[%s])) by (replicaset, namespace, %s)`
|
|
|
+ queryFmtLBCostPerHr = `avg(avg_over_time(kubecost_load_balancer_cost[%s])) by (namespace, service_name, %s)`
|
|
|
+ queryFmtLBActiveMins = `count(kubecost_load_balancer_cost) by (namespace, service_name, %s)[%s:%s]`
|
|
|
)
|
|
|
|
|
|
// This is a bit of a hack to work around garbage data from cadvisor
|
|
|
@@ -84,6 +84,168 @@ func (cm *CostModel) Name() string {
|
|
|
// for the window defined by the given start and end times. The Allocations
|
|
|
// returned are unaggregated (i.e. down to the container level).
|
|
|
func (cm *CostModel) ComputeAllocation(start, end time.Time, resolution time.Duration) (*kubecost.AllocationSet, error) {
|
|
|
+ // If the duration is short enough, compute the AllocationSet directly
|
|
|
+ if end.Sub(start) <= cm.MaxPrometheusQueryDuration {
|
|
|
+ return cm.computeAllocation(start, end, resolution)
|
|
|
+ }
|
|
|
+
|
|
|
+ // If the duration exceeds the configured MaxPrometheusQueryDuration, then
|
|
|
+ // query for maximum-sized AllocationSets, collect them, and accumulate.
|
|
|
+
|
|
|
+ // s and e track the coverage of the entire given window over multiple
|
|
|
+ // internal queries.
|
|
|
+ s, e := start, start
|
|
|
+
|
|
|
+ // Collect AllocationSets in a range, then accumulate
|
|
|
+ // TODO optimize by collecting consecutive AllocationSets, accumulating as we go
|
|
|
+ asr := kubecost.NewAllocationSetRange()
|
|
|
+
|
|
|
+ for e.Before(end) {
|
|
|
+ // By default, query for the full remaining duration. But do not let
|
|
|
+ // any individual query duration exceed the configured max Prometheus
|
|
|
+ // query duration.
|
|
|
+ duration := end.Sub(e)
|
|
|
+ if duration > cm.MaxPrometheusQueryDuration {
|
|
|
+ duration = cm.MaxPrometheusQueryDuration
|
|
|
+ }
|
|
|
+
|
|
|
+ // Set start and end parameters (s, e) for next individual computation.
|
|
|
+ e = s.Add(duration)
|
|
|
+
|
|
|
+ // Compute the individual AllocationSet for just (s, e)
|
|
|
+ as, err := cm.computeAllocation(s, e, resolution)
|
|
|
+ if err != nil {
|
|
|
+ return kubecost.NewAllocationSet(start, end), fmt.Errorf("error computing allocation for %s: %s", kubecost.NewClosedWindow(s, e), err)
|
|
|
+ }
|
|
|
+
|
|
|
+ // Append to the range
|
|
|
+ asr.Append(as)
|
|
|
+
|
|
|
+ // Set s equal to e to set up the next query, if one exists.
|
|
|
+ s = e
|
|
|
+ }
|
|
|
+
|
|
|
+ // Populate annotations, labels, and services on each Allocation. This is
|
|
|
+ // necessary because Properties.Intersection does not propagate any values
|
|
|
+ // stored in maps or slices for performance reasons. In this case, however,
|
|
|
+ // it is both acceptable and necessary to do so.
|
|
|
+ allocationAnnotations := map[string]map[string]string{}
|
|
|
+ allocationLabels := map[string]map[string]string{}
|
|
|
+ allocationServices := map[string]map[string]bool{}
|
|
|
+
|
|
|
+ // Also record errors and warnings, then append them to the results later.
|
|
|
+ errors := []string{}
|
|
|
+ warnings := []string{}
|
|
|
+
|
|
|
+ asr.Each(func(i int, as *kubecost.AllocationSet) {
|
|
|
+ as.Each(func(k string, a *kubecost.Allocation) {
|
|
|
+ if len(a.Properties.Annotations) > 0 {
|
|
|
+ if _, ok := allocationAnnotations[k]; !ok {
|
|
|
+ allocationAnnotations[k] = map[string]string{}
|
|
|
+ }
|
|
|
+ for name, val := range a.Properties.Annotations {
|
|
|
+ allocationAnnotations[k][name] = val
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ if len(a.Properties.Labels) > 0 {
|
|
|
+ if _, ok := allocationLabels[k]; !ok {
|
|
|
+ allocationLabels[k] = map[string]string{}
|
|
|
+ }
|
|
|
+ for name, val := range a.Properties.Labels {
|
|
|
+ allocationLabels[k][name] = val
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ if len(a.Properties.Services) > 0 {
|
|
|
+ if _, ok := allocationServices[k]; !ok {
|
|
|
+ allocationServices[k] = map[string]bool{}
|
|
|
+ }
|
|
|
+ for _, val := range a.Properties.Services {
|
|
|
+ allocationServices[k][val] = true
|
|
|
+ }
|
|
|
+ }
|
|
|
+ })
|
|
|
+
|
|
|
+ errors = append(errors, as.Errors...)
|
|
|
+ warnings = append(warnings, as.Warnings...)
|
|
|
+ })
|
|
|
+
|
|
|
+ // Accumulate to yield the result AllocationSet. After this step, we will
|
|
|
+ // be nearly complete, but without the raw allocation data, which must be
|
|
|
+ // recomputed.
|
|
|
+ result, err := asr.Accumulate()
|
|
|
+ if err != nil {
|
|
|
+ return kubecost.NewAllocationSet(start, end), fmt.Errorf("error accumulating data for %s: %s", kubecost.NewClosedWindow(s, e), err)
|
|
|
+ }
|
|
|
+
|
|
|
+ // Apply the annotations, labels, and services to the post-accumulation
|
|
|
+ // results. (See above for why this is necessary.)
|
|
|
+ result.Each(func(k string, a *kubecost.Allocation) {
|
|
|
+ if annotations, ok := allocationAnnotations[k]; ok {
|
|
|
+ a.Properties.Annotations = annotations
|
|
|
+ }
|
|
|
+
|
|
|
+ if labels, ok := allocationLabels[k]; ok {
|
|
|
+ a.Properties.Labels = labels
|
|
|
+ }
|
|
|
+
|
|
|
+ if services, ok := allocationServices[k]; ok {
|
|
|
+ a.Properties.Services = []string{}
|
|
|
+ for s := range services {
|
|
|
+ a.Properties.Services = append(a.Properties.Services, s)
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ // Expand the Window of all Allocations within the AllocationSet
|
|
|
+ // to match the Window of the AllocationSet, which gets expanded
|
|
|
+ // at the end of this function.
|
|
|
+ a.Window = a.Window.ExpandStart(start).ExpandEnd(end)
|
|
|
+ })
|
|
|
+
|
|
|
+ // Maintain RAM and CPU max usage values by iterating over the range,
|
|
|
+ // computing maximums on a rolling basis, and setting on the result set.
|
|
|
+ asr.Each(func(i int, as *kubecost.AllocationSet) {
|
|
|
+ as.Each(func(key string, alloc *kubecost.Allocation) {
|
|
|
+ resultAlloc := result.Get(key)
|
|
|
+ if resultAlloc == nil {
|
|
|
+ return
|
|
|
+ }
|
|
|
+
|
|
|
+ if resultAlloc.RawAllocationOnly == nil {
|
|
|
+ resultAlloc.RawAllocationOnly = &kubecost.RawAllocationOnlyData{}
|
|
|
+ }
|
|
|
+
|
|
|
+ if alloc.RawAllocationOnly == nil {
|
|
|
+ // This will happen inevitably for unmounted disks, but should
|
|
|
+ // ideally not happen for any allocation with CPU and RAM data.
|
|
|
+ if !alloc.IsUnmounted() {
|
|
|
+ log.DedupedWarningf(10, "ComputeAllocation: raw allocation data missing for %s", key)
|
|
|
+ }
|
|
|
+ return
|
|
|
+ }
|
|
|
+
|
|
|
+ if alloc.RawAllocationOnly.CPUCoreUsageMax > resultAlloc.RawAllocationOnly.CPUCoreUsageMax {
|
|
|
+ resultAlloc.RawAllocationOnly.CPUCoreUsageMax = alloc.RawAllocationOnly.CPUCoreUsageMax
|
|
|
+ }
|
|
|
+
|
|
|
+ if alloc.RawAllocationOnly.RAMBytesUsageMax > resultAlloc.RawAllocationOnly.RAMBytesUsageMax {
|
|
|
+ resultAlloc.RawAllocationOnly.RAMBytesUsageMax = alloc.RawAllocationOnly.RAMBytesUsageMax
|
|
|
+ }
|
|
|
+ })
|
|
|
+ })
|
|
|
+
|
|
|
+ // Expand the window to match the queried time range.
|
|
|
+ result.Window = result.Window.ExpandStart(start).ExpandEnd(end)
|
|
|
+
|
|
|
+ // Append errors and warnings
|
|
|
+ result.Errors = errors
|
|
|
+ result.Warnings = warnings
|
|
|
+
|
|
|
+ return result, nil
|
|
|
+}
|
|
|
+
|
|
|
+func (cm *CostModel) computeAllocation(start, end time.Time, resolution time.Duration) (*kubecost.AllocationSet, error) {
|
|
|
// 1. Build out Pod map from resolution-tuned, batched Pod start/end query
|
|
|
// 2. Run and apply the results of the remaining queries to
|
|
|
// 3. Build out AllocationSet from completed Pod map
|
|
|
@@ -130,16 +292,15 @@ func (cm *CostModel) ComputeAllocation(start, end time.Time, resolution time.Dur
|
|
|
log.Debugf("CostModel.ComputeAllocation: ingesting UID data from KSM metrics...")
|
|
|
}
|
|
|
|
|
|
- cm.buildPodMap(window, resolution, env.GetETLMaxBatchDuration(), podMap, clusterStart, clusterEnd, ingestPodUID, podUIDKeyMap)
|
|
|
+ // TODO:CLEANUP remove "max batch" idea and clusterStart/End
|
|
|
+ cm.buildPodMap(window, resolution, env.GetETLMaxPrometheusQueryDuration(), podMap, clusterStart, clusterEnd, ingestPodUID, podUIDKeyMap)
|
|
|
|
|
|
// (2) Run and apply remaining queries
|
|
|
|
|
|
- // Convert window (start, end) to (duration, offset) for querying Prometheus,
|
|
|
- // including handling Thanos offset
|
|
|
- durStr, offStr, err := window.DurationOffsetForPrometheus()
|
|
|
- if err != nil {
|
|
|
- // Negative duration, so return empty set
|
|
|
- return allocSet, nil
|
|
|
+ // Query for the duration between start and end
|
|
|
+ durStr := timeutil.DurationString(end.Sub(start))
|
|
|
+ if durStr == "" {
|
|
|
+ return allocSet, fmt.Errorf("illegal duration value for %s", kubecost.NewClosedWindow(start, end))
|
|
|
}
|
|
|
|
|
|
// Convert resolution duration to a query-ready string
|
|
|
@@ -147,125 +308,125 @@ func (cm *CostModel) ComputeAllocation(start, end time.Time, resolution time.Dur
|
|
|
|
|
|
ctx := prom.NewNamedContext(cm.PrometheusClient, prom.AllocationContextName)
|
|
|
|
|
|
- queryRAMBytesAllocated := fmt.Sprintf(queryFmtRAMBytesAllocated, durStr, offStr, env.GetPromClusterLabel())
|
|
|
- resChRAMBytesAllocated := ctx.Query(queryRAMBytesAllocated)
|
|
|
+ queryRAMBytesAllocated := fmt.Sprintf(queryFmtRAMBytesAllocated, durStr, env.GetPromClusterLabel())
|
|
|
+ resChRAMBytesAllocated := ctx.QueryAtTime(queryRAMBytesAllocated, end)
|
|
|
|
|
|
- queryRAMRequests := fmt.Sprintf(queryFmtRAMRequests, durStr, offStr, env.GetPromClusterLabel())
|
|
|
- resChRAMRequests := ctx.Query(queryRAMRequests)
|
|
|
+ queryRAMRequests := fmt.Sprintf(queryFmtRAMRequests, durStr, env.GetPromClusterLabel())
|
|
|
+ resChRAMRequests := ctx.QueryAtTime(queryRAMRequests, end)
|
|
|
|
|
|
- queryRAMUsageAvg := fmt.Sprintf(queryFmtRAMUsageAvg, durStr, offStr, env.GetPromClusterLabel())
|
|
|
- resChRAMUsageAvg := ctx.Query(queryRAMUsageAvg)
|
|
|
+ queryRAMUsageAvg := fmt.Sprintf(queryFmtRAMUsageAvg, durStr, env.GetPromClusterLabel())
|
|
|
+ resChRAMUsageAvg := ctx.QueryAtTime(queryRAMUsageAvg, end)
|
|
|
|
|
|
- queryRAMUsageMax := fmt.Sprintf(queryFmtRAMUsageMax, durStr, offStr, env.GetPromClusterLabel())
|
|
|
- resChRAMUsageMax := ctx.Query(queryRAMUsageMax)
|
|
|
+ queryRAMUsageMax := fmt.Sprintf(queryFmtRAMUsageMax, durStr, env.GetPromClusterLabel())
|
|
|
+ resChRAMUsageMax := ctx.QueryAtTime(queryRAMUsageMax, end)
|
|
|
|
|
|
- queryCPUCoresAllocated := fmt.Sprintf(queryFmtCPUCoresAllocated, durStr, offStr, env.GetPromClusterLabel())
|
|
|
- resChCPUCoresAllocated := ctx.Query(queryCPUCoresAllocated)
|
|
|
+ queryCPUCoresAllocated := fmt.Sprintf(queryFmtCPUCoresAllocated, durStr, env.GetPromClusterLabel())
|
|
|
+ resChCPUCoresAllocated := ctx.QueryAtTime(queryCPUCoresAllocated, end)
|
|
|
|
|
|
- queryCPURequests := fmt.Sprintf(queryFmtCPURequests, durStr, offStr, env.GetPromClusterLabel())
|
|
|
- resChCPURequests := ctx.Query(queryCPURequests)
|
|
|
+ queryCPURequests := fmt.Sprintf(queryFmtCPURequests, durStr, env.GetPromClusterLabel())
|
|
|
+ resChCPURequests := ctx.QueryAtTime(queryCPURequests, end)
|
|
|
|
|
|
- queryCPUUsageAvg := fmt.Sprintf(queryFmtCPUUsageAvg, durStr, offStr, env.GetPromClusterLabel())
|
|
|
- resChCPUUsageAvg := ctx.Query(queryCPUUsageAvg)
|
|
|
+ queryCPUUsageAvg := fmt.Sprintf(queryFmtCPUUsageAvg, durStr, env.GetPromClusterLabel())
|
|
|
+ resChCPUUsageAvg := ctx.QueryAtTime(queryCPUUsageAvg, end)
|
|
|
|
|
|
- queryCPUUsageMax := fmt.Sprintf(queryFmtCPUUsageMax, durStr, offStr, env.GetPromClusterLabel())
|
|
|
- resChCPUUsageMax := ctx.Query(queryCPUUsageMax)
|
|
|
+ queryCPUUsageMax := fmt.Sprintf(queryFmtCPUUsageMax, durStr, env.GetPromClusterLabel())
|
|
|
+ resChCPUUsageMax := ctx.QueryAtTime(queryCPUUsageMax, end)
|
|
|
|
|
|
- queryGPUsRequested := fmt.Sprintf(queryFmtGPUsRequested, durStr, offStr, env.GetPromClusterLabel())
|
|
|
- resChGPUsRequested := ctx.Query(queryGPUsRequested)
|
|
|
+ queryGPUsRequested := fmt.Sprintf(queryFmtGPUsRequested, durStr, env.GetPromClusterLabel())
|
|
|
+ resChGPUsRequested := ctx.QueryAtTime(queryGPUsRequested, end)
|
|
|
|
|
|
- queryGPUsAllocated := fmt.Sprintf(queryFmtGPUsAllocated, durStr, offStr, env.GetPromClusterLabel())
|
|
|
- resChGPUsAllocated := ctx.Query(queryGPUsAllocated)
|
|
|
+ queryGPUsAllocated := fmt.Sprintf(queryFmtGPUsAllocated, durStr, env.GetPromClusterLabel())
|
|
|
+ resChGPUsAllocated := ctx.QueryAtTime(queryGPUsAllocated, end)
|
|
|
|
|
|
- queryNodeCostPerCPUHr := fmt.Sprintf(queryFmtNodeCostPerCPUHr, durStr, offStr, env.GetPromClusterLabel())
|
|
|
- resChNodeCostPerCPUHr := ctx.Query(queryNodeCostPerCPUHr)
|
|
|
+ queryNodeCostPerCPUHr := fmt.Sprintf(queryFmtNodeCostPerCPUHr, durStr, env.GetPromClusterLabel())
|
|
|
+ resChNodeCostPerCPUHr := ctx.QueryAtTime(queryNodeCostPerCPUHr, end)
|
|
|
|
|
|
- queryNodeCostPerRAMGiBHr := fmt.Sprintf(queryFmtNodeCostPerRAMGiBHr, durStr, offStr, env.GetPromClusterLabel())
|
|
|
- resChNodeCostPerRAMGiBHr := ctx.Query(queryNodeCostPerRAMGiBHr)
|
|
|
+ queryNodeCostPerRAMGiBHr := fmt.Sprintf(queryFmtNodeCostPerRAMGiBHr, durStr, env.GetPromClusterLabel())
|
|
|
+ resChNodeCostPerRAMGiBHr := ctx.QueryAtTime(queryNodeCostPerRAMGiBHr, end)
|
|
|
|
|
|
- queryNodeCostPerGPUHr := fmt.Sprintf(queryFmtNodeCostPerGPUHr, durStr, offStr, env.GetPromClusterLabel())
|
|
|
- resChNodeCostPerGPUHr := ctx.Query(queryNodeCostPerGPUHr)
|
|
|
+ queryNodeCostPerGPUHr := fmt.Sprintf(queryFmtNodeCostPerGPUHr, durStr, env.GetPromClusterLabel())
|
|
|
+ resChNodeCostPerGPUHr := ctx.QueryAtTime(queryNodeCostPerGPUHr, end)
|
|
|
|
|
|
- queryNodeIsSpot := fmt.Sprintf(queryFmtNodeIsSpot, durStr, offStr)
|
|
|
- resChNodeIsSpot := ctx.Query(queryNodeIsSpot)
|
|
|
+ queryNodeIsSpot := fmt.Sprintf(queryFmtNodeIsSpot, durStr)
|
|
|
+ resChNodeIsSpot := ctx.QueryAtTime(queryNodeIsSpot, end)
|
|
|
|
|
|
- queryPVCInfo := fmt.Sprintf(queryFmtPVCInfo, env.GetPromClusterLabel(), durStr, resStr, offStr)
|
|
|
- resChPVCInfo := ctx.Query(queryPVCInfo)
|
|
|
+ queryPVCInfo := fmt.Sprintf(queryFmtPVCInfo, env.GetPromClusterLabel(), durStr, resStr)
|
|
|
+ resChPVCInfo := ctx.QueryAtTime(queryPVCInfo, end)
|
|
|
|
|
|
- queryPVBytes := fmt.Sprintf(queryFmtPVBytes, durStr, offStr, env.GetPromClusterLabel())
|
|
|
- resChPVBytes := ctx.Query(queryPVBytes)
|
|
|
+ queryPVBytes := fmt.Sprintf(queryFmtPVBytes, durStr, env.GetPromClusterLabel())
|
|
|
+ resChPVBytes := ctx.QueryAtTime(queryPVBytes, end)
|
|
|
|
|
|
- queryPodPVCAllocation := fmt.Sprintf(queryFmtPodPVCAllocation, durStr, offStr, env.GetPromClusterLabel())
|
|
|
- resChPodPVCAllocation := ctx.Query(queryPodPVCAllocation)
|
|
|
+ queryPodPVCAllocation := fmt.Sprintf(queryFmtPodPVCAllocation, durStr, env.GetPromClusterLabel())
|
|
|
+ resChPodPVCAllocation := ctx.QueryAtTime(queryPodPVCAllocation, end)
|
|
|
|
|
|
- queryPVCBytesRequested := fmt.Sprintf(queryFmtPVCBytesRequested, durStr, offStr, env.GetPromClusterLabel())
|
|
|
- resChPVCBytesRequested := ctx.Query(queryPVCBytesRequested)
|
|
|
+ queryPVCBytesRequested := fmt.Sprintf(queryFmtPVCBytesRequested, durStr, env.GetPromClusterLabel())
|
|
|
+ resChPVCBytesRequested := ctx.QueryAtTime(queryPVCBytesRequested, end)
|
|
|
|
|
|
- queryPVCostPerGiBHour := fmt.Sprintf(queryFmtPVCostPerGiBHour, durStr, offStr, env.GetPromClusterLabel())
|
|
|
- resChPVCostPerGiBHour := ctx.Query(queryPVCostPerGiBHour)
|
|
|
+ queryPVCostPerGiBHour := fmt.Sprintf(queryFmtPVCostPerGiBHour, durStr, env.GetPromClusterLabel())
|
|
|
+ resChPVCostPerGiBHour := ctx.QueryAtTime(queryPVCostPerGiBHour, end)
|
|
|
|
|
|
- queryNetTransferBytes := fmt.Sprintf(queryFmtNetTransferBytes, durStr, offStr, env.GetPromClusterLabel())
|
|
|
- resChNetTransferBytes := ctx.Query(queryNetTransferBytes)
|
|
|
+ queryNetTransferBytes := fmt.Sprintf(queryFmtNetTransferBytes, durStr, env.GetPromClusterLabel())
|
|
|
+ resChNetTransferBytes := ctx.QueryAtTime(queryNetTransferBytes, end)
|
|
|
|
|
|
- queryNetReceiveBytes := fmt.Sprintf(queryFmtNetReceiveBytes, durStr, offStr, env.GetPromClusterLabel())
|
|
|
- resChNetReceiveBytes := ctx.Query(queryNetReceiveBytes)
|
|
|
+ queryNetReceiveBytes := fmt.Sprintf(queryFmtNetReceiveBytes, durStr, env.GetPromClusterLabel())
|
|
|
+ resChNetReceiveBytes := ctx.QueryAtTime(queryNetReceiveBytes, end)
|
|
|
|
|
|
- queryNetZoneGiB := fmt.Sprintf(queryFmtNetZoneGiB, durStr, offStr, env.GetPromClusterLabel())
|
|
|
- resChNetZoneGiB := ctx.Query(queryNetZoneGiB)
|
|
|
+ queryNetZoneGiB := fmt.Sprintf(queryFmtNetZoneGiB, durStr, env.GetPromClusterLabel())
|
|
|
+ resChNetZoneGiB := ctx.QueryAtTime(queryNetZoneGiB, end)
|
|
|
|
|
|
- queryNetZoneCostPerGiB := fmt.Sprintf(queryFmtNetZoneCostPerGiB, durStr, offStr, env.GetPromClusterLabel())
|
|
|
- resChNetZoneCostPerGiB := ctx.Query(queryNetZoneCostPerGiB)
|
|
|
+ queryNetZoneCostPerGiB := fmt.Sprintf(queryFmtNetZoneCostPerGiB, durStr, env.GetPromClusterLabel())
|
|
|
+ resChNetZoneCostPerGiB := ctx.QueryAtTime(queryNetZoneCostPerGiB, end)
|
|
|
|
|
|
- queryNetRegionGiB := fmt.Sprintf(queryFmtNetRegionGiB, durStr, offStr, env.GetPromClusterLabel())
|
|
|
- resChNetRegionGiB := ctx.Query(queryNetRegionGiB)
|
|
|
+ queryNetRegionGiB := fmt.Sprintf(queryFmtNetRegionGiB, durStr, env.GetPromClusterLabel())
|
|
|
+ resChNetRegionGiB := ctx.QueryAtTime(queryNetRegionGiB, end)
|
|
|
|
|
|
- queryNetRegionCostPerGiB := fmt.Sprintf(queryFmtNetRegionCostPerGiB, durStr, offStr, env.GetPromClusterLabel())
|
|
|
- resChNetRegionCostPerGiB := ctx.Query(queryNetRegionCostPerGiB)
|
|
|
+ queryNetRegionCostPerGiB := fmt.Sprintf(queryFmtNetRegionCostPerGiB, durStr, env.GetPromClusterLabel())
|
|
|
+ resChNetRegionCostPerGiB := ctx.QueryAtTime(queryNetRegionCostPerGiB, end)
|
|
|
|
|
|
- queryNetInternetGiB := fmt.Sprintf(queryFmtNetInternetGiB, durStr, offStr, env.GetPromClusterLabel())
|
|
|
- resChNetInternetGiB := ctx.Query(queryNetInternetGiB)
|
|
|
+ queryNetInternetGiB := fmt.Sprintf(queryFmtNetInternetGiB, durStr, env.GetPromClusterLabel())
|
|
|
+ resChNetInternetGiB := ctx.QueryAtTime(queryNetInternetGiB, end)
|
|
|
|
|
|
- queryNetInternetCostPerGiB := fmt.Sprintf(queryFmtNetInternetCostPerGiB, durStr, offStr, env.GetPromClusterLabel())
|
|
|
- resChNetInternetCostPerGiB := ctx.Query(queryNetInternetCostPerGiB)
|
|
|
+ queryNetInternetCostPerGiB := fmt.Sprintf(queryFmtNetInternetCostPerGiB, durStr, env.GetPromClusterLabel())
|
|
|
+ resChNetInternetCostPerGiB := ctx.QueryAtTime(queryNetInternetCostPerGiB, end)
|
|
|
|
|
|
- queryNamespaceLabels := fmt.Sprintf(queryFmtNamespaceLabels, durStr, offStr)
|
|
|
- resChNamespaceLabels := ctx.Query(queryNamespaceLabels)
|
|
|
+ queryNamespaceLabels := fmt.Sprintf(queryFmtNamespaceLabels, durStr)
|
|
|
+ resChNamespaceLabels := ctx.QueryAtTime(queryNamespaceLabels, end)
|
|
|
|
|
|
- queryNamespaceAnnotations := fmt.Sprintf(queryFmtNamespaceAnnotations, durStr, offStr)
|
|
|
- resChNamespaceAnnotations := ctx.Query(queryNamespaceAnnotations)
|
|
|
+ queryNamespaceAnnotations := fmt.Sprintf(queryFmtNamespaceAnnotations, durStr)
|
|
|
+ resChNamespaceAnnotations := ctx.QueryAtTime(queryNamespaceAnnotations, end)
|
|
|
|
|
|
- queryPodLabels := fmt.Sprintf(queryFmtPodLabels, durStr, offStr)
|
|
|
- resChPodLabels := ctx.Query(queryPodLabels)
|
|
|
+ queryPodLabels := fmt.Sprintf(queryFmtPodLabels, durStr)
|
|
|
+ resChPodLabels := ctx.QueryAtTime(queryPodLabels, end)
|
|
|
|
|
|
- queryPodAnnotations := fmt.Sprintf(queryFmtPodAnnotations, durStr, offStr)
|
|
|
- resChPodAnnotations := ctx.Query(queryPodAnnotations)
|
|
|
+ queryPodAnnotations := fmt.Sprintf(queryFmtPodAnnotations, durStr)
|
|
|
+ resChPodAnnotations := ctx.QueryAtTime(queryPodAnnotations, end)
|
|
|
|
|
|
- queryServiceLabels := fmt.Sprintf(queryFmtServiceLabels, durStr, offStr)
|
|
|
- resChServiceLabels := ctx.Query(queryServiceLabels)
|
|
|
+ queryServiceLabels := fmt.Sprintf(queryFmtServiceLabels, durStr)
|
|
|
+ resChServiceLabels := ctx.QueryAtTime(queryServiceLabels, end)
|
|
|
|
|
|
- queryDeploymentLabels := fmt.Sprintf(queryFmtDeploymentLabels, durStr, offStr)
|
|
|
- resChDeploymentLabels := ctx.Query(queryDeploymentLabels)
|
|
|
+ queryDeploymentLabels := fmt.Sprintf(queryFmtDeploymentLabels, durStr)
|
|
|
+ resChDeploymentLabels := ctx.QueryAtTime(queryDeploymentLabels, end)
|
|
|
|
|
|
- queryStatefulSetLabels := fmt.Sprintf(queryFmtStatefulSetLabels, durStr, offStr)
|
|
|
- resChStatefulSetLabels := ctx.Query(queryStatefulSetLabels)
|
|
|
+ queryStatefulSetLabels := fmt.Sprintf(queryFmtStatefulSetLabels, durStr)
|
|
|
+ resChStatefulSetLabels := ctx.QueryAtTime(queryStatefulSetLabels, end)
|
|
|
|
|
|
- queryDaemonSetLabels := fmt.Sprintf(queryFmtDaemonSetLabels, durStr, offStr, env.GetPromClusterLabel())
|
|
|
- resChDaemonSetLabels := ctx.Query(queryDaemonSetLabels)
|
|
|
+ queryDaemonSetLabels := fmt.Sprintf(queryFmtDaemonSetLabels, durStr, env.GetPromClusterLabel())
|
|
|
+ resChDaemonSetLabels := ctx.QueryAtTime(queryDaemonSetLabels, end)
|
|
|
|
|
|
- queryPodsWithReplicaSetOwner := fmt.Sprintf(queryFmtPodsWithReplicaSetOwner, durStr, offStr, env.GetPromClusterLabel())
|
|
|
- resChPodsWithReplicaSetOwner := ctx.Query(queryPodsWithReplicaSetOwner)
|
|
|
+ queryPodsWithReplicaSetOwner := fmt.Sprintf(queryFmtPodsWithReplicaSetOwner, durStr, env.GetPromClusterLabel())
|
|
|
+ resChPodsWithReplicaSetOwner := ctx.QueryAtTime(queryPodsWithReplicaSetOwner, end)
|
|
|
|
|
|
- queryReplicaSetsWithoutOwners := fmt.Sprintf(queryFmtReplicaSetsWithoutOwners, durStr, offStr, env.GetPromClusterLabel())
|
|
|
- resChReplicaSetsWithoutOwners := ctx.Query(queryReplicaSetsWithoutOwners)
|
|
|
+ queryReplicaSetsWithoutOwners := fmt.Sprintf(queryFmtReplicaSetsWithoutOwners, durStr, env.GetPromClusterLabel())
|
|
|
+ resChReplicaSetsWithoutOwners := ctx.QueryAtTime(queryReplicaSetsWithoutOwners, end)
|
|
|
|
|
|
- queryJobLabels := fmt.Sprintf(queryFmtJobLabels, durStr, offStr, env.GetPromClusterLabel())
|
|
|
- resChJobLabels := ctx.Query(queryJobLabels)
|
|
|
+ queryJobLabels := fmt.Sprintf(queryFmtJobLabels, durStr, env.GetPromClusterLabel())
|
|
|
+ resChJobLabels := ctx.QueryAtTime(queryJobLabels, end)
|
|
|
|
|
|
- queryLBCostPerHr := fmt.Sprintf(queryFmtLBCostPerHr, durStr, offStr, env.GetPromClusterLabel())
|
|
|
- resChLBCostPerHr := ctx.Query(queryLBCostPerHr)
|
|
|
+ queryLBCostPerHr := fmt.Sprintf(queryFmtLBCostPerHr, durStr, env.GetPromClusterLabel())
|
|
|
+ resChLBCostPerHr := ctx.QueryAtTime(queryLBCostPerHr, end)
|
|
|
|
|
|
- queryLBActiveMins := fmt.Sprintf(queryFmtLBActiveMins, env.GetPromClusterLabel(), durStr, resStr, offStr)
|
|
|
- resChLBActiveMins := ctx.Query(queryLBActiveMins)
|
|
|
+ queryLBActiveMins := fmt.Sprintf(queryFmtLBActiveMins, env.GetPromClusterLabel(), durStr, resStr)
|
|
|
+ resChLBActiveMins := ctx.QueryAtTime(queryLBActiveMins, end)
|
|
|
|
|
|
resCPUCoresAllocated, _ := resChCPUCoresAllocated.Await()
|
|
|
resCPURequests, _ := resChCPURequests.Await()
|
|
|
@@ -498,7 +659,7 @@ func (cm *CostModel) ComputeAllocation(start, end time.Time, resolution time.Dur
|
|
|
if pvcInterval, ok := pvcPodIntervalMap[pvcKey][podKey]; ok {
|
|
|
s, e = *pvcInterval.Start(), *pvcInterval.End()
|
|
|
} else {
|
|
|
- log.Warningf("CostModel.ComputeAllocation: allocation %s and PVC %s have no associated active window", alloc.Name, pvc.Name)
|
|
|
+ log.Warnf("CostModel.ComputeAllocation: allocation %s and PVC %s have no associated active window", alloc.Name, pvc.Name)
|
|
|
}
|
|
|
|
|
|
minutes := e.Sub(s).Minutes()
|
|
|
@@ -516,7 +677,7 @@ func (cm *CostModel) ComputeAllocation(start, end time.Time, resolution time.Dur
|
|
|
if coeffComponents, ok := sharedPVCCostCoefficientMap[pvcKey][podKey]; ok {
|
|
|
cost *= getCoefficientFromComponents(coeffComponents)
|
|
|
} else {
|
|
|
- log.Warningf("CostModel.ComputeAllocation: allocation %s and PVC %s have relation but no coeff", alloc.Name, pvc.Name)
|
|
|
+ log.Warnf("CostModel.ComputeAllocation: allocation %s and PVC %s have relation but no coeff", alloc.Name, pvc.Name)
|
|
|
}
|
|
|
|
|
|
// Apply the size and cost of the PV to the allocation, each
|
|
|
@@ -571,7 +732,6 @@ func (cm *CostModel) buildPodMap(window kubecost.Window, resolution, maxBatchSiz
|
|
|
if batchEnd.After(end) {
|
|
|
batchEnd = end
|
|
|
}
|
|
|
- batchWindow := kubecost.NewWindow(&batchStart, &batchEnd)
|
|
|
|
|
|
var resPods []*prom.QueryResult
|
|
|
var err error
|
|
|
@@ -580,10 +740,9 @@ func (cm *CostModel) buildPodMap(window kubecost.Window, resolution, maxBatchSiz
|
|
|
for resPods == nil && numTries < maxTries {
|
|
|
numTries++
|
|
|
|
|
|
- // Convert window (start, end) to (duration, offset) for querying Prometheus,
|
|
|
- // including handling Thanos offset
|
|
|
- durStr, offStr, err := batchWindow.DurationOffsetForPrometheus()
|
|
|
- if err != nil || durStr == "" {
|
|
|
+ // Query for the duration between start and end
|
|
|
+ durStr := timeutil.DurationString(batchEnd.Sub(batchStart))
|
|
|
+ if durStr == "" {
|
|
|
// Negative duration, so set empty results and don't query
|
|
|
resPods = []*prom.QueryResult{}
|
|
|
err = nil
|
|
|
@@ -595,13 +754,13 @@ func (cm *CostModel) buildPodMap(window kubecost.Window, resolution, maxBatchSiz
|
|
|
var queryPods string
|
|
|
// If ingesting UIDs, avg on them
|
|
|
if ingestPodUID {
|
|
|
- queryPods = fmt.Sprintf(queryFmtPodsUID, env.GetPromClusterLabel(), durStr, resStr, offStr)
|
|
|
+ queryPods = fmt.Sprintf(queryFmtPodsUID, env.GetPromClusterLabel(), durStr, resStr)
|
|
|
} else {
|
|
|
- queryPods = fmt.Sprintf(queryFmtPods, env.GetPromClusterLabel(), durStr, resStr, offStr)
|
|
|
+ queryPods = fmt.Sprintf(queryFmtPods, env.GetPromClusterLabel(), durStr, resStr)
|
|
|
}
|
|
|
|
|
|
queryProfile := time.Now()
|
|
|
- resPods, err = ctx.Query(queryPods).Await()
|
|
|
+ resPods, err = ctx.QueryAtTime(queryPods, batchEnd).Await()
|
|
|
if err != nil {
|
|
|
log.Profile(queryProfile, fmt.Sprintf("CostModel.ComputeAllocation: pod query %d try %d failed: %s", numQuery, numTries, queryPods))
|
|
|
resPods = nil
|
|
|
@@ -645,7 +804,7 @@ func (cm *CostModel) buildPodMap(window kubecost.Window, resolution, maxBatchSiz
|
|
|
func applyPodResults(window kubecost.Window, resolution time.Duration, podMap map[podKey]*Pod, clusterStart, clusterEnd map[string]time.Time, resPods []*prom.QueryResult, ingestPodUID bool, podUIDKeyMap map[podKey][]podKey) {
|
|
|
for _, res := range resPods {
|
|
|
if len(res.Values) == 0 {
|
|
|
- log.Warningf("CostModel.ComputeAllocation: empty minutes result")
|
|
|
+ log.Warnf("CostModel.ComputeAllocation: empty minutes result")
|
|
|
continue
|
|
|
}
|
|
|
|
|
|
@@ -656,7 +815,7 @@ func applyPodResults(window kubecost.Window, resolution time.Duration, podMap ma
|
|
|
|
|
|
labels, err := res.GetStrings("namespace", "pod")
|
|
|
if err != nil {
|
|
|
- log.Warningf("CostModel.ComputeAllocation: minutes query result missing field: %s", err)
|
|
|
+ log.Warnf("CostModel.ComputeAllocation: minutes query result missing field: %s", err)
|
|
|
continue
|
|
|
}
|
|
|
|
|
|
@@ -844,7 +1003,7 @@ func applyCPUCoresAllocated(podMap map[podKey]*Pod, resCPUCoresAllocated []*prom
|
|
|
|
|
|
node, err := res.GetString("node")
|
|
|
if err != nil {
|
|
|
- log.Warningf("CostModel.ComputeAllocation: CPU allocation query result missing 'node': %s", key)
|
|
|
+ log.Warnf("CostModel.ComputeAllocation: CPU allocation query result missing 'node': %s", key)
|
|
|
continue
|
|
|
}
|
|
|
pod.Allocations[container].Properties.Node = node
|
|
|
@@ -903,7 +1062,7 @@ func applyCPUCoresRequested(podMap map[podKey]*Pod, resCPUCoresRequested []*prom
|
|
|
}
|
|
|
node, err := res.GetString("node")
|
|
|
if err != nil {
|
|
|
- log.Warningf("CostModel.ComputeAllocation: CPU request query result missing 'node': %s", key)
|
|
|
+ log.Warnf("CostModel.ComputeAllocation: CPU request query result missing 'node': %s", key)
|
|
|
continue
|
|
|
}
|
|
|
pod.Allocations[container].Properties.Node = node
|
|
|
@@ -1057,7 +1216,7 @@ func applyRAMBytesAllocated(podMap map[podKey]*Pod, resRAMBytesAllocated []*prom
|
|
|
|
|
|
node, err := res.GetString("node")
|
|
|
if err != nil {
|
|
|
- log.Warningf("CostModel.ComputeAllocation: RAM allocation query result missing 'node': %s", key)
|
|
|
+ log.Warnf("CostModel.ComputeAllocation: RAM allocation query result missing 'node': %s", key)
|
|
|
continue
|
|
|
}
|
|
|
pod.Allocations[container].Properties.Node = node
|
|
|
@@ -1113,7 +1272,7 @@ func applyRAMBytesRequested(podMap map[podKey]*Pod, resRAMBytesRequested []*prom
|
|
|
|
|
|
node, err := res.GetString("node")
|
|
|
if err != nil {
|
|
|
- log.Warningf("CostModel.ComputeAllocation: RAM request query result missing 'node': %s", key)
|
|
|
+ log.Warnf("CostModel.ComputeAllocation: RAM request query result missing 'node': %s", key)
|
|
|
continue
|
|
|
}
|
|
|
pod.Allocations[container].Properties.Node = node
|
|
|
@@ -1680,7 +1839,7 @@ func resToPodDaemonSetMap(resDaemonSetLabels []*prom.QueryResult, podUIDKeyMap m
|
|
|
|
|
|
pod, err := res.GetString("pod")
|
|
|
if err != nil {
|
|
|
- log.Warningf("CostModel.ComputeAllocation: DaemonSetLabel result without pod: %s", controllerKey)
|
|
|
+ log.Warnf("CostModel.ComputeAllocation: DaemonSetLabel result without pod: %s", controllerKey)
|
|
|
}
|
|
|
|
|
|
key := newPodKey(controllerKey.Cluster, controllerKey.Namespace, pod)
|
|
|
@@ -1723,7 +1882,7 @@ func resToPodJobMap(resJobLabels []*prom.QueryResult, podUIDKeyMap map[podKey][]
|
|
|
|
|
|
pod, err := res.GetString("pod")
|
|
|
if err != nil {
|
|
|
- log.Warningf("CostModel.ComputeAllocation: JobLabel result without pod: %s", controllerKey)
|
|
|
+ log.Warnf("CostModel.ComputeAllocation: JobLabel result without pod: %s", controllerKey)
|
|
|
}
|
|
|
|
|
|
key := newPodKey(controllerKey.Cluster, controllerKey.Namespace, pod)
|
|
|
@@ -1779,7 +1938,7 @@ func resToPodReplicaSetMap(resPodsWithReplicaSetOwner []*prom.QueryResult, resRe
|
|
|
|
|
|
pod, err := res.GetString("pod")
|
|
|
if err != nil {
|
|
|
- log.Warningf("CostModel.ComputeAllocation: ReplicaSet result without pod: %s", controllerKey)
|
|
|
+ log.Warnf("CostModel.ComputeAllocation: ReplicaSet result without pod: %s", controllerKey)
|
|
|
}
|
|
|
|
|
|
key := newPodKey(controllerKey.Cluster, controllerKey.Namespace, pod)
|
|
|
@@ -1869,19 +2028,19 @@ func applyNodeCostPerCPUHr(nodeMap map[nodeKey]*NodePricing, resNodeCostPerCPUHr
|
|
|
|
|
|
node, err := res.GetString("node")
|
|
|
if err != nil {
|
|
|
- log.Warningf("CostModel.ComputeAllocation: Node CPU cost query result missing field: %s", err)
|
|
|
+ log.Warnf("CostModel.ComputeAllocation: Node CPU cost query result missing field: %s", err)
|
|
|
continue
|
|
|
}
|
|
|
|
|
|
instanceType, err := res.GetString("instance_type")
|
|
|
if err != nil {
|
|
|
- log.Warningf("CostModel.ComputeAllocation: Node CPU cost query result missing field: %s", err)
|
|
|
+ log.Warnf("CostModel.ComputeAllocation: Node CPU cost query result missing field: %s", err)
|
|
|
continue
|
|
|
}
|
|
|
|
|
|
providerID, err := res.GetString("provider_id")
|
|
|
if err != nil {
|
|
|
- log.Warningf("CostModel.ComputeAllocation: Node CPU cost query result missing field: %s", err)
|
|
|
+ log.Warnf("CostModel.ComputeAllocation: Node CPU cost query result missing field: %s", err)
|
|
|
continue
|
|
|
}
|
|
|
|
|
|
@@ -1907,19 +2066,19 @@ func applyNodeCostPerRAMGiBHr(nodeMap map[nodeKey]*NodePricing, resNodeCostPerRA
|
|
|
|
|
|
node, err := res.GetString("node")
|
|
|
if err != nil {
|
|
|
- log.Warningf("CostModel.ComputeAllocation: Node RAM cost query result missing field: %s", err)
|
|
|
+ log.Warnf("CostModel.ComputeAllocation: Node RAM cost query result missing field: %s", err)
|
|
|
continue
|
|
|
}
|
|
|
|
|
|
instanceType, err := res.GetString("instance_type")
|
|
|
if err != nil {
|
|
|
- log.Warningf("CostModel.ComputeAllocation: Node RAM cost query result missing field: %s", err)
|
|
|
+ log.Warnf("CostModel.ComputeAllocation: Node RAM cost query result missing field: %s", err)
|
|
|
continue
|
|
|
}
|
|
|
|
|
|
providerID, err := res.GetString("provider_id")
|
|
|
if err != nil {
|
|
|
- log.Warningf("CostModel.ComputeAllocation: Node RAM cost query result missing field: %s", err)
|
|
|
+ log.Warnf("CostModel.ComputeAllocation: Node RAM cost query result missing field: %s", err)
|
|
|
continue
|
|
|
}
|
|
|
|
|
|
@@ -1945,19 +2104,19 @@ func applyNodeCostPerGPUHr(nodeMap map[nodeKey]*NodePricing, resNodeCostPerGPUHr
|
|
|
|
|
|
node, err := res.GetString("node")
|
|
|
if err != nil {
|
|
|
- log.Warningf("CostModel.ComputeAllocation: Node GPU cost query result missing field: %s", err)
|
|
|
+ log.Warnf("CostModel.ComputeAllocation: Node GPU cost query result missing field: %s", err)
|
|
|
continue
|
|
|
}
|
|
|
|
|
|
instanceType, err := res.GetString("instance_type")
|
|
|
if err != nil {
|
|
|
- log.Warningf("CostModel.ComputeAllocation: Node GPU cost query result missing field: %s", err)
|
|
|
+ log.Warnf("CostModel.ComputeAllocation: Node GPU cost query result missing field: %s", err)
|
|
|
continue
|
|
|
}
|
|
|
|
|
|
providerID, err := res.GetString("provider_id")
|
|
|
if err != nil {
|
|
|
- log.Warningf("CostModel.ComputeAllocation: Node GPU cost query result missing field: %s", err)
|
|
|
+ log.Warnf("CostModel.ComputeAllocation: Node GPU cost query result missing field: %s", err)
|
|
|
continue
|
|
|
}
|
|
|
|
|
|
@@ -1983,13 +2142,13 @@ func applyNodeSpot(nodeMap map[nodeKey]*NodePricing, resNodeIsSpot []*prom.Query
|
|
|
|
|
|
node, err := res.GetString("node")
|
|
|
if err != nil {
|
|
|
- log.Warningf("CostModel.ComputeAllocation: Node spot query result missing field: %s", err)
|
|
|
+ log.Warnf("CostModel.ComputeAllocation: Node spot query result missing field: %s", err)
|
|
|
continue
|
|
|
}
|
|
|
|
|
|
key := newNodeKey(cluster, node)
|
|
|
if _, ok := nodeMap[key]; !ok {
|
|
|
- log.Warningf("CostModel.ComputeAllocation: Node spot query result for missing node: %s", key)
|
|
|
+ log.Warnf("CostModel.ComputeAllocation: Node spot query result for missing node: %s", key)
|
|
|
continue
|
|
|
}
|
|
|
|
|
|
@@ -2037,7 +2196,7 @@ func buildPVMap(pvMap map[pvKey]*PV, resPVCostPerGiBHour []*prom.QueryResult) {
|
|
|
|
|
|
name, err := res.GetString("volumename")
|
|
|
if err != nil {
|
|
|
- log.Warningf("CostModel.ComputeAllocation: PV cost without volumename")
|
|
|
+ log.Warnf("CostModel.ComputeAllocation: PV cost without volumename")
|
|
|
continue
|
|
|
}
|
|
|
|
|
|
@@ -2055,12 +2214,12 @@ func applyPVBytes(pvMap map[pvKey]*PV, resPVBytes []*prom.QueryResult) {
|
|
|
for _, res := range resPVBytes {
|
|
|
key, err := resultPVKey(res, env.GetPromClusterLabel(), "persistentvolume")
|
|
|
if err != nil {
|
|
|
- log.Warningf("CostModel.ComputeAllocation: PV bytes query result missing field: %s", err)
|
|
|
+ log.Warnf("CostModel.ComputeAllocation: PV bytes query result missing field: %s", err)
|
|
|
continue
|
|
|
}
|
|
|
|
|
|
if _, ok := pvMap[key]; !ok {
|
|
|
- log.Warningf("CostModel.ComputeAllocation: PV bytes result for missing PV: %s", err)
|
|
|
+ log.Warnf("CostModel.ComputeAllocation: PV bytes result for missing PV: %s", err)
|
|
|
continue
|
|
|
}
|
|
|
|
|
|
@@ -2105,7 +2264,7 @@ func buildPVCMap(window kubecost.Window, pvcMap map[pvcKey]*PVC, pvMap map[pvKey
|
|
|
}
|
|
|
}
|
|
|
if pvcStart.IsZero() || pvcEnd.IsZero() {
|
|
|
- log.Warningf("CostModel.ComputeAllocation: PVC %s has no running time", pvcKey)
|
|
|
+ log.Warnf("CostModel.ComputeAllocation: PVC %s has no running time", pvcKey)
|
|
|
}
|
|
|
pvcStart = pvcStart.Add(-time.Minute)
|
|
|
|
|
|
@@ -2344,7 +2503,7 @@ func getLoadBalancerCosts(resLBCost, resLBActiveMins []*prom.QueryResult, resolu
|
|
|
continue
|
|
|
}
|
|
|
if _, ok := lbHourlyCosts[serviceKey]; !ok {
|
|
|
- log.Warningf("CostModel: failed to find hourly cost for Load Balancer: %v", serviceKey)
|
|
|
+ log.Warnf("CostModel: failed to find hourly cost for Load Balancer: %v", serviceKey)
|
|
|
continue
|
|
|
}
|
|
|
|
|
|
@@ -2415,7 +2574,7 @@ func (cm *CostModel) getNodePricing(nodeMap map[nodeKey]*NodePricing, nodeKey no
|
|
|
// node pricing with the custom values.
|
|
|
customPricingConfig, err := cm.Provider.GetConfig()
|
|
|
if err != nil {
|
|
|
- log.Warningf("CostModel: failed to load custom pricing: %s", err)
|
|
|
+ log.Warnf("CostModel: failed to load custom pricing: %s", err)
|
|
|
}
|
|
|
if cloud.CustomPricesEnabled(cm.Provider) && customPricingConfig != nil {
|
|
|
return cm.getCustomNodePricing(node.Preemptible)
|
|
|
@@ -2429,42 +2588,42 @@ func (cm *CostModel) getNodePricing(nodeMap map[nodeKey]*NodePricing, nodeKey no
|
|
|
// them as strings like this?
|
|
|
|
|
|
if node.CostPerCPUHr == 0 || math.IsNaN(node.CostPerCPUHr) {
|
|
|
- log.Warningf("CostModel: node pricing has illegal CostPerCPUHr; replacing with custom pricing: %s", nodeKey)
|
|
|
+ log.Warnf("CostModel: node pricing has illegal CostPerCPUHr; replacing with custom pricing: %s", nodeKey)
|
|
|
cpuCostStr := customPricingConfig.CPU
|
|
|
if node.Preemptible {
|
|
|
cpuCostStr = customPricingConfig.SpotCPU
|
|
|
}
|
|
|
costPerCPUHr, err := strconv.ParseFloat(cpuCostStr, 64)
|
|
|
if err != nil {
|
|
|
- log.Warningf("CostModel: custom pricing has illegal CPU cost: %s", cpuCostStr)
|
|
|
+ log.Warnf("CostModel: custom pricing has illegal CPU cost: %s", cpuCostStr)
|
|
|
}
|
|
|
node.CostPerCPUHr = costPerCPUHr
|
|
|
node.Source += "/customCPU"
|
|
|
}
|
|
|
|
|
|
if math.IsNaN(node.CostPerGPUHr) {
|
|
|
- log.Warningf("CostModel: node pricing has illegal CostPerGPUHr; replacing with custom pricing: %s", nodeKey)
|
|
|
+ log.Warnf("CostModel: node pricing has illegal CostPerGPUHr; replacing with custom pricing: %s", nodeKey)
|
|
|
gpuCostStr := customPricingConfig.GPU
|
|
|
if node.Preemptible {
|
|
|
gpuCostStr = customPricingConfig.SpotGPU
|
|
|
}
|
|
|
costPerGPUHr, err := strconv.ParseFloat(gpuCostStr, 64)
|
|
|
if err != nil {
|
|
|
- log.Warningf("CostModel: custom pricing has illegal GPU cost: %s", gpuCostStr)
|
|
|
+ log.Warnf("CostModel: custom pricing has illegal GPU cost: %s", gpuCostStr)
|
|
|
}
|
|
|
node.CostPerGPUHr = costPerGPUHr
|
|
|
node.Source += "/customGPU"
|
|
|
}
|
|
|
|
|
|
if node.CostPerRAMGiBHr == 0 || math.IsNaN(node.CostPerRAMGiBHr) {
|
|
|
- log.Warningf("CostModel: node pricing has illegal CostPerRAMHr; replacing with custom pricing: %s", nodeKey)
|
|
|
+ log.Warnf("CostModel: node pricing has illegal CostPerRAMHr; replacing with custom pricing: %s", nodeKey)
|
|
|
ramCostStr := customPricingConfig.RAM
|
|
|
if node.Preemptible {
|
|
|
ramCostStr = customPricingConfig.SpotRAM
|
|
|
}
|
|
|
costPerRAMHr, err := strconv.ParseFloat(ramCostStr, 64)
|
|
|
if err != nil {
|
|
|
- log.Warningf("CostModel: custom pricing has illegal RAM cost: %s", ramCostStr)
|
|
|
+ log.Warnf("CostModel: custom pricing has illegal RAM cost: %s", ramCostStr)
|
|
|
}
|
|
|
node.CostPerRAMGiBHr = costPerRAMHr
|
|
|
node.Source += "/customRAM"
|
|
|
@@ -2494,19 +2653,19 @@ func (cm *CostModel) getCustomNodePricing(spot bool) *NodePricing {
|
|
|
|
|
|
costPerCPUHr, err := strconv.ParseFloat(cpuCostStr, 64)
|
|
|
if err != nil {
|
|
|
- log.Warningf("CostModel: custom pricing has illegal CPU cost: %s", cpuCostStr)
|
|
|
+ log.Warnf("CostModel: custom pricing has illegal CPU cost: %s", cpuCostStr)
|
|
|
}
|
|
|
node.CostPerCPUHr = costPerCPUHr
|
|
|
|
|
|
costPerGPUHr, err := strconv.ParseFloat(gpuCostStr, 64)
|
|
|
if err != nil {
|
|
|
- log.Warningf("CostModel: custom pricing has illegal GPU cost: %s", gpuCostStr)
|
|
|
+ log.Warnf("CostModel: custom pricing has illegal GPU cost: %s", gpuCostStr)
|
|
|
}
|
|
|
node.CostPerGPUHr = costPerGPUHr
|
|
|
|
|
|
costPerRAMHr, err := strconv.ParseFloat(ramCostStr, 64)
|
|
|
if err != nil {
|
|
|
- log.Warningf("CostModel: custom pricing has illegal RAM cost: %s", ramCostStr)
|
|
|
+ log.Warnf("CostModel: custom pricing has illegal RAM cost: %s", ramCostStr)
|
|
|
}
|
|
|
node.CostPerRAMGiBHr = costPerRAMHr
|
|
|
|