Explorar o código

Added 3 variations of map pools for vector storage in ApplyVectorOps. Default to the flexible implementation (which doesn't block) with 4 maps that can be shared across threads. Combined as much loop logic as possible, and re-use a single timestamps slice instead of allocating two new slices for results and timestamps.

Matt Bolt hai 6 anos
pai
achega
b92c2105a4
Modificáronse 2 ficheiros con 188 adicións e 27 borrados
  1. 131 0
      costmodel/pool.go
  2. 57 27
      costmodel/vector.go

+ 131 - 0
costmodel/pool.go

@@ -0,0 +1,131 @@
+package costmodel
+
+import (
+	"sync"
+)
+
+// VectorMapPool is a pool of reusable maps that map float64 timestamps
+// to float64 values. Get hands out a map; Put gives one back for re-use.
+type VectorMapPool interface {
+	Get() map[float64]float64
+	Put(map[float64]float64)
+}
+
+// ------------
+
+// FixedMapPool is a buffered channel implementation of a vector map pool
+// which controls the total number of maps allowed in/out of the pool
+// at any given moment. Calling Get() with no available maps blocks
+// until one is returned via Put(). Put() drops the map instead of
+// pooling it when the buffer already holds size maps.
+type FixedMapPool struct {
+	maps chan map[float64]float64
+	size int
+}
+
+// Get returns a map from the pool. Blocks if no maps are available for re-use.
+func (mp *FixedMapPool) Get() map[float64]float64 {
+	return <-mp.maps // channel receive: waits until a map is available in the buffer
+}
+
+// Put clears m and adds it back to the pool if there is room; when the
+// pool is already full the map is dropped and left to the garbage
+// collector. Put never blocks.
+func (mp *FixedMapPool) Put(m map[float64]float64) {
+	// Cheap early-out so a map that cannot be pooled is not cleared for nothing.
+	if len(mp.maps) >= mp.size {
+		return
+	}
+
+	for k := range m {
+		delete(m, k)
+	}
+
+	// The len() check above and a plain send are not atomic: another
+	// goroutine can fill the last free slot in between, which would make a
+	// plain send block. A non-blocking send keeps the "never blocks" promise.
+	select {
+	case mp.maps <- m:
+	default:
+	}
+}
+
+// NewFixedMapPool creates a new fixed map pool which maintains a fixed pool size.
+func NewFixedMapPool(size int) VectorMapPool {
+	mp := &FixedMapPool{
+		maps: make(chan map[float64]float64, size),
+		size: size,
+	}
+
+	// Pre-populate the buffer with size empty maps so Get() can succeed immediately
+	for i := 0; i < size; i++ {
+		mp.maps <- make(map[float64]float64)
+	}
+
+	return mp
+}
+
+// ------------
+
+// FlexibleMapPool is a buffered channel implementation of a vector map
+// pool which bounds the number of idle maps retained for re-use.
+// Unlike the FixedMapPool, this pool will not block if maps are
+// over-requested (Get allocates a new map instead), but it will only
+// retain returned maps up to the channel's buffer size.
+type FlexibleMapPool struct {
+	maps chan map[float64]float64
+}
+
+// Get returns a pooled map if one is available, otherwise a newly allocated one. Does not block on over-request.
+func (mp *FlexibleMapPool) Get() map[float64]float64 {
+	select {
+	case m := <-mp.maps:
+		return m
+	default: // pool is empty: allocate instead of waiting
+		return make(map[float64]float64)
+	}
+}
+
+// Put clears m and adds it back to the pool if there is room. Does not block on overflow.
+func (mp *FlexibleMapPool) Put(m map[float64]float64) {
+	for k := range m {
+		delete(m, k)
+	}
+
+	// Either return the map to the buffered channel, or drop it when the buffer is full
+	select {
+	case mp.maps <- m:
+	default:
+	}
+}
+
+// NewFlexibleMapPool creates a new flexible map pool which retains at most size idle maps.
+func NewFlexibleMapPool(size int) VectorMapPool {
+	return &FlexibleMapPool{
+		maps: make(chan map[float64]float64, size), // starts empty; Get allocates maps on demand
+	}
+}
+
+// ------------
+
+// UnboundedMapPool is a vector map pool implementation backed by sync.Pool.
+type UnboundedMapPool struct {
+	maps *sync.Pool
+}
+
+// Get returns a map from the underlying sync.Pool. Does not block on over-request.
+func (mp *UnboundedMapPool) Get() map[float64]float64 {
+	return mp.maps.Get().(map[float64]float64) // type assertion panics if the pool yields anything other than a map[float64]float64
+}
+
+// Put clears m and hands it to the underlying sync.Pool for later re-use.
+func (mp *UnboundedMapPool) Put(m map[float64]float64) {
+	for k := range m {
+		delete(m, k)
+	}
+
+	mp.maps.Put(m)
+}
+
+// NewUnboundedMapPool creates a new unbounded map pool which allows the runtime
+// to decide when pooled values should be evicted.
+func NewUnboundedMapPool() VectorMapPool {
+	return &UnboundedMapPool{
+		maps: &sync.Pool{
+			New: func() interface{} { // supplies a fresh empty map when the pool has none to hand out
+				return make(map[float64]float64)
+			},
+		},
+	}
+}

+ 57 - 27
costmodel/vector.go

@@ -10,6 +10,16 @@ type Vector struct {
 	Value     float64 `json:"value"`
 }
 
+const MapPoolSize = 4 // buffer size passed to NewFlexibleMapPool for the package-level mapPool
+
+type VectorSlice []*Vector // slice of vectors with Len/Less/Swap so it can be passed to sort.Sort
+
+func (p VectorSlice) Len() int           { return len(p) }
+func (p VectorSlice) Less(i, j int) bool { return p[i].Timestamp < p[j].Timestamp } // orders by ascending Timestamp
+func (p VectorSlice) Swap(i, j int)      { p[i], p[j] = p[j], p[i] }
+
+var mapPool VectorMapPool = NewFlexibleMapPool(MapPoolSize) // package-level pool from which ApplyVectorOp takes its timestamp-to-value maps
+
 // roundTimestamp rounds the given timestamp to the given precision; e.g. a
 // timestamp given in seconds, rounded to precision 10, will be rounded
 // to the nearest value dividible by 10 (24 goes to 20, but 25 goes to 30).
@@ -17,21 +27,21 @@ func roundTimestamp(ts float64, precision float64) float64 {
 	return math.Round(ts/precision) * precision
 }
 
+// capacityFor makes a reasonable guess at capacity for a vector join: the longer input's length plus a quarter (integer division) of the shorter's.
+func capacityFor(xvs []*Vector, yvs []*Vector) int {
+	x := len(xvs)
+	y := len(yvs)
+
+	if x >= y {
+		return x + (y / 4)
+	}
+
+	return y + (x / 4)
+}
+
 // ApplyVectorOp accepts two vectors, synchronizes timestamps, and executes an operation
 // on each vector. See VectorJoinOp for details.
 func ApplyVectorOp(xvs []*Vector, yvs []*Vector, op VectorJoinOp) []*Vector {
-	// round all non-zero timestamps to the nearest 10 second mark
-	for _, yv := range yvs {
-		if yv.Timestamp != 0 {
-			yv.Timestamp = roundTimestamp(yv.Timestamp, 10.0)
-		}
-	}
-	for _, xv := range xvs {
-		if xv.Timestamp != 0 {
-			xv.Timestamp = roundTimestamp(xv.Timestamp, 10.0)
-		}
-	}
-
 	// if xvs is empty, return yvs
 	if xvs == nil || len(xvs) == 0 {
 		return yvs
@@ -42,47 +52,67 @@ func ApplyVectorOp(xvs []*Vector, yvs []*Vector, op VectorJoinOp) []*Vector {
 		return xvs
 	}
 
-	// result contains the final vector slice after joining xvs and yvs
-	var result []*Vector
-
-	// timestamps stores all timestamps present in both vector slices
-	// without duplicates
-	var timestamps []float64
+	// timestamps contains the vector slice after joining xvs and yvs
+	var timestamps []*Vector
 
 	// turn each vector slice into a map of timestamp-to-value so that
 	// values at equal timestamps can be lined-up and summed
-	xMap := make(map[float64]float64)
+	xMap := mapPool.Get()
+	defer mapPool.Put(xMap)
+
 	for _, xv := range xvs {
 		if xv.Timestamp == 0 {
 			continue
 		}
+
+		// round all non-zero timestamps to the nearest 10 second mark
+		xv.Timestamp = roundTimestamp(xv.Timestamp, 10.0)
+
 		xMap[xv.Timestamp] = xv.Value
-		timestamps = append(timestamps, xv.Timestamp)
+		timestamps = append(timestamps, &Vector{
+			Timestamp: xv.Timestamp,
+		})
 	}
-	yMap := make(map[float64]float64)
+
+	yMap := mapPool.Get()
+	defer mapPool.Put(yMap)
+
 	for _, yv := range yvs {
 		if yv.Timestamp == 0 {
 			continue
 		}
+
+		// round all non-zero timestamps to the nearest 10 second mark
+		yv.Timestamp = roundTimestamp(yv.Timestamp, 10.0)
+
 		yMap[yv.Timestamp] = yv.Value
 		if _, ok := xMap[yv.Timestamp]; !ok {
 			// no need to double add, since we'll range over sorted timestamps and check.
-			timestamps = append(timestamps, yv.Timestamp)
+			timestamps = append(timestamps, &Vector{
+				Timestamp: yv.Timestamp,
+			})
 		}
 	}
 
 	// iterate over each timestamp to produce a final op vector slice
-	sort.Float64s(timestamps)
-	for _, t := range timestamps {
-		x, okX := xMap[t]
-		y, okY := yMap[t]
-		sv := &Vector{Timestamp: t}
+	// reuse the existing slice to reduce allocations
+	result := timestamps[:0]
+	for _, sv := range timestamps {
+		x, okX := xMap[sv.Timestamp]
+		y, okY := yMap[sv.Timestamp]
 
 		if op(sv, VectorValue(x, okX), VectorValue(y, okY)) {
 			result = append(result, sv)
 		}
 	}
 
+	// nil out remaining vectors in timestamps to GC
+	for i := len(result); i < len(timestamps); i++ {
+		timestamps[i] = nil
+	}
+
+	sort.Sort(VectorSlice(result))
+
 	return result
 }