Kaynağa Gözat

Merge pull request #817 from kubecost/etl

ETL: utilities and functions for cascading stores and multiplexer
Niko Kovacevic 5 yıl önce
ebeveyn
işleme
f386a3b61b

+ 18 - 4
pkg/costmodel/allocation.go

@@ -64,6 +64,20 @@ const (
 	queryFmtLBActiveMins          = `count(kubecost_load_balancer_cost) by (namespace, service_name, %s)[%s:%s]%s`
 )
 
+// CanCompute should return true if CostModel can act as a valid source for the
+// given time range. In the case of CostModel we want to attempt to compute as
+// long as the range starts in the past. If the CostModel ends up not having
+// data to match, that's okay, and should be communicated with an error
+// response from ComputeAllocation.
+func (cm *CostModel) CanCompute(start, end time.Time) bool {
+	return start.Before(time.Now())
+}
+
+// Name returns the name of the Source
+func (cm *CostModel) Name() string {
+	return "CostModel"
+}
+
 // ComputeAllocation uses the CostModel instance to compute an AllocationSet
 // for the window defined by the given start and end times. The Allocations
 // returned are unaggregated (i.e. down to the container level).
@@ -1678,13 +1692,13 @@ func applyUnmountedPVs(window kubecost.Window, podMap map[podKey]*Pod, pvMap map
 			Cluster: cluster,
 			Name:    kubecost.UnmountedSuffix,
 		}
-		unmountedBreakDown := kubecost.PVAllocations{
+		unmountedPVs := kubecost.PVAllocations{
 			pvKey: {
 				ByteHours: unmountedPVBytes[cluster] * window.Minutes() / 60.0,
 				Cost:      amount,
 			},
 		}
-		podMap[key].Allocations[container].PVs = podMap[key].Allocations[container].PVs.Add(unmountedBreakDown)
+		podMap[key].Allocations[container].PVs = podMap[key].Allocations[container].PVs.Add(unmountedPVs)
 	}
 }
 
@@ -1730,13 +1744,13 @@ func applyUnmountedPVCs(window kubecost.Window, podMap map[podKey]*Pod, pvcMap m
 			Cluster: cluster,
 			Name:    kubecost.UnmountedSuffix,
 		}
-		unmountedBreakDown := kubecost.PVAllocations{
+		unmountedPVs := kubecost.PVAllocations{
 			pvKey: {
 				ByteHours: unmountedPVCBytes[key] * window.Minutes() / 60.0,
 				Cost:      amount,
 			},
 		}
-		podMap[podKey].Allocations[container].PVs = podMap[podKey].Allocations[container].PVs.Add(unmountedBreakDown)
+		podMap[podKey].Allocations[container].PVs = podMap[podKey].Allocations[container].PVs.Add(unmountedPVs)
 
 	}
 }

+ 10 - 15
pkg/kubecost/allocation.go

