2
0

vector.go 4.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154
  1. package util
  2. import (
  3. "math"
  4. "sort"
  5. )
  // Vector is a single data point: a Value sampled at a Timestamp. Both fields
  // are encoded to JSON under the lowercase keys "timestamp" and "value".
  type Vector struct {
  	// Timestamp of the sample. ApplyVectorOp skips vectors whose Timestamp
  	// is 0 and rounds all others to the nearest multiple of 10.
  	Timestamp float64 `json:"timestamp"`

  	// Value observed at Timestamp.
  	Value float64 `json:"value"`
  }
  // MapPoolSize is the argument given to NewFlexibleMapPool when the
  // package-level map pool is built.
  const (
  	MapPoolSize = 4
  )
  11. type VectorSlice []*Vector
  12. func (p VectorSlice) Len() int { return len(p) }
  13. func (p VectorSlice) Less(i, j int) bool { return p[i].Timestamp < p[j].Timestamp }
  14. func (p VectorSlice) Swap(i, j int) { p[i], p[j] = p[j], p[i] }
  15. var mapPool VectorMapPool = NewFlexibleMapPool(MapPoolSize)
  // roundTimestamp snaps ts to the nearest multiple of precision. For example,
  // with precision 10 a timestamp of 24 becomes 20 while 25 becomes 30: exact
  // halfway values round away from zero, following math.Round.
  func roundTimestamp(ts float64, precision float64) float64 {
  	steps := math.Round(ts / precision)
  	return steps * precision
  }
  22. // Makes a reasonable guess at capacity for vector join
  23. func capacityFor(xvs []*Vector, yvs []*Vector) int {
  24. x := len(xvs)
  25. y := len(yvs)
  26. if x >= y {
  27. return x + (y / 4)
  28. }
  29. return y + (x / 4)
  30. }
  31. // ApplyVectorOp accepts two vectors, synchronizes timestamps, and executes an operation
  32. // on each vector. See VectorJoinOp for details.
  33. func ApplyVectorOp(xvs []*Vector, yvs []*Vector, op VectorJoinOp) []*Vector {
  34. // if xvs is empty, return yvs
  35. if xvs == nil || len(xvs) == 0 {
  36. return yvs
  37. }
  38. // if yvs is empty, return xvs
  39. if yvs == nil || len(yvs) == 0 {
  40. return xvs
  41. }
  42. // timestamps contains the vector slice after joining xvs and yvs
  43. var timestamps []*Vector
  44. // turn each vector slice into a map of timestamp-to-value so that
  45. // values at equal timestamps can be lined-up and summed
  46. xMap := mapPool.Get()
  47. defer mapPool.Put(xMap)
  48. for _, xv := range xvs {
  49. if xv.Timestamp == 0 {
  50. continue
  51. }
  52. // round all non-zero timestamps to the nearest 10 second mark
  53. xv.Timestamp = roundTimestamp(xv.Timestamp, 10.0)
  54. xMap[uint64(xv.Timestamp)] = xv.Value
  55. timestamps = append(timestamps, &Vector{
  56. Timestamp: xv.Timestamp,
  57. })
  58. }
  59. yMap := mapPool.Get()
  60. defer mapPool.Put(yMap)
  61. for _, yv := range yvs {
  62. if yv.Timestamp == 0 {
  63. continue
  64. }
  65. // round all non-zero timestamps to the nearest 10 second mark
  66. yv.Timestamp = roundTimestamp(yv.Timestamp, 10.0)
  67. yMap[uint64(yv.Timestamp)] = yv.Value
  68. if _, ok := xMap[uint64(yv.Timestamp)]; !ok {
  69. // no need to double add, since we'll range over sorted timestamps and check.
  70. timestamps = append(timestamps, &Vector{
  71. Timestamp: yv.Timestamp,
  72. })
  73. }
  74. }
  75. // iterate over each timestamp to produce a final op vector slice
  76. // reuse the existing slice to reduce allocations
  77. result := timestamps[:0]
  78. for _, sv := range timestamps {
  79. x, okX := xMap[uint64(sv.Timestamp)]
  80. y, okY := yMap[uint64(sv.Timestamp)]
  81. if op(sv, VectorValue(x, okX), VectorValue(y, okY)) {
  82. result = append(result, sv)
  83. }
  84. }
  85. // nil out remaining vectors in timestamps to GC
  86. for i := len(result); i < len(timestamps); i++ {
  87. timestamps[i] = nil
  88. }
  89. sort.Sort(VectorSlice(result))
  90. return result
  91. }
  92. // VectorJoinOp is an operation func that accepts a result vector pointer
  93. // for a specific timestamp and two float64 pointers representing the
  94. // input vectors for that timestamp. x or y inputs can be nil, but not
  95. // both. The op should use x and y values to set the Value on the result
  96. // ptr. If a result could not be generated, the op should return false,
  97. // which will omit the vector for the specific timestamp. Otherwise,
  98. // return true denoting a successful op.
  99. type VectorJoinOp func(result *Vector, x *float64, y *float64) bool
  // VectorValue returns a pointer to a private copy of v when ok is true, and
  // nil when ok is false. It adapts the (value, ok) result of a map lookup to
  // the optional *float64 arguments of a VectorJoinOp.
  func VectorValue(v float64, ok bool) *float64 {
  	if ok {
  		return &v
  	}
  	return nil
  }
  107. // NormalizeVectorByVector produces a version of xvs (a slice of Vectors)
  108. // which has had its timestamps rounded and its values divided by the values
  109. // of the Vectors of yvs, such that yvs is the "unit" Vector slice.
  110. func NormalizeVectorByVector(xvs []*Vector, yvs []*Vector) []*Vector {
  111. normalizeOp := func(result *Vector, x *float64, y *float64) bool {
  112. if x != nil && y != nil && *y != 0 {
  113. result.Value = *x / *y
  114. } else if x != nil {
  115. result.Value = *x
  116. } else if y != nil {
  117. result.Value = 0
  118. }
  119. return true
  120. }
  121. return ApplyVectorOp(xvs, yvs, normalizeOp)
  122. }