aggregations.go 20 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600
  1. package costmodel
  2. import (
  3. "math"
  4. "sort"
  5. "strconv"
  6. "time"
  7. "github.com/kubecost/cost-model/cloud"
  8. prometheusClient "github.com/prometheus/client_golang/api"
  9. "k8s.io/klog"
  10. )
  11. type Aggregation struct {
  12. Aggregator string `json:"aggregation"`
  13. Subfields []string `json:"subfields,omitempty"`
  14. Environment string `json:"environment"`
  15. Cluster string `json:"cluster,omitempty"`
  16. CPUAllocationVectors []*Vector `json:"-"`
  17. CPUCost float64 `json:"cpuCost"`
  18. CPUCostVector []*Vector `json:"cpuCostVector,omitempty"`
  19. CPUEfficiency float64 `json:"cpuEfficiency"`
  20. CPURequestedVectors []*Vector `json:"-"`
  21. CPUUsedVectors []*Vector `json:"-"`
  22. Efficiency float64 `json:"efficiency"`
  23. GPUAllocation []*Vector `json:"-"`
  24. GPUCost float64 `json:"gpuCost"`
  25. GPUCostVector []*Vector `json:"gpuCostVector,omitempty"`
  26. RAMAllocationVectors []*Vector `json:"-"`
  27. RAMCost float64 `json:"ramCost"`
  28. RAMCostVector []*Vector `json:"ramCostVector,omitempty"`
  29. RAMEfficiency float64 `json:"ramEfficiency"`
  30. RAMRequestedVectors []*Vector `json:"-"`
  31. RAMUsedVectors []*Vector `json:"-"`
  32. PVCost float64 `json:"pvCost"`
  33. PVCostVector []*Vector `json:"pvCostVector,omitempty"`
  34. NetworkCost float64 `json:"networkCost"`
  35. NetworkCostVector []*Vector `json:"networkCostVector,omitempty"`
  36. SharedCost float64 `json:"sharedCost"`
  37. TotalCost float64 `json:"totalCost"`
  38. }
  // Multipliers that turn an hourly cost into the rates accepted by
  // AggregationOptions.Rate. Both are untyped constants.
  const (
  	// hoursPerDay converts an hourly figure into a daily one.
  	hoursPerDay = 24.0
  	// hoursPerMonth converts an hourly figure into a monthly one, using an
  	// average month of 365 days / 12 (exactly 730 hours).
  	hoursPerMonth = 365 * hoursPerDay / 12
  )
  // SharedResourceInfo selects cost data whose cost should be reported as
  // shared overhead rather than as its own aggregation. Nothing is treated as
  // shared unless ShareResources is true.
  type SharedResourceInfo struct {
  	ShareResources  bool              // master switch for resource sharing
  	SharedNamespace map[string]bool   // namespaces treated as shared; membership (not the bool) is what counts
  	LabelSelectors  map[string]string // label name -> value marking a workload as shared
  }
  48. func (s *SharedResourceInfo) IsSharedResource(costDatum *CostData) bool {
  49. if _, ok := s.SharedNamespace[costDatum.Namespace]; ok {
  50. return true
  51. }
  52. for labelName, labelValue := range s.LabelSelectors {
  53. if val, ok := costDatum.Labels[labelName]; ok {
  54. if val == labelValue {
  55. return true
  56. }
  57. }
  58. }
  59. return false
  60. }
  61. func NewSharedResourceInfo(shareResources bool, sharedNamespaces []string, labelnames []string, labelvalues []string) *SharedResourceInfo {
  62. sr := &SharedResourceInfo{
  63. ShareResources: shareResources,
  64. SharedNamespace: make(map[string]bool),
  65. LabelSelectors: make(map[string]string),
  66. }
  67. for _, ns := range sharedNamespaces {
  68. sr.SharedNamespace[ns] = true
  69. }
  70. sr.SharedNamespace["kube-system"] = true // kube-system should be split by default
  71. for i := range labelnames {
  72. sr.LabelSelectors[labelnames[i]] = labelvalues[i]
  73. }
  74. return sr
  75. }
  76. func ComputeIdleCoefficient(costData map[string]*CostData, cli prometheusClient.Client, cp cloud.Provider, discount float64, windowString, offset string) (map[string]float64, error) {
  77. coefficients := make(map[string]float64)
  78. windowDuration, err := time.ParseDuration(windowString)
  79. if err != nil {
  80. return nil, err
  81. }
  82. allTotals, err := ClusterCostsForAllClusters(cli, cp, windowString, offset)
  83. if err != nil {
  84. return nil, err
  85. }
  86. for cid, totals := range allTotals {
  87. klog.Infof("%s: %+v", cid, totals)
  88. if !(len(totals.CPUCost) > 0 && len(totals.MemCost) > 0 && len(totals.StorageCost) > 0) {
  89. klog.V(1).Infof("WARNING: NO DATA FOR CLUSTER %s. Is it emitting data?", cid)
  90. coefficients[cid] = 1.0
  91. continue
  92. }
  93. cpuCost, err := strconv.ParseFloat(totals.CPUCost[0][1], 64)
  94. if err != nil {
  95. return nil, err
  96. }
  97. memCost, err := strconv.ParseFloat(totals.MemCost[0][1], 64)
  98. if err != nil {
  99. return nil, err
  100. }
  101. storageCost, err := strconv.ParseFloat(totals.StorageCost[0][1], 64)
  102. if err != nil {
  103. return nil, err
  104. }
  105. totalClusterCost := (cpuCost * (1 - discount)) + (memCost * (1 - discount)) + storageCost
  106. if err != nil || totalClusterCost == 0.0 {
  107. return nil, err
  108. }
  109. totalClusterCostOverWindow := (totalClusterCost / 730) * windowDuration.Hours()
  110. totalContainerCost := 0.0
  111. for _, costDatum := range costData {
  112. if costDatum.ClusterID == cid {
  113. cpuv, ramv, gpuv, pvvs, _ := getPriceVectors(cp, costDatum, "", discount, 1)
  114. totalContainerCost += totalVectors(cpuv)
  115. totalContainerCost += totalVectors(ramv)
  116. totalContainerCost += totalVectors(gpuv)
  117. for _, pv := range pvvs {
  118. totalContainerCost += totalVectors(pv)
  119. }
  120. }
  121. }
  122. coefficients[cid] = totalContainerCost / totalClusterCostOverWindow
  123. }
  124. return coefficients, nil
  125. }
  126. // AggregationOptions provides optional parameters to AggregateCostData, allowing callers to perform more complex operations
  127. type AggregationOptions struct {
  128. DataCount int64 // number of cost data points expected; ensures proper rate calculation if data is incomplete
  129. Discount float64 // percent by which to discount CPU, RAM, and GPU cost
  130. IdleCoefficients map[string]float64 // scales costs by amount of idle resources on a per-cluster basis
  131. IncludeEfficiency bool // set to true to receive efficiency/usage data
  132. IncludeTimeSeries bool // set to true to receive time series data
  133. Rate string // set to "hourly", "daily", or "monthly" to receive cost rate, rather than cumulative cost
  134. SharedResourceInfo *SharedResourceInfo
  135. }
  136. // AggregateCostData aggregates raw cost data by field; e.g. namespace, cluster, service, or label. In the case of label, callers
  137. // must pass a slice of subfields indicating the labels by which to group. Provider is used to define custom resource pricing.
  138. // See AggregationOptions for optional parameters.
  139. // TODO: Can we restructure custom pricing code to allow that to be optional? Having to pass an entire Provider instance is way
  140. // overkill and tightly couples this code to the cloud package.
  141. func AggregateCostData(costData map[string]*CostData, field string, subfields []string, cp cloud.Provider, opts *AggregationOptions) map[string]*Aggregation {
  142. dataCount := opts.DataCount
  143. discount := opts.Discount
  144. idleCoefficients := opts.IdleCoefficients
  145. includeTimeSeries := opts.IncludeTimeSeries
  146. includeEfficiency := opts.IncludeEfficiency
  147. rate := opts.Rate
  148. sr := opts.SharedResourceInfo
  149. if idleCoefficients == nil {
  150. idleCoefficients = make(map[string]float64)
  151. }
  152. // aggregations collects key-value pairs of resource group-to-aggregated data
  153. // e.g. namespace-to-data or label-value-to-data
  154. aggregations := make(map[string]*Aggregation)
  155. // sharedResourceCost is the running total cost of resources that should be reported
  156. // as shared across all other resources, rather than reported as a stand-alone category
  157. sharedResourceCost := 0.0
  158. for _, costDatum := range costData {
  159. idleCoefficient, ok := idleCoefficients[costDatum.ClusterID]
  160. if !ok {
  161. idleCoefficient = 1.0
  162. }
  163. if sr != nil && sr.ShareResources && sr.IsSharedResource(costDatum) {
  164. cpuv, ramv, gpuv, pvvs, netv := getPriceVectors(cp, costDatum, rate, discount, idleCoefficient)
  165. sharedResourceCost += totalVectors(cpuv)
  166. sharedResourceCost += totalVectors(ramv)
  167. sharedResourceCost += totalVectors(gpuv)
  168. sharedResourceCost += totalVectors(netv)
  169. for _, pv := range pvvs {
  170. sharedResourceCost += totalVectors(pv)
  171. }
  172. } else {
  173. if field == "cluster" {
  174. aggregateDatum(cp, aggregations, costDatum, field, subfields, rate, costDatum.ClusterID, discount, idleCoefficient)
  175. } else if field == "namespace" {
  176. aggregateDatum(cp, aggregations, costDatum, field, subfields, rate, costDatum.Namespace, discount, idleCoefficient)
  177. } else if field == "service" {
  178. if len(costDatum.Services) > 0 {
  179. aggregateDatum(cp, aggregations, costDatum, field, subfields, rate, costDatum.Namespace+"/"+costDatum.Services[0], discount, idleCoefficient)
  180. }
  181. } else if field == "deployment" {
  182. if len(costDatum.Deployments) > 0 {
  183. aggregateDatum(cp, aggregations, costDatum, field, subfields, rate, costDatum.Namespace+"/"+costDatum.Deployments[0], discount, idleCoefficient)
  184. }
  185. } else if field == "daemonset" {
  186. if len(costDatum.Daemonsets) > 0 {
  187. aggregateDatum(cp, aggregations, costDatum, field, subfields, rate, costDatum.Namespace+"/"+costDatum.Daemonsets[0], discount, idleCoefficient)
  188. }
  189. } else if field == "label" {
  190. if costDatum.Labels != nil {
  191. for _, sf := range subfields {
  192. if subfieldName, ok := costDatum.Labels[sf]; ok {
  193. aggregateDatum(cp, aggregations, costDatum, field, subfields, rate, subfieldName, discount, idleCoefficient)
  194. break
  195. }
  196. }
  197. }
  198. } else if field == "pod" {
  199. aggregateDatum(cp, aggregations, costDatum, field, subfields, rate, costDatum.Namespace+"/"+costDatum.PodName, discount, idleCoefficient)
  200. }
  201. }
  202. }
  203. for _, agg := range aggregations {
  204. agg.CPUCost = totalVectors(agg.CPUCostVector)
  205. agg.RAMCost = totalVectors(agg.RAMCostVector)
  206. agg.GPUCost = totalVectors(agg.GPUCostVector)
  207. agg.PVCost = totalVectors(agg.PVCostVector)
  208. agg.NetworkCost = totalVectors(agg.NetworkCostVector)
  209. agg.SharedCost = sharedResourceCost / float64(len(aggregations))
  210. if rate != "" && dataCount > 0 {
  211. agg.CPUCost /= float64(dataCount)
  212. agg.RAMCost /= float64(dataCount)
  213. agg.GPUCost /= float64(dataCount)
  214. agg.PVCost /= float64(dataCount)
  215. agg.NetworkCost /= float64(dataCount)
  216. agg.SharedCost /= float64(dataCount)
  217. }
  218. agg.TotalCost = agg.CPUCost + agg.RAMCost + agg.GPUCost + agg.PVCost + agg.NetworkCost + agg.SharedCost
  219. if includeEfficiency {
  220. // Default both RAM and CPU to 100% efficiency so that a 0-requested, 0-allocated, 0-used situation
  221. // returns 100% efficiency, which should be a red-flag.
  222. //
  223. // If non-zero numbers are available, then efficiency is defined as:
  224. // idlePercentage = (requested - used) / allocated
  225. // efficiency = (1.0 - idlePercentage)
  226. //
  227. // It is possible to score > 100% efficiency, which is meant to be interpreted as a red flag.
  228. // It is not possible to score < 0% efficiency.
  229. klog.V(4).Infof("\n\tlen(CPU allocation): %d\n\tlen(CPU requested): %d\n\tlen(CPU used): %d",
  230. len(agg.CPUAllocationVectors),
  231. len(agg.CPURequestedVectors),
  232. len(agg.CPUUsedVectors))
  233. agg.CPUEfficiency = 1.0
  234. CPUIdle := 0.0
  235. avgCPUAllocation := totalVectors(agg.CPUAllocationVectors) / float64(len(agg.CPUAllocationVectors))
  236. if avgCPUAllocation > 0.0 {
  237. avgCPURequested := averageVectors(agg.CPURequestedVectors)
  238. avgCPUUsed := averageVectors(agg.CPUUsedVectors)
  239. CPUIdle = ((avgCPURequested - avgCPUUsed) / avgCPUAllocation)
  240. agg.CPUEfficiency = 1.0 - CPUIdle
  241. }
  242. klog.V(4).Infof("\n\tlen(RAM allocation): %d\n\tlen(RAM requested): %d\n\tlen(RAM used): %d",
  243. len(agg.RAMAllocationVectors),
  244. len(agg.RAMRequestedVectors),
  245. len(agg.RAMUsedVectors))
  246. agg.RAMEfficiency = 1.0
  247. RAMIdle := 0.0
  248. avgRAMAllocation := totalVectors(agg.RAMAllocationVectors) / float64(len(agg.RAMAllocationVectors))
  249. if avgRAMAllocation > 0.0 {
  250. avgRAMRequested := averageVectors(agg.RAMRequestedVectors)
  251. avgRAMUsed := averageVectors(agg.RAMUsedVectors)
  252. RAMIdle = ((avgRAMRequested - avgRAMUsed) / avgRAMAllocation)
  253. agg.RAMEfficiency = 1.0 - RAMIdle
  254. }
  255. // Score total efficiency by the sum of CPU and RAM efficiency, weighted by their
  256. // respective total costs.
  257. agg.Efficiency = 1.0
  258. if (agg.CPUCost + agg.RAMCost) > 0 {
  259. agg.Efficiency = 1.0 - ((agg.CPUCost*CPUIdle)+(agg.RAMCost*RAMIdle))/(agg.CPUCost+agg.RAMCost)
  260. }
  261. }
  262. // remove time series data if it is not explicitly requested
  263. if !includeTimeSeries {
  264. agg.CPUCostVector = nil
  265. agg.RAMCostVector = nil
  266. agg.GPUCostVector = nil
  267. agg.PVCostVector = nil
  268. agg.NetworkCostVector = nil
  269. }
  270. }
  271. return aggregations
  272. }
  273. func aggregateDatum(cp cloud.Provider, aggregations map[string]*Aggregation, costDatum *CostData, field string, subfields []string, rate string, key string, discount float64, idleCoefficient float64) {
  274. // add new entry to aggregation results if a new key is encountered
  275. if _, ok := aggregations[key]; !ok {
  276. agg := &Aggregation{}
  277. agg.Aggregator = field
  278. if len(subfields) > 0 {
  279. agg.Subfields = subfields
  280. }
  281. agg.Environment = key
  282. aggregations[key] = agg
  283. }
  284. mergeVectors(cp, costDatum, aggregations[key], rate, discount, idleCoefficient)
  285. }
  286. func mergeVectors(cp cloud.Provider, costDatum *CostData, aggregation *Aggregation, rate string, discount float64, idleCoefficient float64) {
  287. aggregation.CPUAllocationVectors = addVectors(costDatum.CPUAllocation, aggregation.CPUAllocationVectors)
  288. aggregation.CPURequestedVectors = addVectors(costDatum.CPUReq, aggregation.CPURequestedVectors)
  289. aggregation.CPUUsedVectors = addVectors(costDatum.CPUUsed, aggregation.CPUUsedVectors)
  290. aggregation.RAMAllocationVectors = addVectors(costDatum.RAMAllocation, aggregation.RAMAllocationVectors)
  291. aggregation.RAMRequestedVectors = addVectors(costDatum.RAMReq, aggregation.RAMRequestedVectors)
  292. aggregation.RAMUsedVectors = addVectors(costDatum.RAMUsed, aggregation.RAMUsedVectors)
  293. aggregation.GPUAllocation = addVectors(costDatum.GPUReq, aggregation.GPUAllocation)
  294. cpuv, ramv, gpuv, pvvs, netv := getPriceVectors(cp, costDatum, rate, discount, idleCoefficient)
  295. aggregation.CPUCostVector = addVectors(cpuv, aggregation.CPUCostVector)
  296. aggregation.RAMCostVector = addVectors(ramv, aggregation.RAMCostVector)
  297. aggregation.GPUCostVector = addVectors(gpuv, aggregation.GPUCostVector)
  298. aggregation.NetworkCostVector = addVectors(netv, aggregation.NetworkCostVector)
  299. for _, vectorList := range pvvs {
  300. aggregation.PVCostVector = addVectors(aggregation.PVCostVector, vectorList)
  301. }
  302. }
  303. func getPriceVectors(cp cloud.Provider, costDatum *CostData, rate string, discount float64, idleCoefficient float64) ([]*Vector, []*Vector, []*Vector, [][]*Vector, []*Vector) {
  304. cpuCostStr := costDatum.NodeData.VCPUCost
  305. ramCostStr := costDatum.NodeData.RAMCost
  306. gpuCostStr := costDatum.NodeData.GPUCost
  307. pvCostStr := costDatum.NodeData.StorageCost
  308. // If custom pricing is enabled and can be retrieved, replace
  309. // default cost values with custom values
  310. customPricing, err := cp.GetConfig()
  311. if err != nil {
  312. klog.Errorf("failed to load custom pricing: %s", err)
  313. }
  314. if cloud.CustomPricesEnabled(cp) && err == nil {
  315. if costDatum.NodeData.IsSpot() {
  316. cpuCostStr = customPricing.SpotCPU
  317. ramCostStr = customPricing.SpotRAM
  318. gpuCostStr = customPricing.SpotGPU
  319. } else {
  320. cpuCostStr = customPricing.CPU
  321. ramCostStr = customPricing.RAM
  322. gpuCostStr = customPricing.GPU
  323. }
  324. pvCostStr = customPricing.Storage
  325. }
  326. cpuCost, _ := strconv.ParseFloat(cpuCostStr, 64)
  327. ramCost, _ := strconv.ParseFloat(ramCostStr, 64)
  328. gpuCost, _ := strconv.ParseFloat(gpuCostStr, 64)
  329. pvCost, _ := strconv.ParseFloat(pvCostStr, 64)
  330. // rateCoeff scales the individual time series data values by the appropriate
  331. // number. Each value is, by default, the daily value, so the scales convert
  332. // from daily to the target rate.
  333. rateCoeff := 1.0
  334. switch rate {
  335. case "daily":
  336. rateCoeff = hoursPerDay
  337. case "monthly":
  338. rateCoeff = hoursPerMonth
  339. case "hourly":
  340. default:
  341. }
  342. cpuv := make([]*Vector, 0, len(costDatum.CPUAllocation))
  343. for _, val := range costDatum.CPUAllocation {
  344. cpuv = append(cpuv, &Vector{
  345. Timestamp: math.Round(val.Timestamp/10) * 10,
  346. Value: (val.Value * cpuCost * (1 - discount) / idleCoefficient) * rateCoeff,
  347. })
  348. }
  349. ramv := make([]*Vector, 0, len(costDatum.RAMAllocation))
  350. for _, val := range costDatum.RAMAllocation {
  351. ramv = append(ramv, &Vector{
  352. Timestamp: math.Round(val.Timestamp/10) * 10,
  353. Value: ((val.Value / 1024 / 1024 / 1024) * ramCost * (1 - discount) / idleCoefficient) * rateCoeff,
  354. })
  355. }
  356. gpuv := make([]*Vector, 0, len(costDatum.GPUReq))
  357. for _, val := range costDatum.GPUReq {
  358. gpuv = append(gpuv, &Vector{
  359. Timestamp: math.Round(val.Timestamp/10) * 10,
  360. Value: (val.Value * gpuCost * (1 - discount) / idleCoefficient) * rateCoeff,
  361. })
  362. }
  363. pvvs := make([][]*Vector, 0, len(costDatum.PVCData))
  364. for _, pvcData := range costDatum.PVCData {
  365. pvv := make([]*Vector, 0, len(pvcData.Values))
  366. if pvcData.Volume != nil {
  367. cost, _ := strconv.ParseFloat(pvcData.Volume.Cost, 64)
  368. // override with custom pricing if enabled
  369. if cloud.CustomPricesEnabled(cp) {
  370. cost = pvCost
  371. }
  372. for _, val := range pvcData.Values {
  373. pvv = append(pvv, &Vector{
  374. Timestamp: math.Round(val.Timestamp/10) * 10,
  375. Value: ((val.Value / 1024 / 1024 / 1024) * cost / idleCoefficient) * rateCoeff,
  376. })
  377. }
  378. pvvs = append(pvvs, pvv)
  379. }
  380. }
  381. netv := costDatum.NetworkData
  382. return cpuv, ramv, gpuv, pvvs, netv
  383. }
  384. func averageVectors(vectors []*Vector) float64 {
  385. if len(vectors) == 0 {
  386. return 0.0
  387. }
  388. return totalVectors(vectors) / float64(len(vectors))
  389. }
  390. func totalVectors(vectors []*Vector) float64 {
  391. total := 0.0
  392. for _, vector := range vectors {
  393. total += vector.Value
  394. }
  395. return total
  396. }
  // roundTimestamp rounds ts to the nearest multiple of precision, with
  // halfway cases rounded away from zero (math.Round semantics). For example,
  // a timestamp in seconds rounded to precision 10 becomes the nearest value
  // divisible by 10: 24 goes to 20, but 25 goes to 30.
  func roundTimestamp(ts float64, precision float64) float64 {
  	steps := math.Round(ts / precision)
  	return steps * precision
  }
  403. func NormalizeVectorByVector(xvs []*Vector, yvs []*Vector) []*Vector {
  404. // round all non-zero timestamps to the nearest 10 second mark
  405. for _, yv := range yvs {
  406. if yv.Timestamp != 0 {
  407. yv.Timestamp = roundTimestamp(yv.Timestamp, 10.0)
  408. }
  409. }
  410. for _, xv := range xvs {
  411. if xv.Timestamp != 0 {
  412. xv.Timestamp = roundTimestamp(xv.Timestamp, 10.0)
  413. }
  414. }
  415. // if xvs is empty, return yvs
  416. if xvs == nil || len(xvs) == 0 {
  417. return yvs
  418. }
  419. // if yvs is empty, return xvs
  420. if yvs == nil || len(yvs) == 0 {
  421. return xvs
  422. }
  423. // sum stores the sum of the vector slices xvs and yvs
  424. var sum []*Vector
  425. // timestamps stores all timestamps present in both vector slices
  426. // without duplicates
  427. var timestamps []float64
  428. // turn each vector slice into a map of timestamp-to-value so that
  429. // values at equal timestamps can be lined-up and summed
  430. xMap := make(map[float64]float64)
  431. for _, xv := range xvs {
  432. if xv.Timestamp == 0 {
  433. continue
  434. }
  435. xMap[xv.Timestamp] = xv.Value
  436. timestamps = append(timestamps, xv.Timestamp)
  437. }
  438. yMap := make(map[float64]float64)
  439. for _, yv := range yvs {
  440. if yv.Timestamp == 0 {
  441. continue
  442. }
  443. yMap[yv.Timestamp] = yv.Value
  444. if _, ok := xMap[yv.Timestamp]; !ok {
  445. // no need to double add, since we'll range over sorted timestamps and check.
  446. timestamps = append(timestamps, yv.Timestamp)
  447. }
  448. }
  449. // iterate over each timestamp to produce a final summed vector slice
  450. sort.Float64s(timestamps)
  451. for _, t := range timestamps {
  452. x, okX := xMap[t]
  453. y, okY := yMap[t]
  454. sv := &Vector{Timestamp: t}
  455. if okX && okY && y != 0 {
  456. sv.Value = x / y
  457. } else if okX {
  458. sv.Value = x
  459. } else if okY {
  460. sv.Value = 0
  461. }
  462. sum = append(sum, sv)
  463. }
  464. return sum
  465. }
  466. // addVectors adds two slices of Vectors. Vector timestamps are rounded to the
  467. // nearest ten seconds to allow matching of Vectors within a delta allowance.
  468. // Matching Vectors are summed, while unmatched Vectors are passed through.
  469. // e.g. [(t=1, 1), (t=2, 2)] + [(t=2, 2), (t=3, 3)] = [(t=1, 1), (t=2, 4), (t=3, 3)]
  470. func addVectors(xvs []*Vector, yvs []*Vector) []*Vector {
  471. // round all non-zero timestamps to the nearest 10 second mark
  472. for _, yv := range yvs {
  473. if yv.Timestamp != 0 {
  474. yv.Timestamp = roundTimestamp(yv.Timestamp, 10.0)
  475. }
  476. }
  477. for _, xv := range xvs {
  478. if xv.Timestamp != 0 {
  479. xv.Timestamp = roundTimestamp(xv.Timestamp, 10.0)
  480. }
  481. }
  482. // if xvs is empty, return yvs
  483. if xvs == nil || len(xvs) == 0 {
  484. return yvs
  485. }
  486. // if yvs is empty, return xvs
  487. if yvs == nil || len(yvs) == 0 {
  488. return xvs
  489. }
  490. // sum stores the sum of the vector slices xvs and yvs
  491. var sum []*Vector
  492. // timestamps stores all timestamps present in both vector slices
  493. // without duplicates
  494. var timestamps []float64
  495. // turn each vector slice into a map of timestamp-to-value so that
  496. // values at equal timestamps can be lined-up and summed
  497. xMap := make(map[float64]float64)
  498. for _, xv := range xvs {
  499. if xv.Timestamp == 0 {
  500. continue
  501. }
  502. xMap[xv.Timestamp] = xv.Value
  503. timestamps = append(timestamps, xv.Timestamp)
  504. }
  505. yMap := make(map[float64]float64)
  506. for _, yv := range yvs {
  507. if yv.Timestamp == 0 {
  508. continue
  509. }
  510. yMap[yv.Timestamp] = yv.Value
  511. if _, ok := xMap[yv.Timestamp]; !ok {
  512. // no need to double add, since we'll range over sorted timestamps and check.
  513. timestamps = append(timestamps, yv.Timestamp)
  514. }
  515. }
  516. // iterate over each timestamp to produce a final summed vector slice
  517. sort.Float64s(timestamps)
  518. for _, t := range timestamps {
  519. x, okX := xMap[t]
  520. y, okY := yMap[t]
  521. sv := &Vector{Timestamp: t}
  522. if okX && okY {
  523. sv.Value = x + y
  524. } else if okX {
  525. sv.Value = x
  526. } else if okY {
  527. sv.Value = y
  528. }
  529. sum = append(sum, sv)
  530. }
  531. return sum
  532. }