Przeglądaj źródła

Merge branch 'niko/cdmr' into niko/computeallocation

Niko Kovacevic 5 lat temu
rodzic
commit
0b48bcdf63

+ 2 - 0
pkg/costmodel/aggregation.go

@@ -47,6 +47,8 @@ type Aggregation struct {
 	Environment                string               `json:"environment"`
 	Cluster                    string               `json:"cluster,omitempty"`
 	Properties                 *kubecost.Properties `json:"-"`
+	Start                      time.Time            `json:"-"`
+	End                        time.Time            `json:"-"`
 	CPUAllocationHourlyAverage float64              `json:"cpuAllocationAverage"`
 	CPUAllocationVectors       []*util.Vector       `json:"-"`
 	CPUAllocationTotal         float64              `json:"-"`

+ 1741 - 0
pkg/costmodel/allocation.go

@@ -0,0 +1,1741 @@
+package costmodel
+
+import (
+	"fmt"
+	"time"
+
+	"github.com/kubecost/cost-model/pkg/env"
+	"github.com/kubecost/cost-model/pkg/kubecost"
+	"github.com/kubecost/cost-model/pkg/log"
+	"github.com/kubecost/cost-model/pkg/prom"
+	"github.com/kubecost/cost-model/pkg/thanos"
+	"k8s.io/apimachinery/pkg/labels"
+)
+
+// TODO niko/cdmr NodeProp issue
+// http://kubecost.nikovacevic.io/model/allocation?window=yesterday => Error: NodeProp not set
+
+// TODO niko/cdmr split into required and optional queries?
+
+// TODO niko/cdmr move to pkg/kubecost
+// TODO niko/cdmr add PersistenVolumeClaims to type Allocation?
+type PVC struct {
+	Bytes     float64   `json:"bytes"`
+	Count     int       `json:"count"`
+	Name      string    `json:"name"`
+	Cluster   string    `json:"cluster"`
+	Namespace string    `json:"namespace"`
+	Volume    *PV       `json:"persistentVolume"`
+	Start     time.Time `json:"start"`
+	End       time.Time `json:"end"`
+}
+
+func (pvc *PVC) Cost() float64 {
+	if pvc == nil || pvc.Volume == nil {
+		return 0.0
+	}
+
+	gib := pvc.Bytes / 1024 / 1024 / 1024
+	hrs := pvc.Minutes() / 60.0
+
+	return pvc.Volume.CostPerGiBHour * gib * hrs
+}
+
+func (pvc *PVC) Minutes() float64 {
+	if pvc == nil {
+		return 0.0
+	}
+
+	return pvc.End.Sub(pvc.Start).Minutes()
+}
+
+func (pvc *PVC) String() string {
+	if pvc == nil {
+		return "<nil>"
+	}
+	return fmt.Sprintf("%s/%s/%s{Bytes:%.2f, Cost:%.6f, Start,End:%s}", pvc.Cluster, pvc.Namespace, pvc.Name, pvc.Bytes, pvc.Cost(), kubecost.NewWindow(&pvc.Start, &pvc.End))
+}
+
+// TODO niko/cdmr move to pkg/kubecost
+type PV struct {
+	Bytes          float64 `json:"bytes"`
+	CostPerGiBHour float64 `json:"costPerGiBHour"` // TODO niko/cdmr GiB or GB?
+	Cluster        string  `json:"cluster"`
+	Name           string  `json:"name"`
+	StorageClass   string  `json:"storageClass"`
+}
+
+func (pv *PV) String() string {
+	if pv == nil {
+		return "<nil>"
+	}
+	return fmt.Sprintf("%s/%s{Bytes:%.2f, Cost/GiB*Hr:%.6f, StorageClass:%s}", pv.Cluster, pv.Name, pv.Bytes, pv.CostPerGiBHour, pv.StorageClass)
+}
+
+// ComputeAllocation uses the CostModel instance to compute an AllocationSet
+// for the window defined by the given start and end times. The Allocations
+// returned are unaggregated (i.e. down to the container level).
+func (cm *CostModel) ComputeAllocation(start, end time.Time) (*kubecost.AllocationSet, error) {
+	// Create a window spanning the requested query
+	s, e := start, end
+	window := kubecost.NewWindow(&s, &e)
+
+	// Create an empty AllocationSet. For safety, in the case of an error, we
+	// should prefer to return this empty set with the error. (In the case of
+	// no error, of course we populate the set and return it.)
+	allocSet := kubecost.NewAllocationSet(start, end)
+
+	// Convert window (start, end) to (duration, offset) for querying Prometheus
+	timesToDurations := func(s, e time.Time) (dur, off time.Duration) {
+		now := time.Now()
+		off = now.Sub(e)
+		dur = e.Sub(s)
+		return dur, off
+	}
+	duration, offset := timesToDurations(start, end)
+
+	// If using Thanos, increase offset to 3 hours, reducing the duration by
+	// equal measure to maintain the same starting point.
+	thanosDur := thanos.OffsetDuration()
+	// TODO niko/cdmr confirm that this flag works interchangeably with ThanosClient != nil
+	if offset < thanosDur && env.IsThanosEnabled() {
+		diff := thanosDur - offset
+		offset += diff
+		duration -= diff
+	}
+
+	// If duration < 0, return an empty set
+	if duration < 0 {
+		return allocSet, nil
+	}
+
+	// Negative offset means that the end time is in the future. Prometheus
+	// fails for non-positive offset values, so shrink the duration and
+	// remove the offset altogether.
+	if offset < 0 {
+		duration = duration + offset
+		offset = 0
+	}
+
+	durStr := fmt.Sprintf("%dm", int64(duration.Minutes()))
+	offStr := fmt.Sprintf(" offset %dm", int64(offset.Minutes()))
+	if offset < time.Minute {
+		offStr = ""
+	}
+
+	// TODO niko/cdmr dynamic resolution? add to ComputeAllocation() in allocation.Source?
+	resStr := "1m"
+	// resPerHr := 60
+
+	// TODO niko/cdmr remove after testing
+	startQuerying := time.Now()
+
+	ctx := prom.NewContext(cm.PrometheusClient)
+
+	// TODO niko/cdmr retries? (That should probably go into the Store.)
+
+	// TODO niko/cmdr check: will multiple Prometheus jobs multiply the totals?
+
+	// TODO niko/cdmr should we try doing this without resolution? Could yield
+	// more accurate results, but might also be more challenging in some
+	// respects; e.g. "correcting" the start point by what amount?
+	queryMinutes := fmt.Sprintf(`avg(kube_pod_container_status_running{}) by (container, pod, namespace, cluster_id)[%s:%s]%s`, durStr, resStr, offStr)
+	resChMinutes := ctx.Query(queryMinutes)
+
+	queryRAMBytesAllocated := fmt.Sprintf(`avg(avg_over_time(container_memory_allocation_bytes{container!="", container!="POD", node!=""}[%s]%s)) by (container, pod, namespace, node, cluster_id)`, durStr, offStr)
+	resChRAMBytesAllocated := ctx.Query(queryRAMBytesAllocated)
+
+	queryRAMRequests := fmt.Sprintf(`avg(avg_over_time(kube_pod_container_resource_requests_memory_bytes{container!="", container!="POD", node!=""}[%s]%s)) by (container, pod, namespace, node, cluster_id)`, durStr, offStr)
+	resChRAMRequests := ctx.Query(queryRAMRequests)
+
+	queryRAMUsage := fmt.Sprintf(`avg(avg_over_time(container_memory_working_set_bytes{container_name!="", container_name!="POD", instance!=""}[%s]%s)) by (container_name, pod_name, namespace, instance, cluster_id)`, durStr, offStr)
+	resChRAMUsage := ctx.Query(queryRAMUsage)
+
+	queryCPUCoresAllocated := fmt.Sprintf(`avg(avg_over_time(container_cpu_allocation{container!="", container!="POD", node!=""}[%s]%s)) by (container, pod, namespace, node, cluster_id)`, durStr, offStr)
+	resChCPUCoresAllocated := ctx.Query(queryCPUCoresAllocated)
+
+	queryCPURequests := fmt.Sprintf(`avg(avg_over_time(kube_pod_container_resource_requests_cpu_cores{container!="", container!="POD", node!=""}[%s]%s)) by (container, pod, namespace, node, cluster_id)`, durStr, offStr)
+	resChCPURequests := ctx.Query(queryCPURequests)
+
+	queryCPUUsage := fmt.Sprintf(`avg(rate(container_cpu_usage_seconds_total{container_name!="", container_name!="POD", instance!=""}[%s]%s)) by (container_name, pod_name, namespace, instance, cluster_id)`, durStr, offStr)
+	resChCPUUsage := ctx.Query(queryCPUUsage)
+
+	// TODO niko/cdmr find an env with GPUs to test this (generate one?)
+	queryGPUsRequested := fmt.Sprintf(`avg(avg_over_time(kube_pod_container_resource_requests{resource="nvidia_com_gpu", container!="",container!="POD", node!=""}[%s]%s)) by (container, pod, namespace, node, cluster_id)`, durStr, offStr)
+	resChGPUsRequested := ctx.Query(queryGPUsRequested)
+
+	queryNodeCostPerCPUHr := fmt.Sprintf(`avg(avg_over_time(node_cpu_hourly_cost[%s]%s)) by (node, cluster_id, instance_type)`, durStr, offStr)
+	resChNodeCostPerCPUHr := ctx.Query(queryNodeCostPerCPUHr)
+
+	queryNodeCostPerRAMGiBHr := fmt.Sprintf(`avg(avg_over_time(node_ram_hourly_cost[%s]%s)) by (node, cluster_id, instance_type)`, durStr, offStr)
+	resChNodeCostPerRAMGiBHr := ctx.Query(queryNodeCostPerRAMGiBHr)
+
+	queryNodeCostPerGPUHr := fmt.Sprintf(`avg(avg_over_time(node_gpu_hourly_cost[%s]%s)) by (node, cluster_id, instance_type)`, durStr, offStr)
+	resChNodeCostPerGPUHr := ctx.Query(queryNodeCostPerGPUHr)
+
+	queryNodeIsSpot := fmt.Sprintf(`avg_over_time(kubecost_node_is_spot[%s]%s)`, durStr, offStr)
+	resChNodeIsSpot := ctx.Query(queryNodeIsSpot)
+
+	queryPVCInfo := fmt.Sprintf(`avg(kube_persistentvolumeclaim_info{volumename != ""}) by (persistentvolumeclaim, storageclass, volumename, namespace, cluster_id)[%s:%s]%s`, durStr, resStr, offStr)
+	resChPVCInfo := ctx.Query(queryPVCInfo)
+
+	queryPVBytes := fmt.Sprintf(`avg(avg_over_time(kube_persistentvolume_capacity_bytes[%s]%s)) by (persistentvolume, cluster_id)`, durStr, offStr)
+	resChPVBytes := ctx.Query(queryPVBytes)
+
+	queryPodPVCAllocation := fmt.Sprintf(`avg(avg_over_time(pod_pvc_allocation[%s]%s)) by (persistentvolume, persistentvolumeclaim, pod, namespace, cluster_id)`, durStr, offStr)
+	resChPodPVCAllocation := ctx.Query(queryPodPVCAllocation)
+
+	queryPVCBytesRequested := fmt.Sprintf(`avg(avg_over_time(kube_persistentvolumeclaim_resource_requests_storage_bytes{}[%s]%s)) by (persistentvolumeclaim, namespace, cluster_id)`, durStr, offStr)
+	resChPVCBytesRequested := ctx.Query(queryPVCBytesRequested)
+
+	queryPVCostPerGiBHour := fmt.Sprintf(`avg(avg_over_time(pv_hourly_cost[%s]%s)) by (volumename, cluster_id)`, durStr, offStr)
+	resChPVCostPerGiBHour := ctx.Query(queryPVCostPerGiBHour)
+
+	queryNetZoneGiB := fmt.Sprintf(`sum(increase(kubecost_pod_network_egress_bytes_total{internet="false", sameZone="false", sameRegion="true"}[%s]%s)) by (pod_name, namespace, cluster_id) / 1024 / 1024 / 1024`, durStr, offStr)
+	resChNetZoneGiB := ctx.Query(queryNetZoneGiB)
+
+	queryNetZoneCostPerGiB := fmt.Sprintf(`avg(avg_over_time(kubecost_network_zone_egress_cost{}[%s]%s)) by (cluster_id)`, durStr, offStr)
+	resChNetZoneCostPerGiB := ctx.Query(queryNetZoneCostPerGiB)
+
+	queryNetRegionGiB := fmt.Sprintf(`sum(increase(kubecost_pod_network_egress_bytes_total{internet="false", sameZone="false", sameRegion="false"}[%s]%s)) by (pod_name, namespace, cluster_id) / 1024 / 1024 / 1024`, durStr, offStr)
+	resChNetRegionGiB := ctx.Query(queryNetRegionGiB)
+
+	queryNetRegionCostPerGiB := fmt.Sprintf(`avg(avg_over_time(kubecost_network_region_egress_cost{}[%s]%s)) by (cluster_id)`, durStr, offStr)
+	resChNetRegionCostPerGiB := ctx.Query(queryNetRegionCostPerGiB)
+
+	queryNetInternetGiB := fmt.Sprintf(`sum(increase(kubecost_pod_network_egress_bytes_total{internet="true"}[%s]%s)) by (pod_name, namespace, cluster_id) / 1024 / 1024 / 1024`, durStr, offStr)
+	resChNetInternetGiB := ctx.Query(queryNetInternetGiB)
+
+	queryNetInternetCostPerGiB := fmt.Sprintf(`avg(avg_over_time(kubecost_network_internet_egress_cost{}[%s]%s)) by (cluster_id)`, durStr, offStr)
+	resChNetInternetCostPerGiB := ctx.Query(queryNetInternetCostPerGiB)
+
+	queryNamespaceLabels := fmt.Sprintf(`avg_over_time(kube_namespace_labels[%s]%s)`, durStr, offStr)
+	resChNamespaceLabels := ctx.Query(queryNamespaceLabels)
+
+	queryNamespaceAnnotations := fmt.Sprintf(`avg_over_time(kube_namespace_annotations[%s]%s)`, durStr, offStr)
+	resChNamespaceAnnotations := ctx.Query(queryNamespaceAnnotations)
+
+	queryPodLabels := fmt.Sprintf(`avg_over_time(kube_pod_labels[%s]%s)`, durStr, offStr)
+	resChPodLabels := ctx.Query(queryPodLabels)
+
+	queryPodAnnotations := fmt.Sprintf(`avg_over_time(kube_pod_annotations[%s]%s)`, durStr, offStr)
+	resChPodAnnotations := ctx.Query(queryPodAnnotations)
+
+	queryServiceLabels := fmt.Sprintf(`avg_over_time(service_selector_labels[%s]%s)`, durStr, offStr)
+	resChServiceLabels := ctx.Query(queryServiceLabels)
+
+	queryDeploymentLabels := fmt.Sprintf(`avg_over_time(deployment_match_labels[%s]%s)`, durStr, offStr)
+	resChDeploymentLabels := ctx.Query(queryDeploymentLabels)
+
+	queryStatefulSetLabels := fmt.Sprintf(`avg_over_time(statefulSet_match_labels[%s]%s)`, durStr, offStr)
+	resChStatefulSetLabels := ctx.Query(queryStatefulSetLabels)
+
+	queryDaemonSetLabels := fmt.Sprintf(`sum(avg_over_time(kube_pod_owner{owner_kind="DaemonSet"}[%s]%s)) by (pod, owner_name, namespace, cluster_id)`, durStr, offStr)
+	resChDaemonSetLabels := ctx.Query(queryDaemonSetLabels)
+
+	queryJobLabels := fmt.Sprintf(`sum(avg_over_time(kube_pod_owner{owner_kind="Job"}[%s]%s)) by (pod, owner_name, namespace ,cluster_id)`, durStr, offStr)
+	resChJobLabels := ctx.Query(queryJobLabels)
+
+	resMinutes, _ := resChMinutes.Await()
+	resCPUCoresAllocated, _ := resChCPUCoresAllocated.Await()
+	resCPURequests, _ := resChCPURequests.Await()
+	resCPUUsage, _ := resChCPUUsage.Await()
+	resRAMBytesAllocated, _ := resChRAMBytesAllocated.Await()
+	resRAMRequests, _ := resChRAMRequests.Await()
+	resRAMUsage, _ := resChRAMUsage.Await()
+	resGPUsRequested, _ := resChGPUsRequested.Await()
+
+	resNodeCostPerCPUHr, _ := resChNodeCostPerCPUHr.Await()
+	resNodeCostPerRAMGiBHr, _ := resChNodeCostPerRAMGiBHr.Await()
+	resNodeCostPerGPUHr, _ := resChNodeCostPerGPUHr.Await()
+	resNodeIsSpot, _ := resChNodeIsSpot.Await()
+
+	resPVBytes, _ := resChPVBytes.Await()
+	resPVCostPerGiBHour, _ := resChPVCostPerGiBHour.Await()
+
+	resPVCInfo, _ := resChPVCInfo.Await()
+	resPVCBytesRequested, _ := resChPVCBytesRequested.Await()
+	resPodPVCAllocation, _ := resChPodPVCAllocation.Await()
+
+	resNetZoneGiB, _ := resChNetZoneGiB.Await()
+	resNetZoneCostPerGiB, _ := resChNetZoneCostPerGiB.Await()
+	resNetRegionGiB, _ := resChNetRegionGiB.Await()
+	resNetRegionCostPerGiB, _ := resChNetRegionCostPerGiB.Await()
+	resNetInternetGiB, _ := resChNetInternetGiB.Await()
+	resNetInternetCostPerGiB, _ := resChNetInternetCostPerGiB.Await()
+
+	resNamespaceLabels, _ := resChNamespaceLabels.Await()
+	resNamespaceAnnotations, _ := resChNamespaceAnnotations.Await()
+	resPodLabels, _ := resChPodLabels.Await()
+	resPodAnnotations, _ := resChPodAnnotations.Await()
+	resServiceLabels, _ := resChServiceLabels.Await()
+	resDeploymentLabels, _ := resChDeploymentLabels.Await()
+	resStatefulSetLabels, _ := resChStatefulSetLabels.Await()
+	resDaemonSetLabels, _ := resChDaemonSetLabels.Await()
+	resJobLabels, _ := resChJobLabels.Await()
+
+	// ----------------------------------------------------------------------//
+	// TODO niko/cdmr remove all logs after testing
+
+	// log.Infof("CostModel.ComputeAllocation: minutes  : %s", queryMinutes)
+
+	// log.Infof("CostModel.ComputeAllocation: CPU cores: %s", queryCPUCoresAllocated)
+	// log.Infof("CostModel.ComputeAllocation: CPU req  : %s", queryCPURequests)
+	// log.Infof("CostModel.ComputeAllocation: CPU use  : %s", queryCPUUsage)
+	// log.Infof("CostModel.ComputeAllocation: $/CPU*Hr : %s", queryNodeCostPerCPUHr)
+
+	// log.Infof("CostModel.ComputeAllocation: RAM bytes: %s", queryRAMBytesAllocated)
+	// log.Infof("CostModel.ComputeAllocation: RAM req  : %s", queryRAMRequests)
+	// log.Infof("CostModel.ComputeAllocation: RAM use  : %s", queryRAMUsage)
+	// log.Infof("CostModel.ComputeAllocation: $/GiB*Hr : %s", queryNodeCostPerRAMGiBHr)
+
+	// log.Infof("CostModel.ComputeAllocation: PV $/gbhr: %s", queryPVCostPerGiBHour)
+	// log.Infof("CostModel.ComputeAllocation: PV bytes : %s", queryPVBytes)
+
+	// log.Infof("CostModel.ComputeAllocation: PVC alloc: %s", queryPodPVCAllocation)
+	// log.Infof("CostModel.ComputeAllocation: PVC bytes: %s", queryPVCBytesRequested)
+	// log.Infof("CostModel.ComputeAllocation: PVC info : %s", queryPVCInfo)
+
+	// log.Infof("CostModel.ComputeAllocation: Net Z GiB: %s", queryNetZoneGiB)
+	// log.Infof("CostModel.ComputeAllocation: Net Z $  : %s", queryNetZoneCostPerGiB)
+	// log.Infof("CostModel.ComputeAllocation: Net R GiB: %s", queryNetRegionGiB)
+	// log.Infof("CostModel.ComputeAllocation: Net R $  : %s", queryNetRegionCostPerGiB)
+	// log.Infof("CostModel.ComputeAllocation: Net I GiB: %s", queryNetInternetGiB)
+	// log.Infof("CostModel.ComputeAllocation: Net I $  : %s", queryNetInternetCostPerGiB)
+
+	// log.Infof("CostModel.ComputeAllocation: NamespaceLabels: %s", queryNamespaceLabels)
+	// log.Infof("CostModel.ComputeAllocation: NamespaceAnnotations: %s", queryNamespaceAnnotations)
+	// log.Infof("CostModel.ComputeAllocation: PodLabels: %s", queryPodLabels)
+	// log.Infof("CostModel.ComputeAllocation: PodAnnotations: %s", queryPodAnnotations)
+	// log.Infof("CostModel.ComputeAllocation: ServiceLabels: %s", queryServiceLabels)
+	// log.Infof("CostModel.ComputeAllocation: DeploymentLabels: %s", queryDeploymentLabels)
+	// log.Infof("CostModel.ComputeAllocation: StatefulSetLabels: %s", queryStatefulSetLabels)
+	// log.Infof("CostModel.ComputeAllocation: DaemonSetLabels: %s", queryDaemonSetLabels)
+	// log.Infof("CostModel.ComputeAllocation: JobLabels: %s", queryJobLabels)
+
+	log.Profile(startQuerying, "CostModel.ComputeAllocation: queries complete")
+	defer log.Profile(time.Now(), "CostModel.ComputeAllocation: processing complete")
+
+	// ----------------------------------------------------------------------//
+
+	// Build out a map of Allocations, starting with (start, end) so that we
+	// begin with minutes, from which we compute resource allocation and cost
+	// totals from measured rate data.
+	// TODO niko/cdmr can we start with a reasonable guess at map size?
+	allocationMap := map[containerKey]*kubecost.Allocation{}
+
+	// Keep track of the allocations per pod, for the sake of splitting PVC and
+	// Network allocation into per-Allocation from per-Pod.
+	podAllocation := map[podKey][]*kubecost.Allocation{}
+
+	// clusterStarts and clusterEnds record the earliest start and latest end
+	// times, respectively, on a cluster-basis. These are used for unmounted
+	// PVs and other "virtual" Allocations so that minutes are maximally
+	// accurate during start-up or spin-down of a cluster
+	clusterStart := map[string]time.Time{}
+	clusterEnd := map[string]time.Time{}
+
+	buildAllocationMap(window, allocationMap, podAllocation, clusterStart, clusterEnd, resMinutes)
+	applyCPUCoresAllocated(allocationMap, resCPUCoresAllocated)
+	applyCPUCoresRequested(allocationMap, resCPURequests)
+	applyCPUCoresUsed(allocationMap, resCPUUsage)
+	applyRAMBytesAllocated(allocationMap, resRAMBytesAllocated)
+	applyRAMBytesRequested(allocationMap, resRAMRequests)
+	applyRAMBytesUsed(allocationMap, resRAMUsage)
+	applyGPUsRequested(allocationMap, resGPUsRequested)
+	applyNetworkAllocation(allocationMap, podAllocation, resNetZoneGiB, resNetZoneCostPerGiB)
+	applyNetworkAllocation(allocationMap, podAllocation, resNetRegionGiB, resNetRegionCostPerGiB)
+	applyNetworkAllocation(allocationMap, podAllocation, resNetInternetGiB, resNetInternetCostPerGiB)
+
+	// TODO niko/cdmr pruneDuplicateData? (see costmodel.go)
+
+	namespaceLabels := resToNamespaceLabels(resNamespaceLabels)
+	podLabels := resToPodLabels(resPodLabels)
+	namespaceAnnotations := resToNamespaceAnnotations(resNamespaceAnnotations)
+	podAnnotations := resToPodAnnotations(resPodAnnotations)
+	applyLabels(allocationMap, namespaceLabels, podLabels)
+	applyAnnotations(allocationMap, namespaceAnnotations, podAnnotations)
+
+	serviceLabels := getServiceLabels(resServiceLabels)
+	applyServicesToPods(allocationMap, podLabels, serviceLabels)
+
+	podDeploymentMap := labelsToPodControllerMap(podLabels, resToDeploymentLabels(resDeploymentLabels))
+	podStatefulSetMap := labelsToPodControllerMap(podLabels, resToStatefulSetLabels(resStatefulSetLabels))
+	podDaemonSetMap := resToPodDaemonSetMap(resDaemonSetLabels)
+	podJobMap := resToPodJobMap(resJobLabels)
+	applyControllersToPods(allocationMap, podDeploymentMap)
+	applyControllersToPods(allocationMap, podStatefulSetMap)
+	applyControllersToPods(allocationMap, podDaemonSetMap)
+	applyControllersToPods(allocationMap, podJobMap)
+
+	// TODO niko/cdmr breakdown network costs?
+
+	// Build out a map of Nodes with resource costs, discounts, and node types
+	// for converting resource allocation data to cumulative costs.
+	nodeMap := map[nodeKey]*Node{}
+
+	applyNodeCostPerCPUHr(nodeMap, resNodeCostPerCPUHr)
+	applyNodeCostPerRAMGiBHr(nodeMap, resNodeCostPerRAMGiBHr)
+	applyNodeCostPerGPUHr(nodeMap, resNodeCostPerGPUHr)
+	applyNodeSpot(nodeMap, resNodeIsSpot)
+	applyNodeDiscount(nodeMap, cm)
+
+	// TODO niko/cdmr comment
+	pvMap := map[pvKey]*PV{}
+	buildPVMap(pvMap, resPVCostPerGiBHour)
+	applyPVBytes(pvMap, resPVBytes)
+	// TODO niko/cdmr apply PV bytes?
+
+	// TODO niko/cdmr comment
+	pvcMap := map[pvcKey]*PVC{}
+	buildPVCMap(window, pvcMap, pvMap, resPVCInfo)
+	applyPVCBytesRequested(pvcMap, resPVCBytesRequested)
+
+	// TODO niko/cdmr comment
+	podPVCMap := map[podKey][]*PVC{}
+	buildPodPVCMap(podPVCMap, pvMap, pvcMap, podAllocation, resPodPVCAllocation)
+
+	// Identify unmounted PVs (PVs without PVCs) and add one Allocation per
+	// cluster representing each cluster's unmounted PVs (if necessary).
+	applyUnmountedPVs(window, allocationMap, pvMap, pvcMap)
+
+	for _, alloc := range allocationMap {
+		cluster, _ := alloc.Properties.GetCluster()
+		node, _ := alloc.Properties.GetNode()
+		namespace, _ := alloc.Properties.GetNamespace()
+		pod, _ := alloc.Properties.GetPod()
+		container, _ := alloc.Properties.GetContainer()
+
+		podKey := newPodKey(cluster, namespace, pod)
+		nodeKey := newNodeKey(cluster, node)
+
+		if n, ok := nodeMap[nodeKey]; !ok {
+			if pod != "unmounted-pvs" {
+				log.Warningf("CostModel.ComputeAllocation: failed to find node %s for %s", nodeKey, alloc.Name)
+			}
+		} else {
+			alloc.CPUCost = alloc.CPUCoreHours * n.CostPerCPUHr
+			alloc.RAMCost = (alloc.RAMByteHours / 1024 / 1024 / 1024) * n.CostPerRAMGiBHr
+			alloc.GPUCost = alloc.GPUHours * n.CostPerGPUHr
+		}
+
+		if pvcs, ok := podPVCMap[podKey]; ok {
+			for _, pvc := range pvcs {
+				// Determine the (start, end) of the relationship between the
+				// given PVC and the associated Allocation so that a precise
+				// number of hours can be used to compute cumulative cost.
+				s, e := alloc.Start, alloc.End
+				if pvc.Start.After(alloc.Start) {
+					s = pvc.Start
+				}
+				if pvc.End.Before(alloc.End) {
+					e = pvc.End
+				}
+				minutes := e.Sub(s).Minutes()
+				hrs := minutes / 60.0
+				gib := pvc.Bytes / 1024 / 1024 / 1024
+
+				alloc.PVByteHours += pvc.Bytes * hrs
+				alloc.PVCost += pvc.Volume.CostPerGiBHour * gib * hrs / float64(pvc.Count)
+			}
+		}
+
+		alloc.TotalCost = 0.0
+		alloc.TotalCost += alloc.CPUCost
+		alloc.TotalCost += alloc.RAMCost
+		alloc.TotalCost += alloc.GPUCost
+		alloc.TotalCost += alloc.PVCost
+		alloc.TotalCost += alloc.NetworkCost
+		alloc.TotalCost += alloc.SharedCost
+		alloc.TotalCost += alloc.ExternalCost
+
+		if alloc.RAMBytesRequestAverage > 0 {
+			alloc.RAMEfficiency = alloc.RAMBytesUsageAverage / alloc.RAMBytesRequestAverage
+		}
+
+		if alloc.CPUCoreRequestAverage > 0 {
+			alloc.CPUEfficiency = alloc.CPUCoreUsageAverage / alloc.CPUCoreRequestAverage
+		}
+
+		if alloc.CPUCost+alloc.RAMCost > 0 {
+			ramCostEff := alloc.RAMEfficiency * alloc.RAMCost
+			cpuCostEff := alloc.CPUEfficiency * alloc.CPUCost
+			alloc.TotalEfficiency = (ramCostEff + cpuCostEff) / (alloc.CPUCost + alloc.RAMCost)
+		}
+
+		// Make sure that the name is correct (node may not be present at this
+		// point due to it missing from queryMinutes) then insert.
+		alloc.Name = fmt.Sprintf("%s/%s/%s/%s/%s", cluster, node, namespace, pod, container)
+		allocSet.Set(alloc)
+	}
+
+	return allocSet, nil
+}
+
+func buildAllocationMap(window kubecost.Window, allocationMap map[containerKey]*kubecost.Allocation, podAllocation map[podKey][]*kubecost.Allocation, clusterStart, clusterEnd map[string]time.Time, resMinutes []*prom.QueryResult) {
+	for _, res := range resMinutes {
+		if len(res.Values) == 0 {
+			log.Warningf("CostModel.ComputeAllocation: empty minutes result")
+			continue
+		}
+
+		cluster, err := res.GetString("cluster_id")
+		if err != nil {
+			cluster = env.GetClusterID()
+		}
+
+		labels, err := res.GetStrings("namespace", "pod", "container")
+		if err != nil {
+			log.Warningf("CostModel.ComputeAllocation: minutes query result missing field: %s", err)
+			continue
+		}
+
+		namespace := labels["namespace"]
+		pod := labels["pod"]
+		container := labels["container"]
+
+		containerKey := newContainerKey(cluster, namespace, pod, container)
+		podKey := newPodKey(cluster, namespace, pod)
+
+		// allocStart and allocEnd are the timestamps of the first and last
+		// minutes the allocation was running, respectively. We subtract 1m
+		// from allocStart because this point will actually represent the end
+		// of the first minute. We don't subtract from allocEnd because it
+		// already represents the end of the last minute.
+		var allocStart, allocEnd time.Time
+		for _, datum := range res.Values {
+			t := time.Unix(int64(datum.Timestamp), 0)
+			if allocStart.IsZero() && datum.Value > 0 && window.Contains(t) {
+				allocStart = t
+			}
+			if datum.Value > 0 && window.Contains(t) {
+				allocEnd = t
+			}
+		}
+		if allocStart.IsZero() || allocEnd.IsZero() {
+			continue
+		}
+		allocStart = allocStart.Add(-time.Minute)
+
+		// Set start if unset or this datum's start time is earlier than the
+		// current earliest time.
+		if _, ok := clusterStart[cluster]; !ok || allocStart.Before(clusterStart[cluster]) {
+			clusterStart[cluster] = allocStart
+		}
+
+		// Set end if unset or this datum's end time is later than the
+		// current latest time.
+		if _, ok := clusterEnd[cluster]; !ok || allocEnd.After(clusterEnd[cluster]) {
+			clusterEnd[cluster] = allocEnd
+		}
+
+		name := fmt.Sprintf("%s/%s/%s/%s", cluster, namespace, pod, container)
+
+		alloc := &kubecost.Allocation{
+			Name:       name,
+			Properties: kubecost.Properties{},
+			Window:     window.Clone(),
+			Start:      allocStart,
+			End:        allocEnd,
+		}
+		alloc.Properties.SetContainer(container)
+		alloc.Properties.SetPod(pod)
+		alloc.Properties.SetNamespace(namespace)
+		alloc.Properties.SetCluster(cluster)
+
+		allocationMap[containerKey] = alloc
+
+		if _, ok := podAllocation[podKey]; !ok {
+			podAllocation[podKey] = []*kubecost.Allocation{}
+		}
+		podAllocation[podKey] = append(podAllocation[podKey], alloc)
+	}
+}
+
+func applyCPUCoresAllocated(allocationMap map[containerKey]*kubecost.Allocation, resCPUCoresAllocated []*prom.QueryResult) {
+	for _, res := range resCPUCoresAllocated {
+		key, err := resultContainerKey(res, "cluster_id", "namespace", "pod", "container")
+		if err != nil {
+			log.Warningf("CostModel.ComputeAllocation: CPU allocation query result missing field: %s", err)
+			continue
+		}
+
+		_, ok := allocationMap[key]
+		if !ok {
+			log.Warningf("CostModel.ComputeAllocation: unidentified CPU allocation query result: %s", key)
+			continue
+		}
+
+		cpuCores := res.Values[0].Value
+		hours := allocationMap[key].Minutes() / 60.0
+		allocationMap[key].CPUCoreHours = cpuCores * hours
+
+		node, err := res.GetString("node")
+		if err != nil {
+			log.Warningf("CostModel.ComputeAllocation: CPU allocation query result missing 'node': %s", key)
+			continue
+		}
+		allocationMap[key].Properties.SetNode(node)
+	}
+}
+
+func applyCPUCoresRequested(allocationMap map[containerKey]*kubecost.Allocation, resCPUCoresRequested []*prom.QueryResult) {
+	for _, res := range resCPUCoresRequested {
+		key, err := resultContainerKey(res, "cluster_id", "namespace", "pod", "container")
+		if err != nil {
+			log.Warningf("CostModel.ComputeAllocation: CPU request query result missing field: %s", err)
+			continue
+		}
+
+		_, ok := allocationMap[key]
+		if !ok {
+			continue
+		}
+
+		allocationMap[key].CPUCoreRequestAverage = res.Values[0].Value
+
+		// CPU allocation is less than requests, so set CPUCoreHours to
+		// request level.
+		// TODO niko/cdmr why is this happening?
+		if allocationMap[key].CPUCores() < res.Values[0].Value {
+			allocationMap[key].CPUCoreHours = res.Values[0].Value * (allocationMap[key].Minutes() / 60.0)
+		}
+
+		node, err := res.GetString("node")
+		if err != nil {
+			log.Warningf("CostModel.ComputeAllocation: CPU request query result missing 'node': %s", key)
+			continue
+		}
+		allocationMap[key].Properties.SetNode(node)
+	}
+}
+
+func applyCPUCoresUsed(allocationMap map[containerKey]*kubecost.Allocation, resCPUCoresUsed []*prom.QueryResult) {
+	for _, res := range resCPUCoresUsed {
+		key, err := resultContainerKey(res, "cluster_id", "namespace", "pod_name", "container_name")
+		if err != nil {
+			log.Warningf("CostModel.ComputeAllocation: CPU usage query result missing field: %s", err)
+			continue
+		}
+
+		_, ok := allocationMap[key]
+		if !ok {
+			log.Warningf("CostModel.ComputeAllocation: unidentified CPU usage query result: %s", key)
+			continue
+		}
+
+		allocationMap[key].CPUCoreUsageAverage = res.Values[0].Value
+	}
+}
+
+func applyRAMBytesAllocated(allocationMap map[containerKey]*kubecost.Allocation, resRAMBytesAllocated []*prom.QueryResult) {
+	for _, res := range resRAMBytesAllocated {
+		key, err := resultContainerKey(res, "cluster_id", "namespace", "pod", "container")
+		if err != nil {
+			log.Warningf("CostModel.ComputeAllocation: RAM allocation query result missing field: %s", err)
+			continue
+		}
+
+		_, ok := allocationMap[key]
+		if !ok {
+			log.Warningf("CostModel.ComputeAllocation: unidentified RAM allocation query result: %s", key)
+			continue
+		}
+
+		ramBytes := res.Values[0].Value
+		hours := allocationMap[key].Minutes() / 60.0
+		allocationMap[key].RAMByteHours = ramBytes * hours
+
+		node, err := res.GetString("node")
+		if err != nil {
+			log.Warningf("CostModel.ComputeAllocation: RAM allocation query result missing 'node': %s", key)
+			continue
+		}
+		allocationMap[key].Properties.SetNode(node)
+	}
+}
+
+func applyRAMBytesRequested(allocationMap map[containerKey]*kubecost.Allocation, resRAMBytesRequested []*prom.QueryResult) {
+	for _, res := range resRAMBytesRequested {
+		key, err := resultContainerKey(res, "cluster_id", "namespace", "pod", "container")
+		if err != nil {
+			log.Warningf("CostModel.ComputeAllocation: RAM request query result missing field: %s", err)
+			continue
+		}
+
+		_, ok := allocationMap[key]
+		if !ok {
+			continue
+		}
+
+		allocationMap[key].RAMBytesRequestAverage = res.Values[0].Value
+
+		// RAM allocation is less than requests, so set RAMByteHours to
+		// request level.
+		// TODO niko/cdmr why is this happening?
+		if allocationMap[key].RAMBytes() < res.Values[0].Value {
+			allocationMap[key].RAMByteHours = res.Values[0].Value * (allocationMap[key].Minutes() / 60.0)
+		}
+
+		node, err := res.GetString("node")
+		if err != nil {
+			log.Warningf("CostModel.ComputeAllocation: RAM request query result missing 'node': %s", key)
+			continue
+		}
+		allocationMap[key].Properties.SetNode(node)
+	}
+}
+
+func applyRAMBytesUsed(allocationMap map[containerKey]*kubecost.Allocation, resRAMBytesUsed []*prom.QueryResult) {
+	for _, res := range resRAMBytesUsed {
+		key, err := resultContainerKey(res, "cluster_id", "namespace", "pod_name", "container_name")
+		if err != nil {
+			log.Warningf("CostModel.ComputeAllocation: RAM usage query result missing field: %s", err)
+			continue
+		}
+
+		_, ok := allocationMap[key]
+		if !ok {
+			log.Warningf("CostModel.ComputeAllocation: unidentified RAM usage query result: %s", key)
+			continue
+		}
+
+		allocationMap[key].RAMBytesUsageAverage = res.Values[0].Value
+	}
+}
+
+func applyGPUsRequested(allocationMap map[containerKey]*kubecost.Allocation, resGPUsRequested []*prom.QueryResult) {
+	for _, res := range resGPUsRequested {
+		key, err := resultContainerKey(res, "cluster_id", "namespace", "pod", "container")
+		if err != nil {
+			log.Warningf("CostModel.ComputeAllocation: GPU allocation query result missing field: %s", err)
+			continue
+		}
+
+		_, ok := allocationMap[key]
+		if !ok {
+			log.Warningf("CostModel.ComputeAllocation: unidentified GPU allocation query result: %s", key)
+			continue
+		}
+
+		// TODO niko/cdmr complete
+		log.Infof("CostModel.ComputeAllocation: GPU results: %s=%f", key, res.Values[0].Value)
+	}
+}
+
+func applyNetworkAllocation(allocationMap map[containerKey]*kubecost.Allocation, podAllocation map[podKey][]*kubecost.Allocation, resNetworkGiB []*prom.QueryResult, resNetworkCostPerGiB []*prom.QueryResult) {
+	costPerGiBByCluster := map[string]float64{}
+
+	for _, res := range resNetworkCostPerGiB {
+		cluster, err := res.GetString("cluster_id")
+		if err != nil {
+			cluster = env.GetClusterID()
+		}
+
+		costPerGiBByCluster[cluster] = res.Values[0].Value
+	}
+
+	for _, res := range resNetworkGiB {
+		podKey, err := resultPodKey(res, "cluster_id", "namespace", "pod_name")
+		if err != nil {
+			log.Warningf("CostModel.ComputeAllocation: Network allocation query result missing field: %s", err)
+			continue
+		}
+
+		allocs, ok := podAllocation[podKey]
+		if !ok {
+			log.Warningf("CostModel.ComputeAllocation: Network allocation query result for unidentified pod allocations: %s", podKey)
+			continue
+		}
+
+		for _, alloc := range allocs {
+			gib := res.Values[0].Value
+			costPerGiB := costPerGiBByCluster[podKey.Cluster]
+			alloc.NetworkCost = gib * costPerGiB
+		}
+	}
+}
+
+func resToNamespaceLabels(resNamespaceLabels []*prom.QueryResult) map[string]map[string]string {
+	namespaceLabels := map[string]map[string]string{}
+
+	for _, res := range resNamespaceLabels {
+		namespace, err := res.GetString("namespace")
+		if err != nil {
+			continue
+		}
+
+		if _, ok := namespaceLabels[namespace]; !ok {
+			namespaceLabels[namespace] = map[string]string{}
+		}
+
+		for k, l := range res.GetLabels() {
+			namespaceLabels[namespace][k] = l
+		}
+	}
+
+	return namespaceLabels
+}
+
+func resToPodLabels(resPodLabels []*prom.QueryResult) map[podKey]map[string]string {
+	podLabels := map[podKey]map[string]string{}
+
+	for _, res := range resPodLabels {
+		podKey, err := resultPodKey(res, "cluster_id", "namespace", "pod")
+		if err != nil {
+			continue
+		}
+
+		if _, ok := podLabels[podKey]; !ok {
+			podLabels[podKey] = map[string]string{}
+		}
+
+		for k, l := range res.GetLabels() {
+			podLabels[podKey][k] = l
+		}
+	}
+
+	return podLabels
+}
+
+func resToNamespaceAnnotations(resNamespaceAnnotations []*prom.QueryResult) map[string]map[string]string {
+	namespaceAnnotations := map[string]map[string]string{}
+
+	for _, res := range resNamespaceAnnotations {
+		namespace, err := res.GetString("namespace")
+		if err != nil {
+			continue
+		}
+
+		if _, ok := namespaceAnnotations[namespace]; !ok {
+			namespaceAnnotations[namespace] = map[string]string{}
+		}
+
+		for k, l := range res.GetAnnotations() {
+			namespaceAnnotations[namespace][k] = l
+		}
+	}
+
+	return namespaceAnnotations
+}
+
+func resToPodAnnotations(resPodAnnotations []*prom.QueryResult) map[podKey]map[string]string {
+	podAnnotations := map[podKey]map[string]string{}
+
+	for _, res := range resPodAnnotations {
+		podKey, err := resultPodKey(res, "cluster_id", "namespace", "pod")
+		if err != nil {
+			continue
+		}
+
+		if _, ok := podAnnotations[podKey]; !ok {
+			podAnnotations[podKey] = map[string]string{}
+		}
+
+		for k, l := range res.GetAnnotations() {
+			podAnnotations[podKey][k] = l
+		}
+	}
+
+	return podAnnotations
+}
+
+func applyLabels(allocationMap map[containerKey]*kubecost.Allocation, namespaceLabels map[string]map[string]string, podLabels map[podKey]map[string]string) {
+	for key, alloc := range allocationMap {
+		allocLabels, err := alloc.Properties.GetLabels()
+		if err != nil {
+			allocLabels = map[string]string{}
+		}
+
+		// Apply namespace labels first, then pod labels so that pod labels
+		// overwrite namespace labels.
+		if labels, ok := namespaceLabels[key.Namespace]; ok {
+			for k, v := range labels {
+				allocLabels[k] = v
+			}
+		}
+		podKey := newPodKey(key.Cluster, key.Namespace, key.Pod)
+		if labels, ok := podLabels[podKey]; ok {
+			for k, v := range labels {
+				allocLabels[k] = v
+			}
+		}
+
+		alloc.Properties.SetLabels(allocLabels)
+	}
+}
+
+func applyAnnotations(allocationMap map[containerKey]*kubecost.Allocation, namespaceAnnotations map[string]map[string]string, podAnnotations map[podKey]map[string]string) {
+	for key, alloc := range allocationMap {
+		allocAnnotations, err := alloc.Properties.GetAnnotations()
+		if err != nil {
+			allocAnnotations = map[string]string{}
+		}
+
+		// Apply namespace annotations first, then pod annotations so that
+		// pod labels overwrite namespace labels.
+		if labels, ok := namespaceAnnotations[key.Namespace]; ok {
+			for k, v := range labels {
+				allocAnnotations[k] = v
+			}
+		}
+		podKey := newPodKey(key.Cluster, key.Namespace, key.Pod)
+		if labels, ok := podAnnotations[podKey]; ok {
+			for k, v := range labels {
+				allocAnnotations[k] = v
+			}
+		}
+
+		alloc.Properties.SetAnnotations(allocAnnotations)
+	}
+}
+
+func getServiceLabels(resServiceLabels []*prom.QueryResult) map[serviceKey]map[string]string {
+	serviceLabels := map[serviceKey]map[string]string{}
+
+	for _, res := range resServiceLabels {
+		serviceKey, err := resultServiceKey(res, "cluster_id", "namespace", "service")
+		if err != nil {
+			continue
+		}
+
+		if _, ok := serviceLabels[serviceKey]; !ok {
+			serviceLabels[serviceKey] = map[string]string{}
+		}
+
+		for k, l := range res.GetLabels() {
+			serviceLabels[serviceKey][k] = l
+		}
+	}
+
+	return serviceLabels
+}
+
+func resToDeploymentLabels(resDeploymentLabels []*prom.QueryResult) map[controllerKey]map[string]string {
+	deploymentLabels := map[controllerKey]map[string]string{}
+
+	for _, res := range resDeploymentLabels {
+		controllerKey, err := resultDeploymentKey(res, "cluster_id", "namespace", "deployment")
+		if err != nil {
+			continue
+		}
+
+		if _, ok := deploymentLabels[controllerKey]; !ok {
+			deploymentLabels[controllerKey] = map[string]string{}
+		}
+
+		for k, l := range res.GetLabels() {
+			deploymentLabels[controllerKey][k] = l
+		}
+	}
+
+	return deploymentLabels
+}
+
+func resToStatefulSetLabels(resStatefulSetLabels []*prom.QueryResult) map[controllerKey]map[string]string {
+	statefulSetLabels := map[controllerKey]map[string]string{}
+
+	for _, res := range resStatefulSetLabels {
+		controllerKey, err := resultStatefulSetKey(res, "cluster_id", "namespace", "statefulSet")
+		if err != nil {
+			continue
+		}
+
+		if _, ok := statefulSetLabels[controllerKey]; !ok {
+			statefulSetLabels[controllerKey] = map[string]string{}
+		}
+
+		for k, l := range res.GetLabels() {
+			statefulSetLabels[controllerKey][k] = l
+		}
+	}
+
+	return statefulSetLabels
+}
+
+func labelsToPodControllerMap(podLabels map[podKey]map[string]string, controllerLabels map[controllerKey]map[string]string) map[podKey]controllerKey {
+	podControllerMap := map[podKey]controllerKey{}
+
+	// For each controller, turn the labels into a selector and attempt to
+	// match it with each set of pod labels. A match indicates that the pod
+	// belongs to the controller.
+	for cKey, cLabels := range controllerLabels {
+		selector := labels.Set(cLabels).AsSelectorPreValidated()
+
+		for pKey, pLabels := range podLabels {
+			// If the pod is in a different cluster or namespace, there is
+			// no need to compare the labels.
+			if cKey.Cluster != pKey.Cluster || cKey.Namespace != pKey.Namespace {
+				continue
+			}
+
+			podLabelSet := labels.Set(pLabels)
+			if selector.Matches(podLabelSet) {
+				// TODO niko/cdmr does this need to be one-to-many? In that case, we'd
+				// need a different Allocation schema
+				if _, ok := podControllerMap[pKey]; ok {
+					log.Warningf("CostModel.ComputeAllocation: PodControllerMap match already exists: %s matches %s and %s", pKey, podControllerMap[pKey], cKey)
+				}
+				podControllerMap[pKey] = cKey
+			}
+		}
+	}
+
+	return podControllerMap
+}
+
+func resToPodDaemonSetMap(resDaemonSetLabels []*prom.QueryResult) map[podKey]controllerKey {
+	daemonSetLabels := map[podKey]controllerKey{}
+
+	for _, res := range resDaemonSetLabels {
+		controllerKey, err := resultDaemonSetKey(res, "cluster_id", "namespace", "owner_name")
+		if err != nil {
+			continue
+		}
+
+		pod, err := res.GetString("pod")
+		if err != nil {
+			log.Warningf("CostModel.ComputeAllocation: DaemonSetLabel result without pod: %s", controllerKey)
+		}
+
+		podKey := newPodKey(controllerKey.Cluster, controllerKey.Namespace, pod)
+
+		daemonSetLabels[podKey] = controllerKey
+	}
+
+	return daemonSetLabels
+}
+
+func resToPodJobMap(resJobLabels []*prom.QueryResult) map[podKey]controllerKey {
+	jobLabels := map[podKey]controllerKey{}
+
+	for _, res := range resJobLabels {
+		controllerKey, err := resultJobKey(res, "cluster_id", "namespace", "owner_name")
+		if err != nil {
+			continue
+		}
+
+		pod, err := res.GetString("pod")
+		if err != nil {
+			log.Warningf("CostModel.ComputeAllocation: JobLabel result without pod: %s", controllerKey)
+		}
+
+		podKey := newPodKey(controllerKey.Cluster, controllerKey.Namespace, pod)
+
+		jobLabels[podKey] = controllerKey
+	}
+
+	return jobLabels
+}
+
+func applyServicesToPods(allocationMap map[containerKey]*kubecost.Allocation, podLabels map[podKey]map[string]string, serviceLabels map[serviceKey]map[string]string) {
+	podServicesMap := map[podKey][]serviceKey{}
+
+	// For each service, turn the labels into a selector and attempt to
+	// match it with each set of pod labels. A match indicates that the pod
+	// belongs to the service.
+	for sKey, sLabels := range serviceLabels {
+		selector := labels.Set(sLabels).AsSelectorPreValidated()
+
+		for pKey, pLabels := range podLabels {
+			// If the pod is in a different cluster or namespace, there is
+			// no need to compare the labels.
+			if sKey.Cluster != pKey.Cluster || sKey.Namespace != pKey.Namespace {
+				continue
+			}
+
+			podLabelSet := labels.Set(pLabels)
+			if selector.Matches(podLabelSet) {
+				if _, ok := podServicesMap[pKey]; !ok {
+					podServicesMap[pKey] = []serviceKey{}
+				}
+				podServicesMap[pKey] = append(podServicesMap[pKey], sKey)
+			}
+		}
+	}
+
+	// For each allocation, attempt to find and apply the list of services
+	// associated with the allocation's pod.
+	for key, alloc := range allocationMap {
+		pKey := newPodKey(key.Cluster, key.Namespace, key.Pod)
+		if sKeys, ok := podServicesMap[pKey]; ok {
+			services := []string{}
+			for _, sKey := range sKeys {
+				services = append(services, sKey.Service)
+			}
+			alloc.Properties.SetServices(services)
+		}
+	}
+}
+
+func applyControllersToPods(allocationMap map[containerKey]*kubecost.Allocation, podControllerMap map[podKey]controllerKey) {
+	for key, alloc := range allocationMap {
+		podKey := newPodKey(key.Cluster, key.Namespace, key.Pod)
+		if controllerKey, ok := podControllerMap[podKey]; ok {
+			alloc.Properties.SetControllerKind(controllerKey.ControllerKind)
+			alloc.Properties.SetController(controllerKey.Controller)
+		}
+	}
+}
+
+func applyNodeCostPerCPUHr(nodeMap map[nodeKey]*Node, resNodeCostPerCPUHr []*prom.QueryResult) {
+	for _, res := range resNodeCostPerCPUHr {
+		cluster, err := res.GetString("cluster_id")
+		if err != nil {
+			cluster = env.GetClusterID()
+		}
+
+		node, err := res.GetString("node")
+		if err != nil {
+			log.Warningf("CostModel.ComputeAllocation: Node CPU cost query result missing field: %s", err)
+			continue
+		}
+
+		instanceType, err := res.GetString("instance_type")
+		if err != nil {
+			log.Warningf("CostModel.ComputeAllocation: Node CPU cost query result missing field: %s", err)
+			continue
+		}
+
+		key := newNodeKey(cluster, node)
+		if _, ok := nodeMap[key]; !ok {
+			nodeMap[key] = &Node{
+				Name:     node,
+				NodeType: instanceType,
+			}
+		}
+
+		nodeMap[key].CostPerCPUHr = res.Values[0].Value
+	}
+}
+
+func applyNodeCostPerRAMGiBHr(nodeMap map[nodeKey]*Node, resNodeCostPerRAMGiBHr []*prom.QueryResult) {
+	for _, res := range resNodeCostPerRAMGiBHr {
+		cluster, err := res.GetString("cluster_id")
+		if err != nil {
+			cluster = env.GetClusterID()
+		}
+
+		node, err := res.GetString("node")
+		if err != nil {
+			log.Warningf("CostModel.ComputeAllocation: Node RAM cost query result missing field: %s", err)
+			continue
+		}
+
+		instanceType, err := res.GetString("instance_type")
+		if err != nil {
+			log.Warningf("CostModel.ComputeAllocation: Node RAM cost query result missing field: %s", err)
+			continue
+		}
+
+		key := newNodeKey(cluster, node)
+		if _, ok := nodeMap[key]; !ok {
+			nodeMap[key] = &Node{
+				Name:     node,
+				NodeType: instanceType,
+			}
+		}
+
+		nodeMap[key].CostPerRAMGiBHr = res.Values[0].Value
+	}
+}
+
+func applyNodeCostPerGPUHr(nodeMap map[nodeKey]*Node, resNodeCostPerGPUHr []*prom.QueryResult) {
+	for _, res := range resNodeCostPerGPUHr {
+		cluster, err := res.GetString("cluster_id")
+		if err != nil {
+			cluster = env.GetClusterID()
+		}
+
+		node, err := res.GetString("node")
+		if err != nil {
+			log.Warningf("CostModel.ComputeAllocation: Node GPU cost query result missing field: %s", err)
+			continue
+		}
+
+		instanceType, err := res.GetString("instance_type")
+		if err != nil {
+			log.Warningf("CostModel.ComputeAllocation: Node GPU cost query result missing field: %s", err)
+			continue
+		}
+
+		key := newNodeKey(cluster, node)
+		if _, ok := nodeMap[key]; !ok {
+			nodeMap[key] = &Node{
+				Name:     node,
+				NodeType: instanceType,
+			}
+		}
+
+		nodeMap[key].CostPerGPUHr = res.Values[0].Value
+	}
+}
+
+func applyNodeSpot(nodeMap map[nodeKey]*Node, resNodeIsSpot []*prom.QueryResult) {
+	for _, res := range resNodeIsSpot {
+		cluster, err := res.GetString("cluster_id")
+		if err != nil {
+			cluster = env.GetClusterID()
+		}
+
+		node, err := res.GetString("node")
+		if err != nil {
+			log.Warningf("CostModel.ComputeAllocation: Node spot query result missing field: %s", err)
+			continue
+		}
+
+		key := newNodeKey(cluster, node)
+		if _, ok := nodeMap[key]; !ok {
+			log.Warningf("CostModel.ComputeAllocation: Node spot  query result for missing node: %s", key)
+			continue
+		}
+
+		nodeMap[key].Preemptible = res.Values[0].Value > 0
+	}
+}
+
+func applyNodeDiscount(nodeMap map[nodeKey]*Node, cm *CostModel) {
+	if cm == nil {
+		return
+	}
+
+	c, err := cm.Provider.GetConfig()
+	if err != nil {
+		log.Errorf("CostModel.ComputeAllocation: applyNodeDiscount: %s", err)
+		return
+	}
+
+	discount, err := ParsePercentString(c.Discount)
+	if err != nil {
+		log.Errorf("CostModel.ComputeAllocation: applyNodeDiscount: %s", err)
+		return
+	}
+
+	negotiatedDiscount, err := ParsePercentString(c.NegotiatedDiscount)
+	if err != nil {
+		log.Errorf("CostModel.ComputeAllocation: applyNodeDiscount: %s", err)
+		return
+	}
+
+	for _, node := range nodeMap {
+		// TODO niko/cdmr take RI into account?
+		node.Discount = cm.Provider.CombinedDiscountForNode(node.NodeType, node.Preemptible, discount, negotiatedDiscount)
+		node.CostPerCPUHr *= (1.0 - node.Discount)
+		node.CostPerRAMGiBHr *= (1.0 - node.Discount)
+	}
+}
+
+func buildPVMap(pvMap map[pvKey]*PV, resPVCostPerGiBHour []*prom.QueryResult) {
+	for _, res := range resPVCostPerGiBHour {
+		cluster, err := res.GetString("cluster_id")
+		if err != nil {
+			cluster = env.GetClusterID()
+		}
+
+		name, err := res.GetString("volumename")
+		if err != nil {
+			log.Warningf("CostModel.ComputeAllocation: PV cost without volumename")
+			continue
+		}
+
+		key := newPVKey(cluster, name)
+
+		pvMap[key] = &PV{
+			Cluster:        cluster,
+			Name:           name,
+			CostPerGiBHour: res.Values[0].Value,
+		}
+	}
+}
+
+func applyPVBytes(pvMap map[pvKey]*PV, resPVBytes []*prom.QueryResult) {
+	for _, res := range resPVBytes {
+		key, err := resultPVKey(res, "cluster_id", "persistentvolume")
+		if err != nil {
+			log.Warningf("CostModel.ComputeAllocation: PV bytes query result missing field: %s", err)
+			continue
+		}
+
+		if _, ok := pvMap[key]; !ok {
+			log.Warningf("CostModel.ComputeAllocation: PV bytes result for missing PV: %s", err)
+			continue
+		}
+
+		pvMap[key].Bytes = res.Values[0].Value
+	}
+}
+
+func buildPVCMap(window kubecost.Window, pvcMap map[pvcKey]*PVC, pvMap map[pvKey]*PV, resPVCInfo []*prom.QueryResult) {
+	for _, res := range resPVCInfo {
+		cluster, err := res.GetString("cluster_id")
+		if err != nil {
+			cluster = env.GetClusterID()
+		}
+
+		values, err := res.GetStrings("persistentvolumeclaim", "storageclass", "volumename", "namespace")
+		if err != nil {
+			log.Warningf("CostModel.ComputeAllocation: PVC info query result missing field: %s", err)
+			continue
+		}
+
+		namespace := values["namespace"]
+		name := values["persistentvolumeclaim"]
+		volume := values["volumename"]
+		storageClass := values["storageclass"]
+
+		pvKey := newPVKey(cluster, volume)
+		pvcKey := newPVCKey(cluster, namespace, name)
+
+		// pvcStart and pvcEnd are the timestamps of the first and last minutes
+		// the PVC was running, respectively. We subtract 1m from pvcStart
+		// because this point will actually represent the end of the first
+		// minute. We don't subtract from pvcEnd because it already represents
+		// the end of the last minute.
+		var pvcStart, pvcEnd time.Time
+		for _, datum := range res.Values {
+			t := time.Unix(int64(datum.Timestamp), 0)
+			if pvcStart.IsZero() && datum.Value > 0 && window.Contains(t) {
+				pvcStart = t
+			}
+			if datum.Value > 0 && window.Contains(t) {
+				pvcEnd = t
+			}
+		}
+		if pvcStart.IsZero() || pvcEnd.IsZero() {
+			log.Warningf("CostModel.ComputeAllocation: PVC %s has no running time", pvcKey)
+		}
+		pvcStart = pvcStart.Add(-time.Minute)
+
+		if _, ok := pvMap[pvKey]; !ok {
+			log.Warningf("CostModel.ComputeAllocation: PV missing for PVC info query result: %s", pvKey)
+			continue
+		}
+
+		pvMap[pvKey].StorageClass = storageClass
+
+		if _, ok := pvcMap[pvcKey]; !ok {
+			pvcMap[pvcKey] = &PVC{}
+		}
+
+		pvcMap[pvcKey].Name = name
+		pvcMap[pvcKey].Namespace = namespace
+		pvcMap[pvcKey].Volume = pvMap[pvKey]
+		pvcMap[pvcKey].Start = pvcStart
+		pvcMap[pvcKey].End = pvcEnd
+	}
+}
+
+func applyPVCBytesRequested(pvcMap map[pvcKey]*PVC, resPVCBytesRequested []*prom.QueryResult) {
+	for _, res := range resPVCBytesRequested {
+		key, err := resultPVCKey(res, "cluster_id", "namespace", "persistentvolumeclaim")
+		if err != nil {
+			log.Warningf("CostModel.ComputeAllocation: PVC bytes requested query result missing field: %s", err)
+			continue
+		}
+
+		if _, ok := pvcMap[key]; !ok {
+			log.Warningf("CostModel.ComputeAllocation: PVC bytes requested result for missing PVC: %s", key)
+			continue
+		}
+
+		pvcMap[key].Bytes = res.Values[0].Value
+	}
+}
+
+func buildPodPVCMap(podPVCMap map[podKey][]*PVC, pvMap map[pvKey]*PV, pvcMap map[pvcKey]*PVC, podAllocation map[podKey][]*kubecost.Allocation, resPodPVCAllocation []*prom.QueryResult) {
+	for _, res := range resPodPVCAllocation {
+		cluster, err := res.GetString("cluster_id")
+		if err != nil {
+			cluster = env.GetClusterID()
+		}
+
+		values, err := res.GetStrings("persistentvolume", "persistentvolumeclaim", "pod", "namespace")
+		if err != nil {
+			log.Warningf("CostModel.ComputeAllocation: PVC allocation query result missing field: %s", err)
+			continue
+		}
+
+		namespace := values["namespace"]
+		pod := values["pod"]
+		name := values["persistentvolumeclaim"]
+		volume := values["persistentvolume"]
+
+		podKey := newPodKey(cluster, namespace, pod)
+		pvKey := newPVKey(cluster, volume)
+		pvcKey := newPVCKey(cluster, namespace, name)
+
+		if _, ok := pvMap[pvKey]; !ok {
+			log.Warningf("CostModel.ComputeAllocation: PV missing for PVC allocation query result: %s", pvKey)
+			continue
+		}
+
+		if _, ok := podPVCMap[podKey]; !ok {
+			podPVCMap[podKey] = []*PVC{}
+		}
+
+		pvc, ok := pvcMap[pvcKey]
+		if !ok {
+			log.Warningf("CostModel.ComputeAllocation: PVC missing for PVC allocation query: %s", pvcKey)
+			continue
+		}
+
+		pvc.Count = len(podAllocation[podKey])
+
+		podPVCMap[podKey] = append(podPVCMap[podKey], pvc)
+	}
+}
+
+func applyUnmountedPVs(window kubecost.Window, allocationMap map[containerKey]*kubecost.Allocation, pvMap map[pvKey]*PV, pvcMap map[pvcKey]*PVC) {
+	unmountedPVBytes := map[string]float64{}
+	unmountedPVCost := map[string]float64{}
+
+	for _, pv := range pvMap {
+		mounted := false
+		for _, pvc := range pvcMap {
+			if pvc.Volume == nil {
+				continue
+			}
+			if pvc.Volume == pv {
+				mounted = true
+				break
+			}
+		}
+
+		if !mounted {
+			gib := pv.Bytes / 1024 / 1024 / 1024
+			hrs := window.Minutes() / 60.0
+			cost := pv.CostPerGiBHour * gib * hrs
+			unmountedPVCost[pv.Cluster] += cost
+			unmountedPVBytes[pv.Cluster] += pv.Bytes
+		}
+	}
+
+	for cluster, amount := range unmountedPVCost {
+		container := "unmounted-pvs"
+		pod := "unmounted-pvs"
+		namespace := "" // TODO niko/cdmr what about this?
+
+		containerKey := newContainerKey(cluster, namespace, pod, container)
+		allocationMap[containerKey] = &kubecost.Allocation{
+			Name: fmt.Sprintf("%s/%s/%s/%s", cluster, namespace, pod, container),
+			Properties: kubecost.Properties{
+				kubecost.ClusterProp:   cluster,
+				kubecost.NamespaceProp: namespace,
+				kubecost.PodProp:       pod,
+				kubecost.ContainerProp: container,
+			},
+			Window:      window.Clone(),
+			Start:       *window.Start(),
+			End:         *window.End(),
+			PVByteHours: unmountedPVBytes[cluster] * window.Minutes() / 60.0,
+			PVCost:      amount,
+			TotalCost:   amount,
+		}
+	}
+}
+
+type containerKey struct {
+	Cluster   string
+	Namespace string
+	Pod       string
+	Container string
+}
+
+func (k containerKey) String() string {
+	return fmt.Sprintf("%s/%s/%s/%s", k.Cluster, k.Namespace, k.Pod, k.Container)
+}
+
+func newContainerKey(cluster, namespace, pod, container string) containerKey {
+	return containerKey{
+		Cluster:   cluster,
+		Namespace: namespace,
+		Pod:       pod,
+		Container: container,
+	}
+}
+
+func resultContainerKey(res *prom.QueryResult, clusterLabel, namespaceLabel, podLabel, containerLabel string) (containerKey, error) {
+	key := containerKey{}
+
+	cluster, err := res.GetString(clusterLabel)
+	if err != nil {
+		cluster = env.GetClusterID()
+	}
+	key.Cluster = cluster
+
+	namespace, err := res.GetString(namespaceLabel)
+	if err != nil {
+		return key, err
+	}
+	key.Namespace = namespace
+
+	pod, err := res.GetString(podLabel)
+	if err != nil {
+		return key, err
+	}
+	key.Pod = pod
+
+	container, err := res.GetString(containerLabel)
+	if err != nil {
+		return key, err
+	}
+	key.Container = container
+
+	return key, nil
+}
+
+type podKey struct {
+	Cluster   string
+	Namespace string
+	Pod       string
+}
+
+func (k podKey) String() string {
+	return fmt.Sprintf("%s/%s/%s", k.Cluster, k.Namespace, k.Pod)
+}
+
+func newPodKey(cluster, namespace, pod string) podKey {
+	return podKey{
+		Cluster:   cluster,
+		Namespace: namespace,
+		Pod:       pod,
+	}
+}
+
+func resultPodKey(res *prom.QueryResult, clusterLabel, namespaceLabel, podLabel string) (podKey, error) {
+	key := podKey{}
+
+	cluster, err := res.GetString(clusterLabel)
+	if err != nil {
+		cluster = env.GetClusterID()
+	}
+	key.Cluster = cluster
+
+	namespace, err := res.GetString(namespaceLabel)
+	if err != nil {
+		return key, err
+	}
+	key.Namespace = namespace
+
+	pod, err := res.GetString(podLabel)
+	if err != nil {
+		return key, err
+	}
+	key.Pod = pod
+
+	return key, nil
+}
+
+type controllerKey struct {
+	Cluster        string
+	Namespace      string
+	ControllerKind string
+	Controller     string
+}
+
+func (k controllerKey) String() string {
+	return fmt.Sprintf("%s/%s/%s/%s", k.Cluster, k.Namespace, k.ControllerKind, k.Controller)
+}
+
+func newControllerKey(cluster, namespace, controllerKind, controller string) controllerKey {
+	return controllerKey{
+		Cluster:        cluster,
+		Namespace:      namespace,
+		ControllerKind: controllerKind,
+		Controller:     controller,
+	}
+}
+
+func resultControllerKey(controllerKind string, res *prom.QueryResult, clusterLabel, namespaceLabel, controllerLabel string) (controllerKey, error) {
+	key := controllerKey{}
+
+	cluster, err := res.GetString(clusterLabel)
+	if err != nil {
+		cluster = env.GetClusterID()
+	}
+	key.Cluster = cluster
+
+	namespace, err := res.GetString(namespaceLabel)
+	if err != nil {
+		return key, err
+	}
+	key.Namespace = namespace
+
+	controller, err := res.GetString(controllerLabel)
+	if err != nil {
+		return key, err
+	}
+	key.Controller = controller
+
+	key.ControllerKind = controllerKind
+
+	return key, nil
+}
+
+func resultDeploymentKey(res *prom.QueryResult, clusterLabel, namespaceLabel, controllerLabel string) (controllerKey, error) {
+	return resultControllerKey("deployment", res, clusterLabel, namespaceLabel, controllerLabel)
+}
+
+func resultStatefulSetKey(res *prom.QueryResult, clusterLabel, namespaceLabel, controllerLabel string) (controllerKey, error) {
+	return resultControllerKey("statefulset", res, clusterLabel, namespaceLabel, controllerLabel)
+}
+
+func resultDaemonSetKey(res *prom.QueryResult, clusterLabel, namespaceLabel, controllerLabel string) (controllerKey, error) {
+	return resultControllerKey("daemonset", res, clusterLabel, namespaceLabel, controllerLabel)
+}
+
+func resultJobKey(res *prom.QueryResult, clusterLabel, namespaceLabel, controllerLabel string) (controllerKey, error) {
+	return resultControllerKey("job", res, clusterLabel, namespaceLabel, controllerLabel)
+}
+
+type serviceKey struct {
+	Cluster   string
+	Namespace string
+	Service   string
+}
+
+func (k serviceKey) String() string {
+	return fmt.Sprintf("%s/%s/%s", k.Cluster, k.Namespace, k.Service)
+}
+
+func newServiceKey(cluster, namespace, service string) serviceKey {
+	return serviceKey{
+		Cluster:   cluster,
+		Namespace: namespace,
+		Service:   service,
+	}
+}
+
+func resultServiceKey(res *prom.QueryResult, clusterLabel, namespaceLabel, serviceLabel string) (serviceKey, error) {
+	key := serviceKey{}
+
+	cluster, err := res.GetString(clusterLabel)
+	if err != nil {
+		cluster = env.GetClusterID()
+	}
+	key.Cluster = cluster
+
+	namespace, err := res.GetString(namespaceLabel)
+	if err != nil {
+		return key, err
+	}
+	key.Namespace = namespace
+
+	service, err := res.GetString(serviceLabel)
+	if err != nil {
+		return key, err
+	}
+	key.Service = service
+
+	return key, nil
+}
+
+type nodeKey struct {
+	Cluster string
+	Node    string
+}
+
+func (k nodeKey) String() string {
+	return fmt.Sprintf("%s/%s", k.Cluster, k.Node)
+}
+
+func newNodeKey(cluster, node string) nodeKey {
+	return nodeKey{
+		Cluster: cluster,
+		Node:    node,
+	}
+}
+
+func resultNodeKey(res *prom.QueryResult, clusterLabel, nodeLabel string) (nodeKey, error) {
+	key := nodeKey{}
+
+	cluster, err := res.GetString(clusterLabel)
+	if err != nil {
+		cluster = env.GetClusterID()
+	}
+	key.Cluster = cluster
+
+	node, err := res.GetString(nodeLabel)
+	if err != nil {
+		return key, err
+	}
+	key.Node = node
+
+	return key, nil
+}
+
+type pvcKey struct {
+	Cluster               string
+	Namespace             string
+	PersistentVolumeClaim string
+}
+
+func (k pvcKey) String() string {
+	return fmt.Sprintf("%s/%s/%s", k.Cluster, k.Namespace, k.PersistentVolumeClaim)
+}
+
+func newPVCKey(cluster, namespace, persistentVolumeClaim string) pvcKey {
+	return pvcKey{
+		Cluster:               cluster,
+		Namespace:             namespace,
+		PersistentVolumeClaim: persistentVolumeClaim,
+	}
+}
+
+func resultPVCKey(res *prom.QueryResult, clusterLabel, namespaceLabel, pvcLabel string) (pvcKey, error) {
+	key := pvcKey{}
+
+	cluster, err := res.GetString(clusterLabel)
+	if err != nil {
+		cluster = env.GetClusterID()
+	}
+	key.Cluster = cluster
+
+	namespace, err := res.GetString(namespaceLabel)
+	if err != nil {
+		return key, err
+	}
+	key.Namespace = namespace
+
+	pvc, err := res.GetString(pvcLabel)
+	if err != nil {
+		return key, err
+	}
+	key.PersistentVolumeClaim = pvc
+
+	return key, nil
+}
+
+type pvKey struct {
+	Cluster          string
+	PersistentVolume string
+}
+
+func (k pvKey) String() string {
+	return fmt.Sprintf("%s/%s", k.Cluster, k.PersistentVolume)
+}
+
+func newPVKey(cluster, persistentVolume string) pvKey {
+	return pvKey{
+		Cluster:          cluster,
+		PersistentVolume: persistentVolume,
+	}
+}
+
+func resultPVKey(res *prom.QueryResult, clusterLabel, persistentVolumeLabel string) (pvKey, error) {
+	key := pvKey{}
+
+	cluster, err := res.GetString(clusterLabel)
+	if err != nil {
+		cluster = env.GetClusterID()
+	}
+	key.Cluster = cluster
+
+	persistentVolume, err := res.GetString(persistentVolumeLabel)
+	if err != nil {
+		return key, err
+	}
+	key.PersistentVolume = persistentVolume
+
+	return key, nil
+}

+ 20 - 17
pkg/costmodel/cluster.go

@@ -375,23 +375,26 @@ func ClusterDisks(client prometheus.Client, provider cloud.Provider, duration, o
 }
 
 type Node struct {
-	Cluster      string
-	Name         string
-	ProviderID   string
-	NodeType     string
-	CPUCost      float64
-	CPUCores     float64
-	GPUCost      float64
-	RAMCost      float64
-	RAMBytes     float64
-	Discount     float64
-	Preemptible  bool
-	CPUBreakdown *ClusterCostsBreakdown
-	RAMBreakdown *ClusterCostsBreakdown
-	Start        time.Time
-	End          time.Time
-	Minutes      float64
-	Labels       map[string]string
+	Cluster         string
+	Name            string
+	ProviderID      string
+	NodeType        string
+	CPUCost         float64
+	CPUCores        float64
+	GPUCost         float64
+	RAMCost         float64
+	RAMBytes        float64
+	Discount        float64
+	Preemptible     bool
+	CPUBreakdown    *ClusterCostsBreakdown
+	RAMBreakdown    *ClusterCostsBreakdown
+	Start           time.Time
+	End             time.Time
+	Minutes         float64
+	Labels          map[string]string
+	CostPerCPUHr    float64
+	CostPerRAMGiBHr float64
+	CostPerGPUHr    float64
 }
 
 // GKE lies about the number of cores e2 nodes have. This table

+ 50 - 14
pkg/costmodel/costmodel.go

@@ -16,6 +16,7 @@ import (
 	"github.com/kubecost/cost-model/pkg/log"
 	"github.com/kubecost/cost-model/pkg/prom"
 	"github.com/kubecost/cost-model/pkg/util"
+	prometheus "github.com/prometheus/client_golang/api"
 	prometheusClient "github.com/prometheus/client_golang/api"
 	v1 "k8s.io/api/core/v1"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -45,26 +46,65 @@ const (
 // isCron matches a CronJob name and captures the non-timestamp name
 var isCron = regexp.MustCompile(`^(.+)-\d{10}$`)
 
+// TODO niko/cdmr both Thanos and Prometheus, or just one?
 type CostModel struct {
-	Cache           clustercache.ClusterCache
-	ClusterMap      clusters.ClusterMap
-	ScrapeInterval  time.Duration
-	RequestGroup    *singleflight.Group
-	pricingMetadata *costAnalyzerCloud.PricingMatchMetadata
+	Cache            clustercache.ClusterCache
+	ClusterMap       clusters.ClusterMap
+	RequestGroup     *singleflight.Group
+	ScrapeInterval   time.Duration
+	PrometheusClient prometheus.Client
+	Provider         costAnalyzerCloud.Provider
+	pricingMetadata  *costAnalyzerCloud.PricingMatchMetadata
 }
 
-func NewCostModel(cache clustercache.ClusterCache, clusterMap clusters.ClusterMap, scrapeInterval time.Duration) *CostModel {
+func NewCostModel(client prometheus.Client, provider costAnalyzerCloud.Provider, cache clustercache.ClusterCache, clusterMap clusters.ClusterMap, scrapeInterval time.Duration) *CostModel {
 	// request grouping to prevent over-requesting the same data prior to caching
 	requestGroup := new(singleflight.Group)
 
 	return &CostModel{
-		Cache:          cache,
-		ClusterMap:     clusterMap,
-		RequestGroup:   requestGroup,
-		ScrapeInterval: scrapeInterval,
+		Cache:            cache,
+		ClusterMap:       clusterMap,
+		PrometheusClient: client,
+		Provider:         provider,
+		RequestGroup:     requestGroup,
+		ScrapeInterval:   scrapeInterval,
 	}
 }
 
+// TODO niko/cdmr
+type ContainerAllocation struct {
+	Properties      ContainerProperties          `json:"properties"`
+	RAMReq          []*util.Vector               `json:"ramreq,omitempty"`
+	RAMUsed         []*util.Vector               `json:"ramused,omitempty"`
+	RAMAllocation   []*util.Vector               `json:"ramallocated,omitempty"`
+	CPUReq          []*util.Vector               `json:"cpureq,omitempty"`
+	CPUUsed         []*util.Vector               `json:"cpuused,omitempty"`
+	CPUAllocation   []*util.Vector               `json:"cpuallocated,omitempty"`
+	GPUReq          []*util.Vector               `json:"gpureq,omitempty"`
+	PVCData         []*PersistentVolumeClaimData `json:"pvcData,omitempty"`
+	NetworkData     []*util.Vector               `json:"network,omitempty"`
+	Annotations     map[string]string            `json:"annotations,omitempty"`
+	Labels          map[string]string            `json:"labels,omitempty"`
+	NamespaceLabels map[string]string            `json:"namespaceLabels,omitempty"`
+	ClusterID       string                       `json:"clusterId"`
+	ClusterName     string                       `json:"clusterName"`
+}
+
+// TODO niko/cdmr
+type ContainerProperties struct {
+	Container      string            `json:"container"`
+	Pod            string            `json:"pod"`
+	Namespace      string            `json:"namespace"`
+	Node           string            `json:"node"`
+	ClusterID      string            `json:"clusterID"`
+	Cluster        string            `json:"cluster"`
+	Controller     string            `json:"controller"`
+	ControllerKind string            `json:"controllerKind"`
+	Services       []string          `json:"services"`
+	Labels         map[string]string `json:"labels"`
+	Annotations    map[string]string `json:"annotations"`
+}
+
 type CostData struct {
 	Name            string                       `json:"name,omitempty"`
 	PodName         string                       `json:"podName,omitempty"`
@@ -1504,10 +1544,6 @@ func requestKeyFor(window kubecost.Window, resolution time.Duration, filterNames
 	return fmt.Sprintf("%s,%s,%s,%s,%s,%t", startKey, endKey, resolution.String(), filterNamespace, filterCluster, remoteEnabled)
 }
 
-// func (cm *CostModel) ComputeCostDataRange(cli prometheusClient.Client, cp costAnalyzerCloud.Provider,
-// 	startString, endString, windowString string, resolutionHours float64, filterNamespace string,
-// 	filterCluster string, remoteEnabled bool, offset string) (map[string]*CostData, error)
-
 // ComputeCostDataRange executes a range query for cost data.
 // Note that "offset" represents the time between the function call and "endString", and is also passed for convenience
 func (cm *CostModel) ComputeCostDataRange(cli prometheusClient.Client, cp costAnalyzerCloud.Provider, window kubecost.Window, resolution time.Duration, filterNamespace string, filterCluster string, remoteEnabled bool) (map[string]*CostData, error) {

+ 8 - 1
pkg/costmodel/router.go

@@ -28,6 +28,7 @@ import (
 	"github.com/kubecost/cost-model/pkg/log"
 	"github.com/kubecost/cost-model/pkg/prom"
 	"github.com/kubecost/cost-model/pkg/thanos"
+	prometheus "github.com/prometheus/client_golang/api"
 	prometheusClient "github.com/prometheus/client_golang/api"
 	prometheusAPI "github.com/prometheus/client_golang/api/prometheus/v1"
 	v1 "k8s.io/api/core/v1"
@@ -1013,7 +1014,13 @@ func Initialize(additionalConfigWatchers ...ConfigWatchers) *Accesses {
 		30 * day: maxCacheMinutes30d * time.Minute,
 	}
 
-	costModel := NewCostModel(k8sCache, clusterMap, scrapeInterval)
+	var pc prometheus.Client
+	if thanosClient != nil {
+		pc = thanosClient
+	} else {
+		pc = promCli
+	}
+	costModel := NewCostModel(pc, cloudProvider, k8sCache, clusterMap, scrapeInterval)
 	metricsEmitter := NewCostModelMetricsEmitter(promCli, k8sCache, cloudProvider, costModel)
 
 	a := &Accesses{

+ 187 - 116
pkg/kubecost/allocation.go

@@ -1,6 +1,7 @@
 package kubecost
 
 import (
+	"bytes"
 	"encoding/json"
 	"fmt"
 	"sort"
@@ -42,28 +43,32 @@ const ShareNone = "__none__"
 // Allocation is a unit of resource allocation and cost for a given window
 // of time and for a given kubernetes construct with its associated set of
 // properties.
+// TODO niko/cdmr compute efficiency on the fly?
 type Allocation struct {
-	Name            string     `json:"name"`
-	Properties      Properties `json:"properties,omitempty"`
-	Start           time.Time  `json:"start"`
-	End             time.Time  `json:"end"`
-	Minutes         float64    `json:"minutes"`
-	ActiveStart     time.Time  `json:"-"`
-	CPUCoreHours    float64    `json:"cpuCoreHours"`
-	CPUCost         float64    `json:"cpuCost"`
-	CPUEfficiency   float64    `json:"cpuEfficiency"`
-	GPUHours        float64    `json:"gpuHours"`
-	GPUCost         float64    `json:"gpuCost"`
-	NetworkCost     float64    `json:"networkCost"`
-	PVByteHours     float64    `json:"pvByteHours"`
-	PVCost          float64    `json:"pvCost"`
-	RAMByteHours    float64    `json:"ramByteHours"`
-	RAMCost         float64    `json:"ramCost"`
-	RAMEfficiency   float64    `json:"ramEfficiency"`
-	SharedCost      float64    `json:"sharedCost"`
-	ExternalCost    float64    `json:"externalCost"`
-	TotalCost       float64    `json:"totalCost"`
-	TotalEfficiency float64    `json:"totalEfficiency"`
+	Name                   string     `json:"name"`
+	Properties             Properties `json:"properties,omitempty"`
+	Window                 Window     `json:"window"`
+	Start                  time.Time  `json:"start"`
+	End                    time.Time  `json:"end"`
+	CPUCoreHours           float64    `json:"cpuCoreHours"`
+	CPUCoreRequestAverage  float64    `json:"cpuCoreRequestAverage"`
+	CPUCoreUsageAverage    float64    `json:"cpuCoreUsageAverage"`
+	CPUCost                float64    `json:"cpuCost"`
+	CPUEfficiency          float64    `json:"cpuEfficiency"`
+	GPUHours               float64    `json:"gpuHours"`
+	GPUCost                float64    `json:"gpuCost"`
+	NetworkCost            float64    `json:"networkCost"`
+	PVByteHours            float64    `json:"pvByteHours"`
+	PVCost                 float64    `json:"pvCost"`
+	RAMByteHours           float64    `json:"ramByteHours"`
+	RAMBytesRequestAverage float64    `json:"ramBytesRequestAverage"`
+	RAMBytesUsageAverage   float64    `json:"ramBytesUsageAverage"`
+	RAMCost                float64    `json:"ramCost"`
+	RAMEfficiency          float64    `json:"ramEfficiency"`
+	SharedCost             float64    `json:"sharedCost"`
+	ExternalCost           float64    `json:"externalCost"`
+	TotalCost              float64    `json:"totalCost"`
+	TotalEfficiency        float64    `json:"totalEfficiency"`
 }
 
 // AllocationMatchFunc is a function that can be used to match Allocations by
@@ -78,12 +83,13 @@ func (a *Allocation) Add(that *Allocation) (*Allocation, error) {
 		return that.Clone(), nil
 	}
 
-	if !a.Start.Equal(that.Start) || !a.End.Equal(that.End) {
-		return nil, fmt.Errorf("error adding Allocations: mismatched windows")
+	if that == nil {
+		return a.Clone(), nil
 	}
 
+	// Note: no need to clone "that", as add only mutates the receiver
 	agg := a.Clone()
-	agg.add(that, false, false)
+	agg.add(that)
 
 	return agg, nil
 }
@@ -97,10 +103,9 @@ func (a *Allocation) Clone() *Allocation {
 	return &Allocation{
 		Name:            a.Name,
 		Properties:      a.Properties.Clone(),
+		Window:          a.Window.Clone(),
 		Start:           a.Start,
 		End:             a.End,
-		Minutes:         a.Minutes,
-		ActiveStart:     a.ActiveStart,
 		CPUCoreHours:    a.CPUCoreHours,
 		CPUCost:         a.CPUCost,
 		CPUEfficiency:   a.CPUEfficiency,
@@ -129,16 +134,16 @@ func (a *Allocation) Equal(that *Allocation) bool {
 	if a.Name != that.Name {
 		return false
 	}
-	if !a.Start.Equal(that.Start) {
+	if !a.Properties.Equal(&that.Properties) {
 		return false
 	}
-	if !a.End.Equal(that.End) {
+	if !a.Window.Equal(that.Window) {
 		return false
 	}
-	if a.Minutes != that.Minutes {
+	if !a.Start.Equal(that.Start) {
 		return false
 	}
-	if !a.ActiveStart.Equal(that.ActiveStart) {
+	if !a.End.Equal(that.End) {
 		return false
 	}
 	if a.CPUCoreHours != that.CPUCoreHours {
@@ -186,13 +191,71 @@ func (a *Allocation) Equal(that *Allocation) bool {
 	if a.TotalEfficiency != that.TotalEfficiency {
 		return false
 	}
-	if !a.Properties.Equal(&that.Properties) {
-		return false
-	}
 
 	return true
 }
 
+// CPUCores converts the Allocation's CPUCoreHours into average CPUCores
+func (a *Allocation) CPUCores() float64 {
+	if a.Minutes() <= 0.0 {
+		return 0.0
+	}
+	return a.CPUCoreHours / (a.Minutes() / 60.0)
+}
+
+// RAMBytes converts the Allocation's RAMByteHours into average RAMBytes
+func (a *Allocation) RAMBytes() float64 {
+	if a.Minutes() <= 0.0 {
+		return 0.0
+	}
+	return a.RAMByteHours / (a.Minutes() / 60.0)
+}
+
+// PVBytes converts the Allocation's PVByteHours into average PVBytes
+func (a *Allocation) PVBytes() float64 {
+	if a.Minutes() <= 0.0 {
+		return 0.0
+	}
+	return a.PVByteHours / (a.Minutes() / 60.0)
+}
+
+// MarshalJSON implements json.Marshal interface
+func (a *Allocation) MarshalJSON() ([]byte, error) {
+	buffer := bytes.NewBufferString("{")
+	jsonEncodeString(buffer, "name", a.Name, ",")
+	jsonEncode(buffer, "properties", a.Properties, ",")
+	jsonEncode(buffer, "window", a.Window, ",")
+	jsonEncodeString(buffer, "start", a.Start.Format(timeFmt), ",")
+	jsonEncodeString(buffer, "end", a.End.Format(timeFmt), ",")
+	jsonEncodeFloat64(buffer, "minutes", a.Minutes(), ",")
+	jsonEncodeFloat64(buffer, "cpuCores", a.CPUCores(), ",")
+	jsonEncodeFloat64(buffer, "cpuCoreRequestAverage", a.CPUCoreRequestAverage, ",")
+	jsonEncodeFloat64(buffer, "cpuCoreUsageAverage", a.CPUCoreUsageAverage, ",")
+	jsonEncodeFloat64(buffer, "cpuCoreHours", a.CPUCoreHours, ",")
+	jsonEncodeFloat64(buffer, "cpuCost", a.CPUCost, ",")
+	jsonEncodeFloat64(buffer, "cpuEfficiency", a.CPUEfficiency, ",")
+	jsonEncodeFloat64(buffer, "gpuHours", a.GPUHours, ",")
+	jsonEncodeFloat64(buffer, "gpuCost", a.GPUCost, ",")
+	jsonEncodeFloat64(buffer, "networkCost", a.NetworkCost, ",")
+	jsonEncodeFloat64(buffer, "pvBytes", a.PVBytes(), ",")
+	jsonEncodeFloat64(buffer, "pvByteHours", a.PVByteHours, ",")
+	jsonEncodeFloat64(buffer, "pvCost", a.PVCost, ",")
+	jsonEncodeFloat64(buffer, "ramBytes", a.RAMBytes(), ",")
+	jsonEncodeFloat64(buffer, "ramByteRequestAverage", a.RAMBytesRequestAverage, ",")
+	jsonEncodeFloat64(buffer, "ramByteUsageAverage", a.RAMBytesUsageAverage, ",")
+	jsonEncodeFloat64(buffer, "ramByteHours", a.RAMByteHours, ",")
+	jsonEncodeFloat64(buffer, "ramCost", a.RAMCost, ",")
+	jsonEncodeFloat64(buffer, "ramEfficiency", a.RAMEfficiency, ",")
+	jsonEncodeFloat64(buffer, "sharedCost", a.SharedCost, ",")
+	jsonEncodeFloat64(buffer, "totalCost", a.TotalCost, ",")
+	jsonEncodeFloat64(buffer, "totalEfficiency", a.TotalEfficiency, "")
+	buffer.WriteString("}")
+	return buffer.Bytes(), nil
+}
+
+// TODO niko/cdmr
+// func (a *Allocation)UnmarshalJSON()
+
 // Resolution returns the duration of time covered by the Allocation
 func (a *Allocation) Resolution() time.Duration {
 	return a.End.Sub(a.Start)
@@ -219,22 +282,44 @@ func (a *Allocation) IsUnallocated() bool {
 	return strings.Contains(a.Name, UnallocatedSuffix)
 }
 
+// Minutes returns the number of minutes the Allocation represents, as defined
+// by the difference between the end and start times.
+func (a *Allocation) Minutes() float64 {
+	return a.End.Sub(a.Start).Minutes()
+}
+
 // Share works like Add, but converts the entire cost of the given Allocation
 // to SharedCost, rather than adding to the individual resource costs.
+// TODO niko/cdmr unit test changes!!!
 func (a *Allocation) Share(that *Allocation) (*Allocation, error) {
-	if a == nil {
-		return that.Clone(), nil
-	}
+	if that == nil {
+		return a.Clone(), nil
+	}
+
+	// Convert all costs of shared Allocation to SharedCost, zero out all
+	// non-shared costs, then add.
+	share := that.Clone()
+	share.SharedCost += share.TotalCost
+	share.TotalEfficiency = 1.0
+	share.CPUCost = 0
+	share.CPUCoreHours = 0
+	share.CPUEfficiency = 0
+	share.RAMCost = 0
+	share.RAMByteHours = 0
+	share.RAMEfficiency = 0
+	share.GPUCost = 0
+	share.GPUHours = 0
+	share.PVCost = 0
+	share.PVByteHours = 0
+	share.NetworkCost = 0
+	share.ExternalCost = 0
 
-	if !a.Start.Equal(that.Start) {
-		return nil, fmt.Errorf("mismatched start time: expected %s, received %s", a.Start, that.Start)
-	}
-	if !a.End.Equal(that.End) {
-		return nil, fmt.Errorf("mismatched start time: expected %s, received %s", a.End, that.End)
+	if a == nil {
+		return share, nil
 	}
 
 	agg := a.Clone()
-	agg.add(that, true, false)
+	agg.add(that)
 
 	return agg, nil
 }
@@ -244,7 +329,7 @@ func (a *Allocation) String() string {
 	return fmt.Sprintf("%s%s=%.2f", a.Name, NewWindow(&a.Start, &a.End), a.TotalCost)
 }
 
-func (a *Allocation) add(that *Allocation, isShared, isAccumulating bool) {
+func (a *Allocation) add(that *Allocation) {
 	if a == nil {
 		log.Warningf("Allocation.AggregateBy: trying to add a nil receiver")
 		return
@@ -271,65 +356,64 @@ func (a *Allocation) add(that *Allocation, isShared, isAccumulating bool) {
 		}
 	}
 
-	if that.ActiveStart.Before(a.ActiveStart) {
-		a.ActiveStart = that.ActiveStart
+	// Expand Window, Start, and End to be the "max" of each between the two
+	// given Allocations.
+	a.Window = a.Window.Expand(that.Window)
+
+	if that.Start.Before(a.Start) {
+		a.Start = that.Start
 	}
 
-	if isAccumulating {
-		if a.Start.After(that.Start) {
-			a.Start = that.Start
-		}
+	if that.End.After(a.End) {
+		a.End = that.End
+	}
 
-		if a.End.Before(that.End) {
-			a.End = that.End
-		}
+	// Note: efficiency numbers are computed the cost-weighted sum of each
+	// Allocation's efficiency.
+	// e.g. ($10 @ 25%) + ($10 @ 75%)  = (2.5+7.5)/20   =  50%
+	// e.g. ($90 @ 10%) + ($10 @ 100%) = (9.0+10.0)/100 =  19%
+	// e.g. ($100 @ 0%) + ($100 @ 0%)  = (0.0+0.0)/200  =   0%
+	// e.g. ($10 @ 150%) + ($10 @ 50%) = (15.0+5.0)/20  = 100%
+	// e.g. ($0 @ 100%) + ($0 @ 50%)                    =   0% (no div by 0)
 
-		a.Minutes += that.Minutes
-	} else if that.Minutes > a.Minutes {
-		a.Minutes = that.Minutes
+	// Compute CPU efficiency (see note above for methodology)
+	aggCPUCost := a.CPUCost + that.CPUCost
+	if aggCPUCost > 0 {
+		a.CPUEfficiency = (a.CPUEfficiency*a.CPUCost + that.CPUEfficiency*that.CPUCost) / aggCPUCost
+	} else {
+		a.CPUEfficiency = 0.0
 	}
 
-	// isShared determines whether the given allocation should be spread evenly
-	// across resources (e.g. sharing idle allocation) or lumped into a shared
-	// cost category (e.g. sharing namespace, labels).
-	if isShared {
-		a.SharedCost += that.TotalCost
+	// Compute RAM efficiency (see note above for methodology)
+	aggRAMCost := a.RAMCost + that.RAMCost
+	if aggRAMCost > 0 {
+		a.RAMEfficiency = (a.RAMEfficiency*a.RAMCost + that.RAMEfficiency*that.RAMCost) / aggRAMCost
 	} else {
-		a.CPUCoreHours += that.CPUCoreHours
-		a.GPUHours += that.GPUHours
-		a.RAMByteHours += that.RAMByteHours
-		a.PVByteHours += that.PVByteHours
-
-		aggCPUCost := a.CPUCost + that.CPUCost
-		if aggCPUCost > 0 {
-			a.CPUEfficiency = (a.CPUEfficiency*a.CPUCost + that.CPUEfficiency*that.CPUCost) / aggCPUCost
-		} else {
-			a.CPUEfficiency = 0.0
-		}
-
-		aggRAMCost := a.RAMCost + that.RAMCost
-		if aggRAMCost > 0 {
-			a.RAMEfficiency = (a.RAMEfficiency*a.RAMCost + that.RAMEfficiency*that.RAMCost) / aggRAMCost
-		} else {
-			a.RAMEfficiency = 0.0
-		}
-
-		aggTotalCost := a.TotalCost + that.TotalCost
-		if aggTotalCost > 0 {
-			a.TotalEfficiency = (a.TotalEfficiency*a.TotalCost + that.TotalEfficiency*that.TotalCost) / aggTotalCost
-		} else {
-			aggTotalCost = 0.0
-		}
-
-		a.SharedCost += that.SharedCost
-		a.ExternalCost += that.ExternalCost
-		a.CPUCost += that.CPUCost
-		a.GPUCost += that.GPUCost
-		a.NetworkCost += that.NetworkCost
-		a.RAMCost += that.RAMCost
-		a.PVCost += that.PVCost
+		a.RAMEfficiency = 0.0
 	}
 
+	// Compute total efficiency (see note above for methodology)
+	aggTotalCost := a.TotalCost + that.TotalCost
+	if aggTotalCost > 0 {
+		a.TotalEfficiency = (a.TotalEfficiency*a.TotalCost + that.TotalEfficiency*that.TotalCost) / aggTotalCost
+	} else {
+		aggTotalCost = 0.0
+	}
+
+	// Sum all cumulative resource fields
+	a.CPUCoreHours += that.CPUCoreHours
+	a.GPUHours += that.GPUHours
+	a.RAMByteHours += that.RAMByteHours
+	a.PVByteHours += that.PVByteHours
+
+	// Sum all cumulative cost fields
+	a.CPUCost += that.CPUCost
+	a.GPUCost += that.GPUCost
+	a.RAMCost += that.RAMCost
+	a.PVCost += that.PVCost
+	a.NetworkCost += that.NetworkCost
+	a.SharedCost += that.SharedCost
+	a.ExternalCost += that.ExternalCost
 	a.TotalCost += that.TotalCost
 }
 
@@ -1214,10 +1298,10 @@ func (as *AllocationSet) ComputeIdleAllocations(assetSet *AssetSet) (map[string]
 
 		idleAlloc := &Allocation{
 			Name:       fmt.Sprintf("%s/%s", cluster, IdleSuffix),
+			Window:     window.Clone(),
 			Properties: Properties{ClusterProp: cluster},
 			Start:      start,
 			End:        end,
-			Minutes:    end.Sub(start).Minutes(), // TODO deprecate w/ niko/allocation-minutes
 			CPUCost:    resources["cpu"],
 			GPUCost:    resources["gpu"],
 			RAMCost:    resources["ram"],
@@ -1348,10 +1432,10 @@ func (as *AllocationSet) IdleAllocations() map[string]*Allocation {
 // but only if the Allocation is valid, i.e. matches the AllocationSet's window. If
 // there is no existing entry, one is created. Nil error response indicates success.
 func (as *AllocationSet) Insert(that *Allocation) error {
-	return as.insert(that, false)
+	return as.insert(that)
 }
 
-func (as *AllocationSet) insert(that *Allocation, accumulate bool) error {
+func (as *AllocationSet) insert(that *Allocation) error {
 	if as == nil {
 		return fmt.Errorf("cannot insert into nil AllocationSet")
 	}
@@ -1376,7 +1460,7 @@ func (as *AllocationSet) insert(that *Allocation, accumulate bool) error {
 	if _, ok := as.allocations[that.Name]; !ok {
 		as.allocations[that.Name] = that
 	} else {
-		as.allocations[that.Name].add(that, false, accumulate)
+		as.allocations[that.Name].add(that)
 	}
 
 	// If the given Allocation is an external one, record that
@@ -1518,11 +1602,10 @@ func (as *AllocationSet) accumulate(that *AllocationSet) (*AllocationSet, error)
 		return as, nil
 	}
 
-	if that.Start().Before(as.End()) {
-		timefmt := "2006-01-02T15:04:05"
-		err := fmt.Sprintf("that [%s, %s); that [%s, %s)\n", as.Start().Format(timefmt), as.End().Format(timefmt), that.Start().Format(timefmt), that.End().Format(timefmt))
-		return nil, fmt.Errorf("error accumulating AllocationSets: overlapping windows: %s", err)
-	}
+	// TODO niko/cdmr implement first
+	// if that.Window.Overlaps(as.Window) {
+	// 	return nil, fmt.Errorf("AllocationSet.accumulate: overlapping windows: %s", that.Window, as.Window)
+	// }
 
 	// Set start, end to min(start), max(end)
 	start := as.Start()
@@ -1543,26 +1626,14 @@ func (as *AllocationSet) accumulate(that *AllocationSet) (*AllocationSet, error)
 	defer that.RUnlock()
 
 	for _, alloc := range as.allocations {
-		// Change Start and End to match the new window. However, do not
-		// change Minutes because that will be accounted for during the
-		// insert step, if in fact there are two allocations to add.
-		alloc.Start = start
-		alloc.End = end
-
-		err := acc.insert(alloc, true)
+		err := acc.insert(alloc)
 		if err != nil {
 			return nil, err
 		}
 	}
 
 	for _, alloc := range that.allocations {
-		// Change Start and End to match the new window. However, do not
-		// change Minutes because that will be accounted for during the
-		// insert step, if in fact there are two allocations to add.
-		alloc.Start = start
-		alloc.End = end
-
-		err := acc.insert(alloc, true)
+		err := acc.insert(alloc)
 		if err != nil {
 			return nil, err
 		}

+ 5 - 5
pkg/kubecost/allocation_test.go

@@ -34,9 +34,9 @@ func NewUnitAllocation(name string, start time.Time, resolution time.Duration, p
 	alloc := &Allocation{
 		Name:            name,
 		Properties:      *properties,
+		Window:          NewWindow(&start, &end).Clone(),
 		Start:           start,
 		End:             end,
-		Minutes:         1440,
 		CPUCoreHours:    1,
 		CPUCost:         1,
 		CPUEfficiency:   1,
@@ -305,8 +305,8 @@ func assertAllocationWindow(t *testing.T, as *AllocationSet, msg string, expStar
 		if !a.End.Equal(expEnd) {
 			t.Fatalf("AllocationSet.AggregateBy[%s]: expected end %s, actual %s", msg, expEnd, a.End)
 		}
-		if a.Minutes != expMinutes {
-			t.Fatalf("AllocationSet.AggregateBy[%s]: expected minutes %f, actual %f", msg, expMinutes, a.Minutes)
+		if a.Minutes() != expMinutes {
+			t.Fatalf("AllocationSet.AggregateBy[%s]: expected minutes %f, actual %f", msg, expMinutes, a.Minutes())
 		}
 	})
 }
@@ -1260,8 +1260,8 @@ func TestAllocationSetRange_Accumulate(t *testing.T) {
 	if !alloc.End.Equal(tomorrow) {
 		t.Fatalf("accumulating AllocationSetRange: expected to end %s; actual %s", tomorrow, alloc.End)
 	}
-	if alloc.Minutes != 2880.0 {
-		t.Fatalf("accumulating AllocationSetRange: expected %f minutes; actual %f", 2880.0, alloc.Minutes)
+	if alloc.Minutes() != 2880.0 {
+		t.Fatalf("accumulating AllocationSetRange: expected %f minutes; actual %f", 2880.0, alloc.Minutes())
 	}
 }
 

+ 13 - 34
pkg/kubecost/asset.go

@@ -5,7 +5,6 @@ import (
 	"encoding"
 	"encoding/json"
 	"fmt"
-	"math"
 	"strings"
 	"sync"
 	"time"
@@ -43,10 +42,10 @@ type Asset interface {
 	// Temporal values
 	Start() time.Time
 	End() time.Time
-	Minutes() float64
+	SetStartEnd(time.Time, time.Time)
 	Window() Window
 	ExpandWindow(Window)
-	SetStartEnd(time.Time, time.Time)
+	Minutes() float64
 
 	// Operations and comparisons
 	Add(Asset) Asset
@@ -188,6 +187,9 @@ func AssetToExternalAllocation(asset Asset, aggregateBy []string, allocationProp
 	return &Allocation{
 		Name:         strings.Join(names, "/"),
 		Properties:   props,
+		Window:       asset.Window().Clone(),
+		Start:        asset.Start(),
+		End:          asset.End(),
 		ExternalCost: asset.TotalCost(),
 		TotalCost:    asset.TotalCost(),
 	}, nil
@@ -2695,15 +2697,17 @@ func (as *AssetSet) Get(key string) (Asset, bool) {
 // configured properties to determine the key under which the Asset will
 // be inserted.
 func (as *AssetSet) Insert(asset Asset) error {
-	if as.IsEmpty() {
-		as.Lock()
-		as.assets = map[string]Asset{}
-		as.Unlock()
+	if as == nil {
+		return fmt.Errorf("cannot Insert into nil AssetSet")
 	}
 
 	as.Lock()
 	defer as.Unlock()
 
+	if as.assets == nil {
+		as.assets = map[string]Asset{}
+	}
+
 	// Determine key into which to Insert the Asset.
 	k, err := key(asset, as.aggregateBy)
 	if err != nil {
@@ -2826,9 +2830,11 @@ func (as *AssetSet) accumulate(that *AssetSet) (*AssetSet, error) {
 	// Set start, end to min(start), max(end)
 	start := as.Start()
 	end := as.End()
+
 	if that.Start().Before(start) {
 		start = that.Start()
 	}
+
 	if that.End().After(end) {
 		end = that.End()
 	}
@@ -2986,33 +2992,6 @@ func (asr *AssetSetRange) Window() Window {
 	return NewWindow(&start, &end)
 }
 
-// TODO move everything below to a separate package
-
-func jsonEncodeFloat64(buffer *bytes.Buffer, name string, val float64, comma string) {
-	var encoding string
-	if math.IsNaN(val) {
-		encoding = fmt.Sprintf("\"%s\":null%s", name, comma)
-	} else {
-		encoding = fmt.Sprintf("\"%s\":%f%s", name, val, comma)
-	}
-
-	buffer.WriteString(encoding)
-}
-
-func jsonEncodeString(buffer *bytes.Buffer, name, val, comma string) {
-	buffer.WriteString(fmt.Sprintf("\"%s\":\"%s\"%s", name, val, comma))
-}
-
-func jsonEncode(buffer *bytes.Buffer, name string, obj interface{}, comma string) {
-	buffer.WriteString(fmt.Sprintf("\"%s\":", name))
-	if bytes, err := json.Marshal(obj); err != nil {
-		buffer.WriteString("null")
-	} else {
-		buffer.Write(bytes)
-	}
-	buffer.WriteString(comma)
-}
-
 // Returns true if string slices a and b contain all of the same strings, in any order.
 func sameContents(a, b []string) bool {
 	if len(a) != len(b) {

+ 2 - 1
pkg/kubecost/asset_test.go

@@ -153,7 +153,7 @@ func assertAssetSet(t *testing.T, as *AssetSet, msg string, window Window, exps
 				t.Fatalf("AssetSet.AggregateBy[%s]: key %s expected total cost %.2f, actual %.2f", msg, key, exp, a.TotalCost())
 			}
 			if !a.Window().Equal(window) {
-				t.Fatalf("AssetSet.AggregateBy[%s]: key %s expected window %s, actual %s", msg, key, window, as.Window)
+				t.Fatalf("AssetSet.AggregateBy[%s]: key %s expected window %s, actual %s", msg, key, window, a.Window())
 			}
 		} else {
 			t.Fatalf("AssetSet.AggregateBy[%s]: unexpected asset: %s", msg, key)
@@ -1010,6 +1010,7 @@ func TestAssetSetRange_Accumulate(t *testing.T) {
 		generateAssetSet(startD1),
 		generateAssetSet(startD2),
 	)
+
 	err = asr.AggregateBy([]string{string(AssetTypeProp)}, nil)
 	as, err = asr.Accumulate()
 	if err != nil {

+ 1 - 1
pkg/kubecost/bingen.go

@@ -21,4 +21,4 @@ package kubecost
 // @bingen:generate:AllocationSet
 // @bingen:generate:AllocationSetRange
 
-//go:generate bingen -package=kubecost -version=5 -buffer=github.com/kubecost/cost-model/pkg/util
+//go:generate bingen -package=kubecost -version=7 -buffer=github.com/kubecost/cost-model/pkg/util

+ 35 - 0
pkg/kubecost/json.go

@@ -0,0 +1,35 @@
+package kubecost
+
+import (
+	"bytes"
+	"encoding/json"
+	"fmt"
+	"math"
+)
+
+// TODO move everything below to a separate package
+
+func jsonEncodeFloat64(buffer *bytes.Buffer, name string, val float64, comma string) {
+	var encoding string
+	if math.IsNaN(val) {
+		encoding = fmt.Sprintf("\"%s\":null%s", name, comma)
+	} else {
+		encoding = fmt.Sprintf("\"%s\":%f%s", name, val, comma)
+	}
+
+	buffer.WriteString(encoding)
+}
+
+func jsonEncodeString(buffer *bytes.Buffer, name, val, comma string) {
+	buffer.WriteString(fmt.Sprintf("\"%s\":\"%s\"%s", name, val, comma))
+}
+
+func jsonEncode(buffer *bytes.Buffer, name string, obj interface{}, comma string) {
+	buffer.WriteString(fmt.Sprintf("\"%s\":", name))
+	if bytes, err := json.Marshal(obj); err != nil {
+		buffer.WriteString("null")
+	} else {
+		buffer.Write(bytes)
+	}
+	buffer.WriteString(comma)
+}

+ 129 - 74
pkg/kubecost/kubecost_codecs.go

@@ -25,7 +25,7 @@ const (
 	GeneratorPackageName string = "kubecost"
 
 	// CodecVersion is the version passed into the generator
-	CodecVersion uint8 = 6
+	CodecVersion uint8 = 7
 )
 
 //--------------------------------------------------------------------------
@@ -125,17 +125,17 @@ func (target *Allocation) MarshalBinary() (data []byte, err error) {
 	buff.WriteBytes(a)
 	// --- [end][write][reference](Properties) ---
 
-	// --- [begin][write][reference](time.Time) ---
-	b, errB := target.Start.MarshalBinary()
+	// --- [begin][write][struct](Window) ---
+	b, errB := target.Window.MarshalBinary()
 	if errB != nil {
 		return nil, errB
 	}
 	buff.WriteInt(len(b))
 	buff.WriteBytes(b)
-	// --- [end][write][reference](time.Time) ---
+	// --- [end][write][struct](Window) ---
 
 	// --- [begin][write][reference](time.Time) ---
-	c, errC := target.End.MarshalBinary()
+	c, errC := target.Start.MarshalBinary()
 	if errC != nil {
 		return nil, errC
 	}
@@ -143,9 +143,8 @@ func (target *Allocation) MarshalBinary() (data []byte, err error) {
 	buff.WriteBytes(c)
 	// --- [end][write][reference](time.Time) ---
 
-	buff.WriteFloat64(target.Minutes) // write float64
 	// --- [begin][write][reference](time.Time) ---
-	d, errD := target.ActiveStart.MarshalBinary()
+	d, errD := target.End.MarshalBinary()
 	if errD != nil {
 		return nil, errD
 	}
@@ -153,20 +152,25 @@ func (target *Allocation) MarshalBinary() (data []byte, err error) {
 	buff.WriteBytes(d)
 	// --- [end][write][reference](time.Time) ---
 
-	buff.WriteFloat64(target.CPUCoreHours)    // write float64
-	buff.WriteFloat64(target.CPUCost)         // write float64
-	buff.WriteFloat64(target.CPUEfficiency)   // write float64
-	buff.WriteFloat64(target.GPUHours)        // write float64
-	buff.WriteFloat64(target.GPUCost)         // write float64
-	buff.WriteFloat64(target.NetworkCost)     // write float64
-	buff.WriteFloat64(target.PVByteHours)     // write float64
-	buff.WriteFloat64(target.PVCost)          // write float64
-	buff.WriteFloat64(target.RAMByteHours)    // write float64
-	buff.WriteFloat64(target.RAMCost)         // write float64
-	buff.WriteFloat64(target.RAMEfficiency)   // write float64
-	buff.WriteFloat64(target.SharedCost)      // write float64
-	buff.WriteFloat64(target.TotalCost)       // write float64
-	buff.WriteFloat64(target.TotalEfficiency) // write float64
+	buff.WriteFloat64(target.CPUCoreHours)           // write float64
+	buff.WriteFloat64(target.CPUCoreRequestAverage)  // write float64
+	buff.WriteFloat64(target.CPUCoreUsageAverage)    // write float64
+	buff.WriteFloat64(target.CPUCost)                // write float64
+	buff.WriteFloat64(target.CPUEfficiency)          // write float64
+	buff.WriteFloat64(target.GPUHours)               // write float64
+	buff.WriteFloat64(target.GPUCost)                // write float64
+	buff.WriteFloat64(target.NetworkCost)            // write float64
+	buff.WriteFloat64(target.PVByteHours)            // write float64
+	buff.WriteFloat64(target.PVCost)                 // write float64
+	buff.WriteFloat64(target.RAMByteHours)           // write float64
+	buff.WriteFloat64(target.RAMBytesRequestAverage) // write float64
+	buff.WriteFloat64(target.RAMBytesUsageAverage)   // write float64
+	buff.WriteFloat64(target.RAMCost)                // write float64
+	buff.WriteFloat64(target.RAMEfficiency)          // write float64
+	buff.WriteFloat64(target.SharedCost)             // write float64
+	buff.WriteFloat64(target.ExternalCost)           // write float64
+	buff.WriteFloat64(target.TotalCost)              // write float64
+	buff.WriteFloat64(target.TotalEfficiency)        // write float64
 	return buff.Bytes(), nil
 }
 
@@ -208,16 +212,16 @@ func (target *Allocation) UnmarshalBinary(data []byte) (err error) {
 	target.Properties = *b
 	// --- [end][read][reference](Properties) ---
 
-	// --- [begin][read][reference](time.Time) ---
-	e := &time.Time{}
+	// --- [begin][read][struct](Window) ---
+	e := &Window{}
 	f := buff.ReadInt()    // byte array length
 	g := buff.ReadBytes(f) // byte array
 	errB := e.UnmarshalBinary(g)
 	if errB != nil {
 		return errB
 	}
-	target.Start = *e
-	// --- [end][read][reference](time.Time) ---
+	target.Window = *e
+	// --- [end][read][struct](Window) ---
 
 	// --- [begin][read][reference](time.Time) ---
 	h := &time.Time{}
@@ -227,64 +231,76 @@ func (target *Allocation) UnmarshalBinary(data []byte) (err error) {
 	if errC != nil {
 		return errC
 	}
-	target.End = *h
+	target.Start = *h
 	// --- [end][read][reference](time.Time) ---
 
-	n := buff.ReadFloat64() // read float64
-	target.Minutes = n
-
 	// --- [begin][read][reference](time.Time) ---
-	o := &time.Time{}
-	p := buff.ReadInt()    // byte array length
-	q := buff.ReadBytes(p) // byte array
-	errD := o.UnmarshalBinary(q)
+	n := &time.Time{}
+	o := buff.ReadInt()    // byte array length
+	p := buff.ReadBytes(o) // byte array
+	errD := n.UnmarshalBinary(p)
 	if errD != nil {
 		return errD
 	}
-	target.ActiveStart = *o
+	target.End = *n
 	// --- [end][read][reference](time.Time) ---
 
+	q := buff.ReadFloat64() // read float64
+	target.CPUCoreHours = q
+
 	r := buff.ReadFloat64() // read float64
-	target.CPUCoreHours = r
+	target.CPUCoreRequestAverage = r
 
 	s := buff.ReadFloat64() // read float64
-	target.CPUCost = s
+	target.CPUCoreUsageAverage = s
 
 	t := buff.ReadFloat64() // read float64
-	target.CPUEfficiency = t
+	target.CPUCost = t
 
 	u := buff.ReadFloat64() // read float64
-	target.GPUHours = u
+	target.CPUEfficiency = u
 
 	w := buff.ReadFloat64() // read float64
-	target.GPUCost = w
+	target.GPUHours = w
 
 	x := buff.ReadFloat64() // read float64
-	target.NetworkCost = x
+	target.GPUCost = x
 
 	y := buff.ReadFloat64() // read float64
-	target.PVByteHours = y
+	target.NetworkCost = y
 
 	z := buff.ReadFloat64() // read float64
-	target.PVCost = z
+	target.PVByteHours = z
 
 	aa := buff.ReadFloat64() // read float64
-	target.RAMByteHours = aa
+	target.PVCost = aa
 
 	bb := buff.ReadFloat64() // read float64
-	target.RAMCost = bb
+	target.RAMByteHours = bb
 
 	cc := buff.ReadFloat64() // read float64
-	target.RAMEfficiency = cc
+	target.RAMBytesRequestAverage = cc
 
 	dd := buff.ReadFloat64() // read float64
-	target.SharedCost = dd
+	target.RAMBytesUsageAverage = dd
 
 	ee := buff.ReadFloat64() // read float64
-	target.TotalCost = ee
+	target.RAMCost = ee
 
 	ff := buff.ReadFloat64() // read float64
-	target.TotalEfficiency = ff
+	target.RAMEfficiency = ff
+
+	gg := buff.ReadFloat64() // read float64
+	target.SharedCost = gg
+
+	hh := buff.ReadFloat64() // read float64
+	target.ExternalCost = hh
+
+	ll := buff.ReadFloat64() // read float64
+	target.TotalCost = ll
+
+	mm := buff.ReadFloat64() // read float64
+	target.TotalEfficiency = mm
 
 	return nil
 }
@@ -340,19 +356,33 @@ func (target *AllocationSet) MarshalBinary() (data []byte, err error) {
 		// --- [end][write][map](map[string]*Allocation) ---
 
 	}
-	if target.idleKeys == nil {
+	if target.externalKeys == nil {
 		buff.WriteUInt8(uint8(0)) // write nil byte
 	} else {
 		buff.WriteUInt8(uint8(1)) // write non-nil byte
 
 		// --- [begin][write][map](map[string]bool) ---
-		buff.WriteInt(len(target.idleKeys)) // map length
-		for kk, vv := range target.idleKeys {
+		buff.WriteInt(len(target.externalKeys)) // map length
+		for kk, vv := range target.externalKeys {
 			buff.WriteString(kk) // write string
 			buff.WriteBool(vv)   // write bool
 		}
 		// --- [end][write][map](map[string]bool) ---
 
+	}
+	if target.idleKeys == nil {
+		buff.WriteUInt8(uint8(0)) // write nil byte
+	} else {
+		buff.WriteUInt8(uint8(1)) // write non-nil byte
+
+		// --- [begin][write][map](map[string]bool) ---
+		buff.WriteInt(len(target.idleKeys)) // map length
+		for kkk, vvv := range target.idleKeys {
+			buff.WriteString(kkk) // write string
+			buff.WriteBool(vvv)   // write bool
+		}
+		// --- [end][write][map](map[string]bool) ---
+
 	}
 	// --- [begin][write][struct](Window) ---
 	b, errB := target.Window.MarshalBinary()
@@ -450,7 +480,7 @@ func (target *AllocationSet) UnmarshalBinary(data []byte) (err error) {
 
 	}
 	if buff.ReadUInt8() == uint8(0) {
-		target.idleKeys = nil
+		target.externalKeys = nil
 	} else {
 		// --- [begin][read][map](map[string]bool) ---
 		g := make(map[string]bool)
@@ -466,35 +496,56 @@ func (target *AllocationSet) UnmarshalBinary(data []byte) (err error) {
 
 			g[kk] = vv
 		}
-		target.idleKeys = g
+		target.externalKeys = g
+		// --- [end][read][map](map[string]bool) ---
+
+	}
+	if buff.ReadUInt8() == uint8(0) {
+		target.idleKeys = nil
+	} else {
+		// --- [begin][read][map](map[string]bool) ---
+		n := make(map[string]bool)
+		o := buff.ReadInt() // map len
+		for ii := 0; ii < o; ii++ {
+			var kkk string
+			p := buff.ReadString() // read string
+			kkk = p
+
+			var vvv bool
+			q := buff.ReadBool() // read bool
+			vvv = q
+
+			n[kkk] = vvv
+		}
+		target.idleKeys = n
 		// --- [end][read][map](map[string]bool) ---
 
 	}
 	// --- [begin][read][struct](Window) ---
-	n := &Window{}
-	o := buff.ReadInt()    // byte array length
-	p := buff.ReadBytes(o) // byte array
-	errB := n.UnmarshalBinary(p)
+	r := &Window{}
+	s := buff.ReadInt()    // byte array length
+	t := buff.ReadBytes(s) // byte array
+	errB := r.UnmarshalBinary(t)
 	if errB != nil {
 		return errB
 	}
-	target.Window = *n
+	target.Window = *r
 	// --- [end][read][struct](Window) ---
 
 	if buff.ReadUInt8() == uint8(0) {
 		target.Warnings = nil
 	} else {
 		// --- [begin][read][slice]([]string) ---
-		r := buff.ReadInt() // array len
-		q := make([]string, r)
-		for ii := 0; ii < r; ii++ {
-			var s string
-			t := buff.ReadString() // read string
-			s = t
+		w := buff.ReadInt() // array len
+		u := make([]string, w)
+		for jj := 0; jj < w; jj++ {
+			var x string
+			y := buff.ReadString() // read string
+			x = y
 
-			q[ii] = s
+			u[jj] = x
 		}
-		target.Warnings = q
+		target.Warnings = u
 		// --- [end][read][slice]([]string) ---
 
 	}
@@ -502,16 +553,16 @@ func (target *AllocationSet) UnmarshalBinary(data []byte) (err error) {
 		target.Errors = nil
 	} else {
 		// --- [begin][read][slice]([]string) ---
-		w := buff.ReadInt() // array len
-		u := make([]string, w)
-		for jj := 0; jj < w; jj++ {
-			var x string
-			y := buff.ReadString() // read string
-			x = y
+		aa := buff.ReadInt() // array len
+		z := make([]string, aa)
+		for iii := 0; iii < aa; iii++ {
+			var bb string
+			cc := buff.ReadString() // read string
+			bb = cc
 
-			u[jj] = x
+			z[iii] = bb
 		}
-		target.Errors = u
+		target.Errors = z
 		// --- [end][read][slice]([]string) ---
 
 	}
@@ -1406,6 +1457,7 @@ func (target *Cloud) MarshalBinary() (data []byte, err error) {
 
 	buff.WriteFloat64(target.adjustment) // write float64
 	buff.WriteFloat64(target.Cost)       // write float64
+	buff.WriteFloat64(target.Credit)     // write float64
 	return buff.Bytes(), nil
 }
 
@@ -1513,6 +1565,9 @@ func (target *Cloud) UnmarshalBinary(data []byte) (err error) {
 	w := buff.ReadFloat64() // read float64
 	target.Cost = w
 
+	x := buff.ReadFloat64() // read float64
+	target.Credit = x
+
 	return nil
 }
 

+ 93 - 1
pkg/kubecost/window.go

@@ -393,7 +393,19 @@ func (w Window) ExpandEnd(end time.Time) Window {
 }
 
 func (w Window) Expand(that Window) Window {
-	return w.ExpandStart(*that.start).ExpandEnd(*that.end)
+	if that.start == nil {
+		w.start = nil
+	} else {
+		w = w.ExpandStart(*that.start)
+	}
+
+	if that.end == nil {
+		w.end = nil
+	} else {
+		w = w.ExpandEnd(*that.end)
+	}
+
+	return w
 }
 
 func (w Window) Hours() float64 {
@@ -432,6 +444,86 @@ func (w Window) Minutes() float64 {
 	return w.end.Sub(*w.start).Minutes()
 }
 
+// Overlaps returns true iff the two given Windows share and amount of temporal
+// coverage.
+// TODO niko/cdmr return to this, with unit tests, and then implement in
+// AllocationSet.accumulate
+func (w Window) Overlaps(x Window) bool {
+	if (w.start == nil && w.end == nil) || (x.start == nil && x.end == nil) {
+		// one window is completely open, so overlap is guaranteed
+		// <---------->
+		//   ?------?
+		return true
+	}
+
+	// Neither window is completely open (nil, nil), but one or the other might
+	// still be future- or past-open.
+
+	if w.start == nil {
+		// w is past-open, future-closed
+		// <------]
+
+		if x.start != nil && !x.start.Before(*w.end) {
+			// x starts after w ends (or eq)
+			// <------]
+			//          [------?
+			return false
+		}
+
+		// <-----]
+		//    ?-----?
+		return true
+	}
+
+	if w.end == nil {
+		// w is future-open, past-closed
+		// [------>
+
+		if x.end != nil && !x.end.After(*w.end) {
+			// x ends before w begins (or eq)
+			//          [------>
+			// ?------]
+			return false
+		}
+
+		//    [------>
+		// ?------?
+		return true
+	}
+
+	// Now we know w is closed, but we don't know about x
+	//  [------]
+	//     ?------?
+	if x.start == nil {
+		// TODO niko/cdmr
+	}
+
+	if x.end == nil {
+		// TODO niko/cdmr
+	}
+
+	// Both are closed.
+
+	if !x.start.Before(*w.end) && !x.end.Before(*w.end) {
+		// x starts and ends after w ends
+		// [------]
+		//          [------]
+		return false
+	}
+
+	if !x.start.After(*w.start) && !x.end.After(*w.start) {
+		// x starts and ends before w starts
+		//          [------]
+		// [------]
+		return false
+	}
+
+	// w and x must overlap
+	//    [------]
+	// [------]
+	return true
+}
+
 func (w Window) Set(start, end *time.Time) {
 	w.start = start
 	w.end = end

+ 29 - 25
pkg/kubecost/window_test.go

@@ -211,7 +211,7 @@ func TestParseWindowUTC(t *testing.T) {
 		t.Fatalf(`expect: window "month" to end before now; actual: %s ends after %s`, month, time.Now().UTC())
 	}
 
-	// TODO niko/etl lastweek
+	// TODO lastweek
 
 	lastmonth, err := ParseWindowUTC("lastmonth")
 	monthMinHours := float64(24 * 28)
@@ -542,30 +542,6 @@ func TestParseWindowWithOffsetString(t *testing.T) {
 
 }
 
-// TODO niko/etl
-// func TestWindow_Contains(t *testing.T) {}
-
-// TODO niko/etl
-// func TestWindow_Duration(t *testing.T) {}
-
-// TODO niko/etl
-// func TestWindow_End(t *testing.T) {}
-
-// TODO niko/etl
-// func TestWindow_Equal(t *testing.T) {}
-
-// TODO niko/etl
-// func TestWindow_ExpandStart(t *testing.T) {}
-
-// TODO niko/etl
-// func TestWindow_ExpandEnd(t *testing.T) {}
-
-// TODO niko/etl
-// func TestWindow_Start(t *testing.T) {}
-
-// TODO niko/etl
-// func TestWindow_String(t *testing.T) {}
-
 func TestWindow_DurationOffsetStrings(t *testing.T) {
 	w, err := ParseWindowUTC("1d")
 	if err != nil {
@@ -624,3 +600,31 @@ func TestWindow_DurationOffsetStrings(t *testing.T) {
 		t.Fatalf(`expect: window to be "1d"; actual: "%s"`, dur)
 	}
 }
+
+func TestWindow_Overlaps(t *testing.T) {
+	// TODO niko/cdmr
+}
+
+// TODO
+// func TestWindow_Contains(t *testing.T) {}
+
+// TODO
+// func TestWindow_Duration(t *testing.T) {}
+
+// TODO
+// func TestWindow_End(t *testing.T) {}
+
+// TODO
+// func TestWindow_Equal(t *testing.T) {}
+
+// TODO
+// func TestWindow_ExpandStart(t *testing.T) {}
+
+// TODO
+// func TestWindow_ExpandEnd(t *testing.T) {}
+
+// TODO
+// func TestWindow_Start(t *testing.T) {}
+
+// TODO
+// func TestWindow_String(t *testing.T) {}

+ 21 - 0
pkg/prom/result.go

@@ -230,6 +230,27 @@ func (qr *QueryResult) GetString(field string) (string, error) {
 	return strField, nil
 }
 
+// GetStrings returns the requested fields, or an error if it does not exist
+func (qr *QueryResult) GetStrings(fields ...string) (map[string]string, error) {
+	values := map[string]string{}
+
+	for _, field := range fields {
+		f, ok := qr.Metric[field]
+		if !ok {
+			return nil, fmt.Errorf("'%s' field does not exist in data result vector", field)
+		}
+
+		value, ok := f.(string)
+		if !ok {
+			return nil, fmt.Errorf("'%s' field is improperly formatted", field)
+		}
+
+		values[field] = value
+	}
+
+	return values, nil
+}
+
 // GetLabels returns all labels and their values from the query result
 func (qr *QueryResult) GetLabels() map[string]string {
 	result := make(map[string]string)

+ 2 - 2
test/cloud_test.go

@@ -279,7 +279,7 @@ func TestNodePriceFromCSVWithBadConfig(t *testing.T) {
 	fm := FakeClusterMap{}
 	d, _ := time.ParseDuration("1m")
 
-	model := costmodel.NewCostModel(fc, fm, d)
+	model := costmodel.NewCostModel(nil, nil, fc, fm, d)
 
 	_, err := model.GetNodeCost(c)
 	if err != nil {
@@ -333,7 +333,7 @@ func TestSourceMatchesFromCSV(t *testing.T) {
 	fm := FakeClusterMap{}
 	d, _ := time.ParseDuration("1m")
 
-	model := costmodel.NewCostModel(fc, fm, d)
+	model := costmodel.NewCostModel(nil, nil, fc, fm, d)
 
 	_, err = model.GetNodeCost(c)
 	if err != nil {