aggregations.go 22 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651
  1. package costmodel
  2. import (
  3. "math"
  4. "sort"
  5. "strconv"
  6. "time"
  7. "github.com/kubecost/cost-model/cloud"
  8. prometheusClient "github.com/prometheus/client_golang/api"
  9. "k8s.io/klog"
  10. )
  11. const (
  12. hoursPerDay = 24.0
  13. hoursPerMonth = 730.0
  14. )
// Aggregation holds the result of AggregateCostData for one group of cost
// data, e.g. one namespace, cluster, service, deployment, daemonset, label
// value, or pod. Fields tagged `json:"-"` are intermediate time series used to
// compute the averages and efficiencies and are never serialized.
type Aggregation struct {
	// Aggregator is the field the data was grouped by (e.g. "namespace").
	// Subfields are the subfields passed to AggregateCostData (the label names
	// when grouping by label); they are only set when non-empty. Environment is
	// the group's key, e.g. a namespace name or "<namespace>/<pod>".
	Aggregator  string   `json:"aggregation"`
	Subfields   []string `json:"subfields,omitempty"`
	Environment string   `json:"environment"`
	Cluster     string   `json:"cluster,omitempty"`

	// CPU. CPUAllocationAverage is the mean of the summed allocation series.
	// CPUCost is derived from CPUCostVector (divided into a rate when a Rate is
	// requested). CPUCostVector is cleared unless time series are requested.
	// CPUEfficiency is only computed when efficiency is requested.
	CPUAllocationAverage float64   `json:"cpuAllocationAverage"`
	CPUAllocationVectors []*Vector `json:"-"`
	CPUCost              float64   `json:"cpuCost"`
	CPUCostVector        []*Vector `json:"cpuCostVector,omitempty"`
	CPUEfficiency        float64   `json:"cpuEfficiency"`
	CPURequestedVectors  []*Vector `json:"-"`
	CPUUsedVectors       []*Vector `json:"-"`

	// Efficiency combines CPU and RAM efficiency, weighted by their costs.
	Efficiency float64 `json:"efficiency"`

	// GPU allocation and cost; GPUCostVector is cleared unless time series
	// are requested.
	GPUAllocationAverage float64   `json:"gpuAllocationAverage"`
	GPUAllocationVectors []*Vector `json:"-"`
	GPUCost              float64   `json:"gpuCost"`
	GPUCostVector        []*Vector `json:"gpuCostVector,omitempty"`

	// RAM. AggregateCostData converts RAMAllocationAverage from bytes to GiB.
	// RAMEfficiency is only computed when efficiency is requested.
	RAMAllocationAverage float64   `json:"ramAllocationAverage"`
	RAMAllocationVectors []*Vector `json:"-"`
	RAMCost              float64   `json:"ramCost"`
	RAMCostVector        []*Vector `json:"ramCostVector,omitempty"`
	RAMEfficiency        float64   `json:"ramEfficiency"`
	RAMRequestedVectors  []*Vector `json:"-"`
	RAMUsedVectors       []*Vector `json:"-"`

	// Persistent volumes. AggregateCostData converts PVAllocationAverage from
	// bytes to GiB.
	PVAllocationAverage float64   `json:"pvAllocationAverage"`
	PVAllocationVectors []*Vector `json:"-"`
	PVCost              float64   `json:"pvCost"`
	PVCostVector        []*Vector `json:"pvCostVector,omitempty"`

	// Network cost.
	NetworkCost       float64   `json:"networkCost"`
	NetworkCostVector []*Vector `json:"networkCostVector,omitempty"`

	// SharedCost is this group's even share of the cost of shared resources.
	// TotalCost is CPU + RAM + GPU + PV + network + shared cost.
	SharedCost float64 `json:"sharedCost"`
	TotalCost  float64 `json:"totalCost"`
}
  48. func (a *Aggregation) GetDataCount() int {
  49. length := 0
  50. if length < len(a.CPUCostVector) {
  51. length = len(a.CPUCostVector)
  52. }
  53. if length < len(a.RAMCostVector) {
  54. length = len(a.RAMCostVector)
  55. }
  56. if length < len(a.PVCostVector) {
  57. length = len(a.PVCostVector)
  58. }
  59. if length < len(a.GPUCostVector) {
  60. length = len(a.GPUCostVector)
  61. }
  62. if length < len(a.NetworkCostVector) {
  63. length = len(a.NetworkCostVector)
  64. }
  65. return length
  66. }