@@ -75,7 +75,6 @@ type Allocation struct {
 	RAMCost                    float64               `json:"ramCost"`
 	RAMCostAdjustment          float64               `json:"ramCostAdjustment"`
 	SharedCost                 float64               `json:"sharedCost"`
-	SharedCostAdjustment       float64               `json:"sharedCostAdjustment"`
 	ExternalCost               float64               `json:"externalCost"`
 	// RawAllocationOnly is a pointer so if it is not present it will be
 	// marshalled as null rather than as an object with Go default values.
@@ -218,7 +217,6 @@ func (a *Allocation) Clone() *Allocation {
 		RAMCost:                    a.RAMCost,
 		RAMCostAdjustment:          a.RAMCostAdjustment,
 		SharedCost:                 a.SharedCost,
-		SharedCostAdjustment:       a.SharedCostAdjustment,
 		ExternalCost:               a.ExternalCost,
 		RawAllocationOnly:          a.RawAllocationOnly.Clone(),
 	}
@@ -305,9 +303,6 @@ func (a *Allocation) Equal(that *Allocation) bool {
 	if !util.IsApproximately(a.SharedCost, that.SharedCost) {
 		return false
 	}
-	if !util.IsApproximately(a.SharedCostAdjustment, that.SharedCostAdjustment) {
-		return false
-	}
 	if !util.IsApproximately(a.ExternalCost, that.ExternalCost) {
 		return false
 	}
@@ -380,7 +375,7 @@ func (a *Allocation) LBTotalCost() float64 {
 
 // SharedTotalCost calculates total shared cost of Allocation including adjustment
 func (a *Allocation) SharedTotalCost() float64 {
-	return a.SharedCost + a.SharedCostAdjustment
+	return a.SharedCost
 }
 
 // PVCost calculate cumulative cost of all PVs that Allocation is attached to
@@ -512,7 +507,6 @@ func (a *Allocation) MarshalJSON() ([]byte, error) {
 	jsonEncodeFloat64(buffer, "ramCostAdjustment", a.RAMCostAdjustment, ",")
 	jsonEncodeFloat64(buffer, "ramEfficiency", a.RAMEfficiency(), ",")
 	jsonEncodeFloat64(buffer, "sharedCost", a.SharedCost, ",")
-	jsonEncodeFloat64(buffer, "sharedCostAdjustment", a.SharedCostAdjustment, ",")
 	jsonEncodeFloat64(buffer, "externalCost", a.ExternalCost, ",")
 	jsonEncodeFloat64(buffer, "totalCost", a.TotalCost(), ",")
 	jsonEncodeFloat64(buffer, "totalEfficiency", a.TotalEfficiency(), ",")
@@ -648,7 +642,6 @@ func (a *Allocation) add(that *Allocation) {
 	a.PVCostAdjustment += that.PVCostAdjustment
 	a.NetworkCostAdjustment += that.NetworkCostAdjustment
 	a.LoadBalancerCostAdjustment += that.LoadBalancerCostAdjustment
-	a.SharedCostAdjustment += that.SharedCostAdjustment
 
 	// Any data that is in a "raw allocation only" is not valid in any
 	// sort of cumulative Allocation (like one that is added).
@@ -662,6 +655,7 @@ type AllocationSet struct {
 	allocations  map[string]*Allocation
 	externalKeys map[string]bool
 	idleKeys     map[string]bool
+	FromSource   string // stores the name of the source used to compute the data
 	Window       Window
 	Warnings     []string
 	Errors       []string
@@ -1948,11 +1942,11 @@ func (as *AllocationSet) Each(f func(string, *Allocation)) {
 // End returns the End time of the AllocationSet window
 func (as *AllocationSet) End() time.Time {
 	if as == nil {
-		log.Warningf("Allocation ETL: calling End on nil AllocationSet")
+		log.Warningf("AllocationSet: calling End on nil AllocationSet")
 		return time.Unix(0, 0)
 	}
 	if as.Window.End() == nil {
-		log.Warningf("Allocation ETL: AllocationSet with illegal window: End is nil; len(as.allocations)=%d", len(as.allocations))
+		log.Warningf("AllocationSet: AllocationSet with illegal window: End is nil; len(as.allocations)=%d", len(as.allocations))
 		return time.Unix(0, 0)
 	}
 	return *as.Window.End()
@@ -2153,11 +2147,11 @@ func (as *AllocationSet) Set(alloc *Allocation) error {
 // Start returns the Start time of the AllocationSet window
 func (as *AllocationSet) Start() time.Time {
 	if as == nil {
-		log.Warningf("Allocation ETL: calling Start on nil AllocationSet")
+		log.Warningf("AllocationSet: calling Start on nil AllocationSet")
 		return time.Unix(0, 0)
 	}
 	if as.Window.Start() == nil {
-		log.Warningf("Allocation ETL: AllocationSet with illegal window: Start is nil; len(as.allocations)=%d", len(as.allocations))
+		log.Warningf("AllocationSet: AllocationSet with illegal window: Start is nil; len(as.allocations)=%d", len(as.allocations))
 		return time.Unix(0, 0)
 	}
 	return *as.Window.Start()
@@ -2196,11 +2190,11 @@ func (as *AllocationSet) UTCOffset() time.Duration {
 
 func (as *AllocationSet) accumulate(that *AllocationSet) (*AllocationSet, error) {
 	if as.IsEmpty() {
-		return that, nil
+		return that.Clone(), nil
 	}
 
 	if that.IsEmpty() {
-		return as, nil
+		return as.Clone(), nil
 	}
 
 	// Set start, end to min(start), max(end)
@@ -2245,6 +2239,7 @@ func (as *AllocationSet) accumulate(that *AllocationSet) (*AllocationSet, error)
 type AllocationSetRange struct {
 	sync.RWMutex
 	allocations []*AllocationSet
+	FromStore   string // stores the name of the store used to retrieve the data
 }
 
 // NewAllocationSetRange instantiates a new range composed of the given
@@ -2274,7 +2269,7 @@ func (asr *AllocationSetRange) Accumulate() (*AllocationSet, error) {
 	return allocSet, nil
 }
 
-// TODO niko/etl accumulate into lower-resolution chunks of the given resolution
+// TODO accumulate into lower-resolution chunks of the given resolution
 // func (asr *AllocationSetRange) AccumulateBy(resolution time.Duration) *AllocationSetRange
 
 // AggregateBy aggregates each AllocationSet in the range by the given

+ 0 - 61
pkg/kubecost/allocationprops_test.go

@@ -1,61 +0,0 @@
-package kubecost
-
-// TODO niko/etl
-// func TestParseProperty(t *testing.T) {}
-
-// TODO niko/etl
-// func TestProperty_String(t *testing.T) {}
-
-// TODO niko/etl
-// func TestProperties_Clone(t *testing.T) {}
-
-// TODO niko/etl
-// func TestProperties_Intersection(t *testing.T) {}
-
-// TODO niko/etl
-// func TestProperties_GetCluster(t *testing.T) {}
-
-// TODO niko/etl
-// func TestProperties_SetCluster(t *testing.T) {}
-
-// TODO niko/etl
-// func TestProperties_GetContainer(t *testing.T) {}
-
-// TODO niko/etl
-// func TestProperties_SetContainer(t *testing.T) {}
-
-// TODO niko/etl
-// func TestProperties_GetController(t *testing.T) {}
-
-// TODO niko/etl
-// func TestProperties_SetController(t *testing.T) {}
-
-// TODO niko/etl
-// func TestProperties_GetControllerKind(t *testing.T) {}
-
-// TODO niko/etl
-// func TestProperties_SetControllerKind(t *testing.T) {}
-
-// TODO niko/etl
-// func TestProperties_GetLabels(t *testing.T) {}
-
-// TODO niko/etl
-// func TestProperties_SetLabels(t *testing.T) {}
-
-// TODO niko/etl
-// func TestProperties_GetNamespace(t *testing.T) {}
-
-// TODO niko/etl
-// func TestProperties_SetNamespace(t *testing.T) {}
-
-// TODO niko/etl
-// func TestProperties_GetPod(t *testing.T) {}
-
-// TODO niko/etl
-// func TestProperties_SetPod(t *testing.T) {}
-
-// TODO niko/etl
-// func TestProperties_GetServices(t *testing.T) {}
-
-// TODO niko/etl
-// func TestProperties_SetServices(t *testing.T) {}

+ 38 - 35
pkg/kubecost/asset.go

@@ -12,8 +12,6 @@ import (
 	"github.com/kubecost/cost-model/pkg/util/json"
 )
 
-const timeFmt = "2006-01-02T15:04:05-0700"
-
 // UndefinedKey is used in composing Asset group keys if the group does not have that property defined.
 // E.g. if aggregating on Cluster, Assets in the AssetSet where Asset has no cluster will be grouped under key "__undefined__"
 const UndefinedKey = "__undefined__"
@@ -285,10 +283,10 @@ func key(a Asset, aggregateBy []string) (string, error) {
 				}
 			} else {
 				// Don't allow aggregating on label ""
-				return "", fmt.Errorf("Attempted to aggregate on invalid key: %s", s)
+				return "", fmt.Errorf("attempted to aggregate on invalid key: %s", s)
 			}
 		default:
-			return "", fmt.Errorf("Attempted to aggregate on invalid key: %s", s)
+			return "", fmt.Errorf("attempted to aggregate on invalid key: %s", s)
 		}
 
 		if key != "" {
@@ -596,9 +594,9 @@ func (a *Any) MarshalJSON() ([]byte, error) {
 	buffer := bytes.NewBufferString("{")
 	jsonEncode(buffer, "properties", a.Properties(), ",")
 	jsonEncode(buffer, "labels", a.Labels(), ",")
-	jsonEncodeString(buffer, "window", a.Window().String(), ",")
-	jsonEncodeString(buffer, "start", a.Start().Format(timeFmt), ",")
-	jsonEncodeString(buffer, "end", a.End().Format(timeFmt), ",")
+	jsonEncode(buffer, "window", a.Window(), ",")
+	jsonEncodeString(buffer, "start", a.Start().Format(time.RFC3339), ",")
+	jsonEncodeString(buffer, "end", a.End().Format(time.RFC3339), ",")
 	jsonEncodeFloat64(buffer, "minutes", a.Minutes(), ",")
 	jsonEncodeFloat64(buffer, "adjustment", a.Adjustment(), ",")
 	jsonEncodeFloat64(buffer, "totalCost", a.TotalCost(), "")
@@ -839,9 +837,9 @@ func (ca *Cloud) MarshalJSON() ([]byte, error) {
 	jsonEncodeString(buffer, "type", ca.Type().String(), ",")
 	jsonEncode(buffer, "properties", ca.Properties(), ",")
 	jsonEncode(buffer, "labels", ca.Labels(), ",")
-	jsonEncodeString(buffer, "window", ca.Window().String(), ",")
-	jsonEncodeString(buffer, "start", ca.Start().Format(timeFmt), ",")
-	jsonEncodeString(buffer, "end", ca.End().Format(timeFmt), ",")
+	jsonEncode(buffer, "window", ca.Window(), ",")
+	jsonEncodeString(buffer, "start", ca.Start().Format(time.RFC3339), ",")
+	jsonEncodeString(buffer, "end", ca.End().Format(time.RFC3339), ",")
 	jsonEncodeFloat64(buffer, "minutes", ca.Minutes(), ",")
 	jsonEncodeFloat64(buffer, "adjustment", ca.Adjustment(), ",")
 	jsonEncodeFloat64(buffer, "credit", ca.Credit, ",")
@@ -1039,9 +1037,9 @@ func (cm *ClusterManagement) MarshalJSON() ([]byte, error) {
 	jsonEncodeString(buffer, "type", cm.Type().String(), ",")
 	jsonEncode(buffer, "properties", cm.Properties(), ",")
 	jsonEncode(buffer, "labels", cm.Labels(), ",")
-	jsonEncodeString(buffer, "window", cm.Window().String(), ",")
-	jsonEncodeString(buffer, "start", cm.Start().Format(timeFmt), ",")
-	jsonEncodeString(buffer, "end", cm.End().Format(timeFmt), ",")
+	jsonEncode(buffer, "window", cm.Window(), ",")
+	jsonEncodeString(buffer, "start", cm.Start().Format(time.RFC3339), ",")
+	jsonEncodeString(buffer, "end", cm.End().Format(time.RFC3339), ",")
 	jsonEncodeFloat64(buffer, "minutes", cm.Minutes(), ",")
 	jsonEncodeFloat64(buffer, "totalCost", cm.TotalCost(), "")
 	buffer.WriteString("}")
@@ -1320,9 +1318,9 @@ func (d *Disk) MarshalJSON() ([]byte, error) {
 	jsonEncodeString(buffer, "type", d.Type().String(), ",")
 	jsonEncode(buffer, "properties", d.Properties(), ",")
 	jsonEncode(buffer, "labels", d.Labels(), ",")
-	jsonEncodeString(buffer, "window", d.Window().String(), ",")
-	jsonEncodeString(buffer, "start", d.Start().Format(timeFmt), ",")
-	jsonEncodeString(buffer, "end", d.End().Format(timeFmt), ",")
+	jsonEncode(buffer, "window", d.Window(), ",")
+	jsonEncodeString(buffer, "start", d.Start().Format(time.RFC3339), ",")
+	jsonEncodeString(buffer, "end", d.End().Format(time.RFC3339), ",")
 	jsonEncodeFloat64(buffer, "minutes", d.Minutes(), ",")
 	jsonEncodeFloat64(buffer, "byteHours", d.ByteHours, ",")
 	jsonEncodeFloat64(buffer, "bytes", d.Bytes(), ",")
@@ -1637,9 +1635,9 @@ func (n *Network) MarshalJSON() ([]byte, error) {
 	jsonEncodeString(buffer, "type", n.Type().String(), ",")
 	jsonEncode(buffer, "properties", n.Properties(), ",")
 	jsonEncode(buffer, "labels", n.Labels(), ",")
-	jsonEncodeString(buffer, "window", n.Window().String(), ",")
-	jsonEncodeString(buffer, "start", n.Start().Format(timeFmt), ",")
-	jsonEncodeString(buffer, "end", n.End().Format(timeFmt), ",")
+	jsonEncode(buffer, "window", n.Window(), ",")
+	jsonEncodeString(buffer, "start", n.Start().Format(time.RFC3339), ",")
+	jsonEncodeString(buffer, "end", n.End().Format(time.RFC3339), ",")
 	jsonEncodeFloat64(buffer, "minutes", n.Minutes(), ",")
 	jsonEncodeFloat64(buffer, "adjustment", n.Adjustment(), ",")
 	jsonEncodeFloat64(buffer, "totalCost", n.TotalCost(), "")
@@ -1997,9 +1995,9 @@ func (n *Node) MarshalJSON() ([]byte, error) {
 	jsonEncodeString(buffer, "type", n.Type().String(), ",")
 	jsonEncode(buffer, "properties", n.Properties(), ",")
 	jsonEncode(buffer, "labels", n.Labels(), ",")
-	jsonEncodeString(buffer, "window", n.Window().String(), ",")
-	jsonEncodeString(buffer, "start", n.Start().Format(timeFmt), ",")
-	jsonEncodeString(buffer, "end", n.End().Format(timeFmt), ",")
+	jsonEncode(buffer, "window", n.Window(), ",")
+	jsonEncodeString(buffer, "start", n.Start().Format(time.RFC3339), ",")
+	jsonEncodeString(buffer, "end", n.End().Format(time.RFC3339), ",")
 	jsonEncodeFloat64(buffer, "minutes", n.Minutes(), ",")
 	jsonEncodeString(buffer, "nodeType", n.NodeType, ",")
 	jsonEncodeFloat64(buffer, "cpuCores", n.CPUCores(), ",")
@@ -2303,9 +2301,9 @@ func (lb *LoadBalancer) MarshalJSON() ([]byte, error) {
 	jsonEncodeString(buffer, "type", lb.Type().String(), ",")
 	jsonEncode(buffer, "properties", lb.Properties(), ",")
 	jsonEncode(buffer, "labels", lb.Labels(), ",")
-	jsonEncodeString(buffer, "window", lb.Window().String(), ",")
-	jsonEncodeString(buffer, "start", lb.Start().Format(timeFmt), ",")
-	jsonEncodeString(buffer, "end", lb.End().Format(timeFmt), ",")
+	jsonEncode(buffer, "window", lb.Window(), ",")
+	jsonEncodeString(buffer, "start", lb.Start().Format(time.RFC3339), ",")
+	jsonEncodeString(buffer, "end", lb.End().Format(time.RFC3339), ",")
 	jsonEncodeFloat64(buffer, "minutes", lb.Minutes(), ",")
 	jsonEncodeFloat64(buffer, "adjustment", lb.Adjustment(), ",")
 	jsonEncodeFloat64(buffer, "totalCost", lb.TotalCost(), "")
@@ -2514,9 +2512,9 @@ func (sa *SharedAsset) MarshalJSON() ([]byte, error) {
 	jsonEncode(buffer, "labels", sa.Labels(), ",")
 	jsonEncode(buffer, "properties", sa.Properties(), ",")
 	jsonEncode(buffer, "labels", sa.Labels(), ",")
-	jsonEncodeString(buffer, "window", sa.Window().String(), ",")
-	jsonEncodeString(buffer, "start", sa.Start().Format(timeFmt), ",")
-	jsonEncodeString(buffer, "end", sa.End().Format(timeFmt), ",")
+	jsonEncode(buffer, "window", sa.Window(), ",")
+	jsonEncodeString(buffer, "start", sa.Start().Format(time.RFC3339), ",")
+	jsonEncodeString(buffer, "end", sa.End().Format(time.RFC3339), ",")
 	jsonEncodeFloat64(buffer, "minutes", sa.Minutes(), ",")
 	jsonEncodeFloat64(buffer, "totalCost", sa.TotalCost(), "")
 	buffer.WriteString("}")
@@ -2534,6 +2532,7 @@ type AssetSet struct {
 	sync.RWMutex
 	aggregateBy []string
 	assets      map[string]Asset
+	FromSource  string // stores the name of the source used to compute the data
 	Window      Window
 	Warnings    []string
 	Errors      []string
@@ -2575,7 +2574,7 @@ func (as *AssetSet) AggregateBy(aggregateBy []string, opts *AssetAggregationOpti
 	// Compute hours of the given AssetSet, and if it ends in the future,
 	// adjust the hours accordingly
 	hours := as.Window.Minutes() / 60.0
-	diff := time.Now().Sub(as.End())
+	diff := time.Since(as.End())
 	if diff < 0.0 {
 		hours += diff.Hours()
 	}
@@ -2628,10 +2627,7 @@ func (as *AssetSet) Clone() *AssetSet {
 
 	var aggregateBy []string
 	if as.aggregateBy != nil {
-		aggregateBy := []string{}
-		for _, s := range as.aggregateBy {
-			aggregateBy = append(aggregateBy, s)
-		}
+		aggregateBy = append([]string{}, as.aggregateBy...)
 	}
 
 	assets := map[string]Asset{}
@@ -2929,11 +2925,18 @@ func (as *AssetSet) accumulate(that *AssetSet) (*AssetSet, error) {
 	return acc, nil
 }
 
+// AssetSetRange is a thread-safe slice of AssetSets. It is meant to
+// be used such that the AssetSets held are consecutive and coherent with
+// respect to using the same aggregation properties, UTC offset, and
+// resolution. However these rules are not necessarily enforced, so use wisely.
 type AssetSetRange struct {
 	sync.RWMutex
-	assets []*AssetSet
+	assets    []*AssetSet
+	FromStore string // stores the name of the store used to retrieve the data
 }
 
+// NewAssetSetRange instantiates a new range composed of the given
+// AssetSets in the order provided.
 func NewAssetSetRange(assets ...*AssetSet) *AssetSetRange {
 	return &AssetSetRange{
 		assets: assets,
@@ -3023,7 +3026,7 @@ func (asr *AssetSetRange) Length() int {
 
 func (asr *AssetSetRange) MarshalJSON() ([]byte, error) {
 	asr.RLock()
-	asr.RUnlock()
+	defer asr.RUnlock()
 	return json.Marshal(asr.assets)
 }
 

+ 1 - 1
pkg/kubecost/bingen.go

@@ -29,4 +29,4 @@ package kubecost
 // @bingen:generate:PVKey
 // @bingen:generate:PVAllocation
 
-//go:generate bingen -package=kubecost -version=13 -buffer=github.com/kubecost/cost-model/pkg/util
+//go:generate bingen -package=kubecost -version=14 -buffer=github.com/kubecost/cost-model/pkg/util

+ 67 - 54
pkg/kubecost/kubecost_codecs.go

@@ -14,10 +14,11 @@ package kubecost
 import (
 	"encoding"
 	"fmt"
-	util "github.com/kubecost/cost-model/pkg/util"
 	"reflect"
 	"strings"
 	"time"
+
+	util "github.com/kubecost/cost-model/pkg/util"
 )
 
 const (
@@ -25,7 +26,7 @@ const (
 	GeneratorPackageName string = "kubecost"
 
 	// CodecVersion is the version passed into the generator
-	CodecVersion uint8 = 13
+	CodecVersion uint8 = 14
 )
 
 //--------------------------------------------------------------------------
@@ -220,7 +221,6 @@ func (target *Allocation) MarshalBinary() (data []byte, err error) {
 	buff.WriteFloat64(target.RAMCost)                // write float64
 	buff.WriteFloat64(target.RAMCostAdjustment)      // write float64
 	buff.WriteFloat64(target.SharedCost)             // write float64
-	buff.WriteFloat64(target.SharedCostAdjustment)   // write float64
 	buff.WriteFloat64(target.ExternalCost)           // write float64
 	if target.RawAllocationOnly == nil {
 		buff.WriteUInt8(uint8(0)) // write nil byte
@@ -418,23 +418,20 @@ func (target *Allocation) UnmarshalBinary(data []byte) (err error) {
 	target.SharedCost = uu
 
 	ww := buff.ReadFloat64() // read float64
-	target.SharedCostAdjustment = ww
-
-	xx := buff.ReadFloat64() // read float64
-	target.ExternalCost = xx
+	target.ExternalCost = ww
 
 	if buff.ReadUInt8() == uint8(0) {
 		target.RawAllocationOnly = nil
 	} else {
 		// --- [begin][read][struct](RawAllocationOnlyData) ---
-		yy := &RawAllocationOnlyData{}
-		aaa := buff.ReadInt()      // byte array length
-		bbb := buff.ReadBytes(aaa) // byte array
-		errG := yy.UnmarshalBinary(bbb)
+		xx := &RawAllocationOnlyData{}
+		yy := buff.ReadInt()      // byte array length
+		aaa := buff.ReadBytes(yy) // byte array
+		errG := xx.UnmarshalBinary(aaa)
 		if errG != nil {
 			return errG
 		}
-		target.RawAllocationOnly = yy
+		target.RawAllocationOnly = xx
 		// --- [end][read][struct](RawAllocationOnlyData) ---
 
 	}
@@ -721,6 +718,7 @@ func (target *AllocationSet) MarshalBinary() (data []byte, err error) {
 		// --- [end][write][map](map[string]bool) ---
 
 	}
+	buff.WriteString(target.FromSource) // write string
 	// --- [begin][write][struct](Window) ---
 	b, errB := target.Window.MarshalBinary()
 	if errB != nil {
@@ -858,31 +856,34 @@ func (target *AllocationSet) UnmarshalBinary(data []byte) (err error) {
 		// --- [end][read][map](map[string]bool) ---
 
 	}
+	q := buff.ReadString() // read string
+	target.FromSource = q
+
 	// --- [begin][read][struct](Window) ---
-	q := &Window{}
-	r := buff.ReadInt()    // byte array length
-	s := buff.ReadBytes(r) // byte array
-	errB := q.UnmarshalBinary(s)
+	r := &Window{}
+	s := buff.ReadInt()    // byte array length
+	t := buff.ReadBytes(s) // byte array
+	errB := r.UnmarshalBinary(t)
 	if errB != nil {
 		return errB
 	}
-	target.Window = *q
+	target.Window = *r
 	// --- [end][read][struct](Window) ---
 
 	if buff.ReadUInt8() == uint8(0) {
 		target.Warnings = nil
 	} else {
 		// --- [begin][read][slice]([]string) ---
-		u := buff.ReadInt() // array len
-		t := make([]string, u)
-		for jj := 0; jj < u; jj++ {
-			var w string
-			x := buff.ReadString() // read string
-			w = x
+		w := buff.ReadInt() // array len
+		u := make([]string, w)
+		for jj := 0; jj < w; jj++ {
+			var x string
+			y := buff.ReadString() // read string
+			x = y
 
-			t[jj] = w
+			u[jj] = x
 		}
-		target.Warnings = t
+		target.Warnings = u
 		// --- [end][read][slice]([]string) ---
 
 	}
@@ -890,16 +891,16 @@ func (target *AllocationSet) UnmarshalBinary(data []byte) (err error) {
 		target.Errors = nil
 	} else {
 		// --- [begin][read][slice]([]string) ---
-		aa := buff.ReadInt() // array len
-		y := make([]string, aa)
-		for iii := 0; iii < aa; iii++ {
-			var bb string
-			cc := buff.ReadString() // read string
-			bb = cc
+		bb := buff.ReadInt() // array len
+		aa := make([]string, bb)
+		for iii := 0; iii < bb; iii++ {
+			var cc string
+			dd := buff.ReadString() // read string
+			cc = dd
 
-			y[iii] = bb
+			aa[iii] = cc
 		}
-		target.Errors = y
+		target.Errors = aa
 		// --- [end][read][slice]([]string) ---
 
 	}
@@ -956,6 +957,7 @@ func (target *AllocationSetRange) MarshalBinary() (data []byte, err error) {
 		// --- [end][write][slice]([]*AllocationSet) ---
 
 	}
+	buff.WriteString(target.FromStore) // write string
 	return buff.Bytes(), nil
 }
 
@@ -1012,6 +1014,9 @@ func (target *AllocationSetRange) UnmarshalBinary(data []byte) (err error) {
 		// --- [end][read][slice]([]*AllocationSet) ---
 
 	}
+	g := buff.ReadString() // read string
+	target.FromStore = g
+
 	return nil
 }
 
@@ -1364,6 +1369,7 @@ func (target *AssetSet) MarshalBinary() (data []byte, err error) {
 		// --- [end][write][map](map[string]Asset) ---
 
 	}
+	buff.WriteString(target.FromSource) // write string
 	// --- [begin][write][struct](Window) ---
 	d, errB := target.Window.MarshalBinary()
 	if errB != nil {
@@ -1484,31 +1490,34 @@ func (target *AssetSet) UnmarshalBinary(data []byte) (err error) {
 		// --- [end][read][map](map[string]Asset) ---
 
 	}
+	o := buff.ReadString() // read string
+	target.FromSource = o
+
 	// --- [begin][read][struct](Window) ---
-	o := &Window{}
-	p := buff.ReadInt()    // byte array length
-	q := buff.ReadBytes(p) // byte array
-	errB := o.UnmarshalBinary(q)
+	p := &Window{}
+	q := buff.ReadInt()    // byte array length
+	r := buff.ReadBytes(q) // byte array
+	errB := p.UnmarshalBinary(r)
 	if errB != nil {
 		return errB
 	}
-	target.Window = *o
+	target.Window = *p
 	// --- [end][read][struct](Window) ---
 
 	if buff.ReadUInt8() == uint8(0) {
 		target.Warnings = nil
 	} else {
 		// --- [begin][read][slice]([]string) ---
-		s := buff.ReadInt() // array len
-		r := make([]string, s)
-		for ii := 0; ii < s; ii++ {
-			var t string
-			u := buff.ReadString() // read string
-			t = u
+		t := buff.ReadInt() // array len
+		s := make([]string, t)
+		for ii := 0; ii < t; ii++ {
+			var u string
+			w := buff.ReadString() // read string
+			u = w
 
-			r[ii] = t
+			s[ii] = u
 		}
-		target.Warnings = r
+		target.Warnings = s
 		// --- [end][read][slice]([]string) ---
 
 	}
@@ -1516,16 +1525,16 @@ func (target *AssetSet) UnmarshalBinary(data []byte) (err error) {
 		target.Errors = nil
 	} else {
 		// --- [begin][read][slice]([]string) ---
-		x := buff.ReadInt() // array len
-		w := make([]string, x)
-		for jj := 0; jj < x; jj++ {
-			var y string
-			aa := buff.ReadString() // read string
-			y = aa
+		y := buff.ReadInt() // array len
+		x := make([]string, y)
+		for jj := 0; jj < y; jj++ {
+			var aa string
+			bb := buff.ReadString() // read string
+			aa = bb
 
-			w[jj] = y
+			x[jj] = aa
 		}
-		target.Errors = w
+		target.Errors = x
 		// --- [end][read][slice]([]string) ---
 
 	}
@@ -1582,6 +1591,7 @@ func (target *AssetSetRange) MarshalBinary() (data []byte, err error) {
 		// --- [end][write][slice]([]*AssetSet) ---
 
 	}
+	buff.WriteString(target.FromStore) // write string
 	return buff.Bytes(), nil
 }
 
@@ -1638,6 +1648,9 @@ func (target *AssetSetRange) UnmarshalBinary(data []byte) (err error) {
 		// --- [end][read][slice]([]*AssetSet) ---
 
 	}
+	g := buff.ReadString() // read string
+	target.FromStore = g
+
 	return nil
 }
 

+ 87 - 0
pkg/util/time.go

@@ -3,6 +3,7 @@ package util
 import (
 	"fmt"
 	"strconv"
+	"sync"
 	"time"
 )
 
@@ -136,3 +137,89 @@ func normalizeTimeParam(param string) (string, error) {
 
 	return param, nil
 }
+
+// FormatStoreResolution provides a clean notation for ETL store resolutions.
+// e.g. daily => 1d; hourly => 1h
+func FormatStoreResolution(dur time.Duration) string {
+	if dur >= 24*time.Hour {
+		return fmt.Sprintf("%dd", int(dur.Hours()/24.0))
+	} else if dur >= time.Hour {
+		return fmt.Sprintf("%dh", int(dur.Hours()))
+	}
+	return fmt.Sprint(dur)
+}
+
+// JobTicker is a ticker used to synchronize the next run of a repeating
+// process. The designated use-case is for infinitely-looping selects,
+// where a timeout or an exit channel might cancel the process, but otherwise
+// the intent is to wait at the select for some amount of time until the
+// next run. This differs from a standard ticker, which ticks without
+// waiting and drops any missed ticks; rather, this ticker must be kicked
+// off manually for each tick, so that after the current run of the job
+// completes, the timer starts again.
+type JobTicker struct {
+	Ch     <-chan time.Time
+	ch     chan time.Time
+	closed bool
+	mx     sync.Mutex
+}
+
+// NewJobTicker instantiates a new JobTicker.
+func NewJobTicker() *JobTicker {
+	c := make(chan time.Time)
+
+	return &JobTicker{
+		Ch:     c,
+		ch:     c,
+		closed: false,
+	}
+}
+
+// Close closes the JobTicker channels
+func (jt *JobTicker) Close() {
+	jt.mx.Lock()
+	defer jt.mx.Unlock()
+
+	if jt.closed {
+		return
+	}
+
+	jt.closed = true
+	close(jt.ch)
+}
+
+// TickAt schedules the next tick of the ticker for the given time in the
+// future. If the time is not in the future, the ticker will tick immediately.
+func (jt *JobTicker) TickAt(t time.Time) {
+	go func(t time.Time) {
+		n := time.Now()
+		if t.After(n) {
+			time.Sleep(t.Sub(n))
+		}
+
+		jt.mx.Lock()
+		defer jt.mx.Unlock()
+
+		if !jt.closed {
+			jt.ch <- time.Now()
+		}
+	}(t)
+}
+
+// TickIn schedules the next tick of the ticker for the given duration into
+// the future. If the duration is less than or equal to zero, the ticker will
+// tick immediately.
+func (jt *JobTicker) TickIn(d time.Duration) {
+	go func(d time.Duration) {
+		if d > 0 {
+			time.Sleep(d)
+		}
+
+		jt.mx.Lock()
+		defer jt.mx.Unlock()
+
+		if !jt.closed {
+			jt.ch <- time.Now()
+		}
+	}(d)
+}