// SharedResourceInfo selects cost data whose cost should be shared across all
// aggregations instead of being reported as a group of its own (see
// IsSharedResource for the matching rule).
type SharedResourceInfo struct {
	ShareResources  bool              // AggregateCostData only applies sharing when this is true
	SharedNamespace map[string]bool   // namespaces (present as keys) whose data is shared
	LabelSelectors  map[string]string // label name -> label value that marks data as shared
}
  72. func (s *SharedResourceInfo) IsSharedResource(costDatum *CostData) bool {
  73. if _, ok := s.SharedNamespace[costDatum.Namespace]; ok {
  74. return true
  75. }
  76. for labelName, labelValue := range s.LabelSelectors {
  77. if val, ok := costDatum.Labels[labelName]; ok {
  78. if val == labelValue {
  79. return true
  80. }
  81. }
  82. }
  83. return false
  84. }
  85. func NewSharedResourceInfo(shareResources bool, sharedNamespaces []string, labelnames []string, labelvalues []string) *SharedResourceInfo {
  86. sr := &SharedResourceInfo{
  87. ShareResources: shareResources,
  88. SharedNamespace: make(map[string]bool),
  89. LabelSelectors: make(map[string]string),
  90. }
  91. for _, ns := range sharedNamespaces {
  92. sr.SharedNamespace[ns] = true
  93. }
  94. sr.SharedNamespace["kube-system"] = true // kube-system should be split by default
  95. for i := range labelnames {
  96. sr.LabelSelectors[labelnames[i]] = labelvalues[i]
  97. }
  98. return sr
  99. }
  100. func ComputeIdleCoefficient(costData map[string]*CostData, cli prometheusClient.Client, cp cloud.Provider, discount float64, windowString, offset, resolution string) (map[string]float64, error) {
  101. coefficients := make(map[string]float64)
  102. windowDuration, err := time.ParseDuration(windowString)
  103. if err != nil {
  104. return nil, err
  105. }
  106. resolutionDuration, err := parseDuration(resolution)
  107. resolutionCoefficient := resolutionDuration.Hours()
  108. allTotals, err := ClusterCostsForAllClusters(cli, cp, windowString, offset)
  109. if err != nil {
  110. return nil, err
  111. }
  112. for cid, totals := range allTotals {
  113. klog.Infof("%s: %+v", cid, totals)
  114. if !(len(totals.CPUCost) > 0 && len(totals.MemCost) > 0 && len(totals.StorageCost) > 0) {
  115. klog.V(1).Infof("WARNING: NO DATA FOR CLUSTER %s. Is it emitting data?", cid)
  116. coefficients[cid] = 1.0
  117. continue
  118. }
  119. cpuCost, err := strconv.ParseFloat(totals.CPUCost[0][1], 64)
  120. if err != nil {
  121. return nil, err
  122. }
  123. memCost, err := strconv.ParseFloat(totals.MemCost[0][1], 64)
  124. if err != nil {
  125. return nil, err
  126. }
  127. storageCost, err := strconv.ParseFloat(totals.StorageCost[0][1], 64)
  128. if err != nil {
  129. return nil, err
  130. }
  131. totalClusterCost := (cpuCost * (1 - discount)) + (memCost * (1 - discount)) + storageCost
  132. if err != nil || totalClusterCost == 0.0 {
  133. return nil, err
  134. }
  135. totalClusterCostOverWindow := (totalClusterCost / 730) * windowDuration.Hours()
  136. totalContainerCost := 0.0
  137. for _, costDatum := range costData {
  138. if costDatum.ClusterID == cid {
  139. cpuv, ramv, gpuv, pvvs, _ := getPriceVectors(cp, costDatum, "", discount, 1)
  140. totalContainerCost += totalVectors(cpuv) * resolutionCoefficient
  141. totalContainerCost += totalVectors(ramv) * resolutionCoefficient
  142. totalContainerCost += totalVectors(gpuv) * resolutionCoefficient
  143. for _, pv := range pvvs {
  144. totalContainerCost += totalVectors(pv) * resolutionCoefficient
  145. }
  146. }
  147. }
  148. coefficients[cid] = totalContainerCost / totalClusterCostOverWindow
  149. }
  150. return coefficients, nil
  151. }
// AggregationOptions provides optional parameters to AggregateCostData, allowing callers to perform more complex operations
type AggregationOptions struct {
	// DataCount is the number of cost data points expected; it ensures proper
	// rate calculation if data is incomplete. When 0, a count is derived from
	// the aggregated cost series.
	DataCount int
	// Discount is the fraction (e.g. 0.25 for 25%) by which to discount CPU,
	// RAM, and GPU cost; costs are multiplied by (1 - Discount).
	Discount float64
	// IdleCoefficients maps cluster ID to a divisor applied to costs to account
	// for idle resources; clusters missing from the map use 1.0.
	IdleCoefficients map[string]float64
	IncludeEfficiency bool // set to true to receive efficiency/usage data
	IncludeTimeSeries bool // set to true to receive time series data
	// Rate may be set to "hourly", "daily", or "monthly" to receive cost rate,
	// rather than cumulative cost.
	Rate string
	// ResolutionCoefficient converts hourly costs to per-resolution cost; e.g.
	// 6 for a 6h resolution. It is treated as 1.0 when zero or when Rate is set.
	ResolutionCoefficient float64
	// SharedResourceInfo, when non-nil with ShareResources set, selects data
	// whose cost is pooled and split evenly across aggregations as SharedCost.
	SharedResourceInfo *SharedResourceInfo
}
  163. // AggregateCostData aggregates raw cost data by field; e.g. namespace, cluster, service, or label. In the case of label, callers
  164. // must pass a slice of subfields indicating the labels by which to group. Provider is used to define custom resource pricing.
  165. // See AggregationOptions for optional parameters.
  166. func AggregateCostData(costData map[string]*CostData, field string, subfields []string, cp cloud.Provider, opts *AggregationOptions) map[string]*Aggregation {
  167. dataCount := opts.DataCount
  168. discount := opts.Discount
  169. idleCoefficients := opts.IdleCoefficients
  170. includeTimeSeries := opts.IncludeTimeSeries
  171. includeEfficiency := opts.IncludeEfficiency
  172. rate := opts.Rate
  173. sr := opts.SharedResourceInfo
  174. if idleCoefficients == nil {
  175. idleCoefficients = make(map[string]float64)
  176. }
  177. // resolution coefficient compensates for less-frequent-than-hourly samples by multiplying
  178. // cumulative values by the hours between samples. does not apply to rate data and defaults
  179. // to 1.0, which matches hourly sampling of hourly data.
  180. resolutionCoefficient := opts.ResolutionCoefficient
  181. if resolutionCoefficient == 0.0 || rate != "" {
  182. resolutionCoefficient = 1.0
  183. }
  184. // aggregations collects key-value pairs of resource group-to-aggregated data
  185. // e.g. namespace-to-data or label-value-to-data
  186. aggregations := make(map[string]*Aggregation)
  187. // sharedResourceCost is the running total cost of resources that should be reported
  188. // as shared across all other resources, rather than reported as a stand-alone category
  189. sharedResourceCost := 0.0
  190. for _, costDatum := range costData {
  191. idleCoefficient, ok := idleCoefficients[costDatum.ClusterID]
  192. if !ok {
  193. idleCoefficient = 1.0
  194. }
  195. if sr != nil && sr.ShareResources && sr.IsSharedResource(costDatum) {
  196. cpuv, ramv, gpuv, pvvs, netv := getPriceVectors(cp, costDatum, rate, discount, idleCoefficient)
  197. sharedResourceCost += totalVectors(cpuv) * resolutionCoefficient
  198. sharedResourceCost += totalVectors(ramv) * resolutionCoefficient
  199. sharedResourceCost += totalVectors(gpuv) * resolutionCoefficient
  200. sharedResourceCost += totalVectors(netv)
  201. for _, pv := range pvvs {
  202. sharedResourceCost += totalVectors(pv) * resolutionCoefficient
  203. }
  204. } else {
  205. if field == "cluster" {
  206. aggregateDatum(cp, aggregations, costDatum, field, subfields, rate, costDatum.ClusterID, discount, idleCoefficient)
  207. } else if field == "namespace" {
  208. aggregateDatum(cp, aggregations, costDatum, field, subfields, rate, costDatum.Namespace, discount, idleCoefficient)
  209. } else if field == "service" {
  210. if len(costDatum.Services) > 0 {
  211. aggregateDatum(cp, aggregations, costDatum, field, subfields, rate, costDatum.Namespace+"/"+costDatum.Services[0], discount, idleCoefficient)
  212. }
  213. } else if field == "deployment" {
  214. if len(costDatum.Deployments) > 0 {
  215. aggregateDatum(cp, aggregations, costDatum, field, subfields, rate, costDatum.Namespace+"/"+costDatum.Deployments[0], discount, idleCoefficient)
  216. }
  217. } else if field == "daemonset" {
  218. if len(costDatum.Daemonsets) > 0 {
  219. aggregateDatum(cp, aggregations, costDatum, field, subfields, rate, costDatum.Namespace+"/"+costDatum.Daemonsets[0], discount, idleCoefficient)
  220. }
  221. } else if field == "label" {
  222. if costDatum.Labels != nil {
  223. for _, sf := range subfields {
  224. if subfieldName, ok := costDatum.Labels[sf]; ok {
  225. aggregateDatum(cp, aggregations, costDatum, field, subfields, rate, subfieldName, discount, idleCoefficient)
  226. break
  227. }
  228. }
  229. }
  230. } else if field == "pod" {
  231. aggregateDatum(cp, aggregations, costDatum, field, subfields, rate, costDatum.Namespace+"/"+costDatum.PodName, discount, idleCoefficient)
  232. }
  233. }
  234. }
  235. for _, agg := range aggregations {
  236. agg.CPUCost = totalVectors(agg.CPUCostVector) * resolutionCoefficient
  237. agg.RAMCost = totalVectors(agg.RAMCostVector) * resolutionCoefficient
  238. agg.GPUCost = totalVectors(agg.GPUCostVector) * resolutionCoefficient
  239. agg.PVCost = totalVectors(agg.PVCostVector) * resolutionCoefficient
  240. agg.NetworkCost = totalVectors(agg.NetworkCostVector)
  241. agg.SharedCost = sharedResourceCost / float64(len(aggregations))
  242. if dataCount == 0 {
  243. dataCount = agg.GetDataCount()
  244. }
  245. if rate != "" && dataCount > 0 {
  246. agg.CPUCost /= float64(dataCount)
  247. agg.RAMCost /= float64(dataCount)
  248. agg.GPUCost /= float64(dataCount)
  249. agg.PVCost /= float64(dataCount)
  250. agg.NetworkCost /= float64(dataCount)
  251. agg.SharedCost /= float64(dataCount)
  252. }
  253. agg.TotalCost = agg.CPUCost + agg.RAMCost + agg.GPUCost + agg.PVCost + agg.NetworkCost + agg.SharedCost
  254. agg.CPUAllocationAverage = averageVectors(agg.CPUAllocationVectors)
  255. agg.GPUAllocationAverage = averageVectors(agg.GPUAllocationVectors)
  256. agg.RAMAllocationAverage = averageVectors(agg.RAMAllocationVectors)
  257. agg.PVAllocationAverage = averageVectors(agg.PVAllocationVectors)
  258. if includeEfficiency {
  259. // Default both RAM and CPU to 100% efficiency so that a 0-requested, 0-allocated, 0-used situation
  260. // returns 100% efficiency, which should be a red-flag.
  261. //
  262. // If non-zero numbers are available, then efficiency is defined as:
  263. // idlePercentage = (requested - used) / allocated
  264. // efficiency = (1.0 - idlePercentage)
  265. //
  266. // It is possible to score > 100% efficiency, which is meant to be interpreted as a red flag.
  267. // It is not possible to score < 0% efficiency.
  268. agg.CPUEfficiency = 1.0
  269. CPUIdle := 0.0
  270. if agg.CPUAllocationAverage > 0.0 {
  271. avgCPURequested := averageVectors(agg.CPURequestedVectors)
  272. avgCPUUsed := averageVectors(agg.CPUUsedVectors)
  273. CPUIdle = ((avgCPURequested - avgCPUUsed) / agg.CPUAllocationAverage)
  274. agg.CPUEfficiency = 1.0 - CPUIdle
  275. }
  276. agg.RAMEfficiency = 1.0
  277. RAMIdle := 0.0
  278. if agg.RAMAllocationAverage > 0.0 {
  279. avgRAMRequested := averageVectors(agg.RAMRequestedVectors)
  280. avgRAMUsed := averageVectors(agg.RAMUsedVectors)
  281. RAMIdle = ((avgRAMRequested - avgRAMUsed) / agg.RAMAllocationAverage)
  282. agg.RAMEfficiency = 1.0 - RAMIdle
  283. }
  284. // Score total efficiency by the sum of CPU and RAM efficiency, weighted by their
  285. // respective total costs.
  286. agg.Efficiency = 1.0
  287. if (agg.CPUCost + agg.RAMCost) > 0 {
  288. agg.Efficiency = 1.0 - ((agg.CPUCost*CPUIdle)+(agg.RAMCost*RAMIdle))/(agg.CPUCost+agg.RAMCost)
  289. }
  290. }
  291. // convert RAM from bytes to GiB
  292. agg.RAMAllocationAverage = agg.RAMAllocationAverage / 1024 / 1024 / 1024
  293. // convert storage from bytes to GiB
  294. agg.PVAllocationAverage = agg.PVAllocationAverage / 1024 / 1024 / 1024
  295. // remove time series data if it is not explicitly requested
  296. if !includeTimeSeries {
  297. agg.CPUCostVector = nil
  298. agg.RAMCostVector = nil
  299. agg.GPUCostVector = nil
  300. agg.PVCostVector = nil
  301. agg.NetworkCostVector = nil
  302. }
  303. }
  304. return aggregations
  305. }
  306. func aggregateDatum(cp cloud.Provider, aggregations map[string]*Aggregation, costDatum *CostData, field string, subfields []string, rate string, key string, discount float64, idleCoefficient float64) {
  307. // add new entry to aggregation results if a new key is encountered
  308. if _, ok := aggregations[key]; !ok {
  309. agg := &Aggregation{}
  310. agg.Aggregator = field
  311. if len(subfields) > 0 {
  312. agg.Subfields = subfields
  313. }
  314. agg.Environment = key
  315. aggregations[key] = agg
  316. }
  317. mergeVectors(cp, costDatum, aggregations[key], rate, discount, idleCoefficient)
  318. }
  319. func mergeVectors(cp cloud.Provider, costDatum *CostData, aggregation *Aggregation, rate string, discount float64, idleCoefficient float64) {
  320. aggregation.CPUAllocationVectors = addVectors(costDatum.CPUAllocation, aggregation.CPUAllocationVectors)
  321. aggregation.CPURequestedVectors = addVectors(costDatum.CPUReq, aggregation.CPURequestedVectors)
  322. aggregation.CPUUsedVectors = addVectors(costDatum.CPUUsed, aggregation.CPUUsedVectors)
  323. aggregation.RAMAllocationVectors = addVectors(costDatum.RAMAllocation, aggregation.RAMAllocationVectors)
  324. aggregation.RAMRequestedVectors = addVectors(costDatum.RAMReq, aggregation.RAMRequestedVectors)
  325. aggregation.RAMUsedVectors = addVectors(costDatum.RAMUsed, aggregation.RAMUsedVectors)
  326. aggregation.GPUAllocationVectors = addVectors(costDatum.GPUReq, aggregation.GPUAllocationVectors)
  327. for _, pvcd := range costDatum.PVCData {
  328. aggregation.PVAllocationVectors = addVectors(pvcd.Values, aggregation.PVAllocationVectors)
  329. }
  330. cpuv, ramv, gpuv, pvvs, netv := getPriceVectors(cp, costDatum, rate, discount, idleCoefficient)
  331. aggregation.CPUCostVector = addVectors(cpuv, aggregation.CPUCostVector)
  332. aggregation.RAMCostVector = addVectors(ramv, aggregation.RAMCostVector)
  333. aggregation.GPUCostVector = addVectors(gpuv, aggregation.GPUCostVector)
  334. aggregation.NetworkCostVector = addVectors(netv, aggregation.NetworkCostVector)
  335. for _, vectorList := range pvvs {
  336. aggregation.PVCostVector = addVectors(aggregation.PVCostVector, vectorList)
  337. }
  338. }
  339. func getPriceVectors(cp cloud.Provider, costDatum *CostData, rate string, discount float64, idleCoefficient float64) ([]*Vector, []*Vector, []*Vector, [][]*Vector, []*Vector) {
  340. cpuCostStr := costDatum.NodeData.VCPUCost
  341. ramCostStr := costDatum.NodeData.RAMCost
  342. gpuCostStr := costDatum.NodeData.GPUCost
  343. pvCostStr := costDatum.NodeData.StorageCost
  344. // If custom pricing is enabled and can be retrieved, replace
  345. // default cost values with custom values
  346. customPricing, err := cp.GetConfig()
  347. if err != nil {
  348. klog.Errorf("failed to load custom pricing: %s", err)
  349. }
  350. if cloud.CustomPricesEnabled(cp) && err == nil {
  351. if costDatum.NodeData.IsSpot() {
  352. cpuCostStr = customPricing.SpotCPU
  353. ramCostStr = customPricing.SpotRAM
  354. gpuCostStr = customPricing.SpotGPU
  355. } else {
  356. cpuCostStr = customPricing.CPU
  357. ramCostStr = customPricing.RAM
  358. gpuCostStr = customPricing.GPU
  359. }
  360. pvCostStr = customPricing.Storage
  361. }
  362. cpuCost, _ := strconv.ParseFloat(cpuCostStr, 64)
  363. ramCost, _ := strconv.ParseFloat(ramCostStr, 64)
  364. gpuCost, _ := strconv.ParseFloat(gpuCostStr, 64)
  365. pvCost, _ := strconv.ParseFloat(pvCostStr, 64)
  366. // rateCoeff scales the individual time series data values by the appropriate
  367. // number. Each value is, by default, the daily value, so the scales convert
  368. // from daily to the target rate.
  369. rateCoeff := 1.0
  370. switch rate {
  371. case "daily":
  372. rateCoeff = hoursPerDay
  373. case "monthly":
  374. rateCoeff = hoursPerMonth
  375. case "hourly":
  376. default:
  377. }
  378. cpuv := make([]*Vector, 0, len(costDatum.CPUAllocation))
  379. for _, val := range costDatum.CPUAllocation {
  380. cpuv = append(cpuv, &Vector{
  381. Timestamp: math.Round(val.Timestamp/10) * 10,
  382. Value: (val.Value * cpuCost * (1 - discount) / idleCoefficient) * rateCoeff,
  383. })
  384. }
  385. ramv := make([]*Vector, 0, len(costDatum.RAMAllocation))
  386. for _, val := range costDatum.RAMAllocation {
  387. ramv = append(ramv, &Vector{
  388. Timestamp: math.Round(val.Timestamp/10) * 10,
  389. Value: ((val.Value / 1024 / 1024 / 1024) * ramCost * (1 - discount) / idleCoefficient) * rateCoeff,
  390. })
  391. }
  392. gpuv := make([]*Vector, 0, len(costDatum.GPUReq))
  393. for _, val := range costDatum.GPUReq {
  394. gpuv = append(gpuv, &Vector{
  395. Timestamp: math.Round(val.Timestamp/10) * 10,
  396. Value: (val.Value * gpuCost * (1 - discount) / idleCoefficient) * rateCoeff,
  397. })
  398. }
  399. pvvs := make([][]*Vector, 0, len(costDatum.PVCData))
  400. for _, pvcData := range costDatum.PVCData {
  401. pvv := make([]*Vector, 0, len(pvcData.Values))
  402. if pvcData.Volume != nil {
  403. cost, _ := strconv.ParseFloat(pvcData.Volume.Cost, 64)
  404. // override with custom pricing if enabled
  405. if cloud.CustomPricesEnabled(cp) {
  406. cost = pvCost
  407. }
  408. for _, val := range pvcData.Values {
  409. pvv = append(pvv, &Vector{
  410. Timestamp: math.Round(val.Timestamp/10) * 10,
  411. Value: ((val.Value / 1024 / 1024 / 1024) * cost / idleCoefficient) * rateCoeff,
  412. })
  413. }
  414. pvvs = append(pvvs, pvv)
  415. }
  416. }
  417. netv := make([]*Vector, 0, len(costDatum.NetworkData))
  418. for _, val := range costDatum.NetworkData {
  419. netv = append(netv, &Vector{
  420. Timestamp: math.Round(val.Timestamp/10) * 10,
  421. Value: val.Value,
  422. })
  423. }
  424. return cpuv, ramv, gpuv, pvvs, netv
  425. }
  426. func averageVectors(vectors []*Vector) float64 {
  427. if len(vectors) == 0 {
  428. return 0.0
  429. }
  430. return totalVectors(vectors) / float64(len(vectors))
  431. }
  432. func totalVectors(vectors []*Vector) float64 {
  433. total := 0.0
  434. for _, vector := range vectors {
  435. total += vector.Value
  436. }
  437. return total
  438. }
  439. // roundTimestamp rounds the given timestamp to the given precision; e.g. a
  440. // timestamp given in seconds, rounded to precision 10, will be rounded
  441. // to the nearest value dividible by 10 (24 goes to 20, but 25 goes to 30).
  442. func roundTimestamp(ts float64, precision float64) float64 {
  443. return math.Round(ts/precision) * precision
  444. }
  445. // NormalizeVectorByVector produces a version of xvs (a slice of Vectors)
  446. // which has had its timestamps rounded and its values divided by the values
  447. // of the Vectors of yvs, such that yvs is the "unit" Vector slice.
  448. func NormalizeVectorByVector(xvs []*Vector, yvs []*Vector) []*Vector {
  449. // round all non-zero timestamps to the nearest 10 second mark
  450. for _, yv := range yvs {
  451. if yv.Timestamp != 0 {
  452. yv.Timestamp = roundTimestamp(yv.Timestamp, 10.0)
  453. }
  454. }
  455. for _, xv := range xvs {
  456. if xv.Timestamp != 0 {
  457. xv.Timestamp = roundTimestamp(xv.Timestamp, 10.0)
  458. }
  459. }
  460. // if xvs is empty, return yvs
  461. if xvs == nil || len(xvs) == 0 {
  462. return yvs
  463. }
  464. // if yvs is empty, return xvs
  465. if yvs == nil || len(yvs) == 0 {
  466. return xvs
  467. }
  468. // sum stores the sum of the vector slices xvs and yvs
  469. var sum []*Vector
  470. // timestamps stores all timestamps present in both vector slices
  471. // without duplicates
  472. var timestamps []float64
  473. // turn each vector slice into a map of timestamp-to-value so that
  474. // values at equal timestamps can be lined-up and summed
  475. xMap := make(map[float64]float64)
  476. for _, xv := range xvs {
  477. if xv.Timestamp == 0 {
  478. continue
  479. }
  480. xMap[xv.Timestamp] = xv.Value
  481. timestamps = append(timestamps, xv.Timestamp)
  482. }
  483. yMap := make(map[float64]float64)
  484. for _, yv := range yvs {
  485. if yv.Timestamp == 0 {
  486. continue
  487. }
  488. yMap[yv.Timestamp] = yv.Value
  489. if _, ok := xMap[yv.Timestamp]; !ok {
  490. // no need to double add, since we'll range over sorted timestamps and check.
  491. timestamps = append(timestamps, yv.Timestamp)
  492. }
  493. }
  494. // iterate over each timestamp to produce a final normalized vector slice
  495. sort.Float64s(timestamps)
  496. for _, t := range timestamps {
  497. x, okX := xMap[t]
  498. y, okY := yMap[t]
  499. sv := &Vector{Timestamp: t}
  500. if okX && okY && y != 0 {
  501. sv.Value = x / y
  502. } else if okX {
  503. sv.Value = x
  504. } else if okY {
  505. sv.Value = 0
  506. }
  507. sum = append(sum, sv)
  508. }
  509. return sum
  510. }
  511. // addVectors adds two slices of Vectors. Vector timestamps are rounded to the
  512. // nearest ten seconds to allow matching of Vectors within a delta allowance.
  513. // Matching Vectors are summed, while unmatched Vectors are passed through.
  514. // e.g. [(t=1, 1), (t=2, 2)] + [(t=2, 2), (t=3, 3)] = [(t=1, 1), (t=2, 4), (t=3, 3)]
  515. func addVectors(xvs []*Vector, yvs []*Vector) []*Vector {
  516. // round all non-zero timestamps to the nearest 10 second mark
  517. for _, yv := range yvs {
  518. if yv.Timestamp != 0 {
  519. yv.Timestamp = roundTimestamp(yv.Timestamp, 10.0)
  520. }
  521. }
  522. for _, xv := range xvs {
  523. if xv.Timestamp != 0 {
  524. xv.Timestamp = roundTimestamp(xv.Timestamp, 10.0)
  525. }
  526. }
  527. // if xvs is empty, return yvs
  528. if xvs == nil || len(xvs) == 0 {
  529. return yvs
  530. }
  531. // if yvs is empty, return xvs
  532. if yvs == nil || len(yvs) == 0 {
  533. return xvs
  534. }
  535. // sum stores the sum of the vector slices xvs and yvs
  536. var sum []*Vector
  537. // timestamps stores all timestamps present in both vector slices
  538. // without duplicates
  539. var timestamps []float64
  540. // turn each vector slice into a map of timestamp-to-value so that
  541. // values at equal timestamps can be lined-up and summed
  542. xMap := make(map[float64]float64)
  543. for _, xv := range xvs {
  544. if xv.Timestamp == 0 {
  545. continue
  546. }
  547. xMap[xv.Timestamp] = xv.Value
  548. timestamps = append(timestamps, xv.Timestamp)
  549. }
  550. yMap := make(map[float64]float64)
  551. for _, yv := range yvs {
  552. if yv.Timestamp == 0 {
  553. continue
  554. }
  555. yMap[yv.Timestamp] = yv.Value
  556. if _, ok := xMap[yv.Timestamp]; !ok {
  557. // no need to double add, since we'll range over sorted timestamps and check.
  558. timestamps = append(timestamps, yv.Timestamp)
  559. }
  560. }
  561. // iterate over each timestamp to produce a final summed vector slice
  562. sort.Float64s(timestamps)
  563. for _, t := range timestamps {
  564. x, okX := xMap[t]
  565. y, okY := yMap[t]
  566. sv := &Vector{Timestamp: t}
  567. if okX && okY {
  568. sv.Value = x + y
  569. } else if okX {
  570. sv.Value = x
  571. } else if okY {
  572. sv.Value = y
  573. }
  574. sum = append(sum, sv)
  575. }
  576. return sum
  577. }