summaryallocation.go 39 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232
  1. package kubecost
  2. import (
  3. "errors"
  4. "fmt"
  5. "strings"
  6. "sync"
  7. "time"
  8. "github.com/kubecost/cost-model/pkg/log"
  9. )
  10. // SummaryAllocation summarizes an Allocation, keeping only fields necessary
  11. // for providing a high-level view of identifying the Allocation over a period
  12. // of time (Start, End) over which it ran, and inspecting the associated per-
  13. // resource costs (subtotaled with adjustments), total cost, and efficiency.
  14. //
  15. // SummaryAllocation does not have a concept of Window (i.e. the time period
  16. // within which it is defined, as opposed to the Start and End times). That
  17. // context must be provided by a SummaryAllocationSet.
  18. type SummaryAllocation struct {
  19. Name string `json:"name"`
  20. Properties *AllocationProperties `json:"-"`
  21. Start time.Time `json:"start"`
  22. End time.Time `json:"end"`
  23. CPUCoreRequestAverage float64 `json:"cpuCoreRequestAverage"`
  24. CPUCoreUsageAverage float64 `json:"cpuCoreUsageAverage"`
  25. CPUCost float64 `json:"cpuCost"`
  26. GPUCost float64 `json:"gpuCost"`
  27. NetworkCost float64 `json:"networkCost"`
  28. LoadBalancerCost float64 `json:"loadBalancerCost"`
  29. PVCost float64 `json:"pvCost"`
  30. RAMBytesRequestAverage float64 `json:"ramByteRequestAverage"`
  31. RAMBytesUsageAverage float64 `json:"ramByteUsageAverage"`
  32. RAMCost float64 `json:"ramCost"`
  33. SharedCost float64 `json:"sharedCost"`
  34. ExternalCost float64 `json:"externalCost"`
  35. Share bool `json:"-"`
  36. }
  37. // NewSummaryAllocation converts an Allocation to a SummaryAllocation by
  38. // dropping unnecessary fields and consolidating others (e.g. adjustments).
  39. // Reconciliation happens here because that process is synonymous with the
  40. // consolidation of adjustment fields.
  41. func NewSummaryAllocation(alloc *Allocation, reconcile, reconcileNetwork bool) *SummaryAllocation {
  42. if alloc == nil {
  43. return nil
  44. }
  45. sa := &SummaryAllocation{
  46. Name: alloc.Name,
  47. Properties: alloc.Properties.Clone(),
  48. Start: alloc.Start,
  49. End: alloc.End,
  50. CPUCoreRequestAverage: alloc.CPUCoreRequestAverage,
  51. CPUCoreUsageAverage: alloc.CPUCoreUsageAverage,
  52. CPUCost: alloc.CPUCost + alloc.CPUCostAdjustment,
  53. GPUCost: alloc.GPUCost + alloc.GPUCostAdjustment,
  54. NetworkCost: alloc.NetworkCost + alloc.NetworkCostAdjustment,
  55. LoadBalancerCost: alloc.LoadBalancerCost + alloc.LoadBalancerCostAdjustment,
  56. PVCost: alloc.PVCost() + alloc.PVCostAdjustment,
  57. RAMBytesRequestAverage: alloc.RAMBytesRequestAverage,
  58. RAMBytesUsageAverage: alloc.RAMBytesUsageAverage,
  59. RAMCost: alloc.RAMCost + alloc.RAMCostAdjustment,
  60. SharedCost: alloc.SharedCost,
  61. ExternalCost: alloc.ExternalCost,
  62. }
  63. // Revert adjustments if reconciliation is off. If only network
  64. // reconciliation is off, only revert network adjustment.
  65. if !reconcile {
  66. sa.CPUCost -= alloc.CPUCostAdjustment
  67. sa.GPUCost -= alloc.GPUCostAdjustment
  68. sa.NetworkCost -= alloc.NetworkCostAdjustment
  69. sa.LoadBalancerCost -= alloc.LoadBalancerCostAdjustment
  70. sa.PVCost -= alloc.PVCostAdjustment
  71. sa.RAMCost -= alloc.RAMCostAdjustment
  72. } else if !reconcileNetwork {
  73. sa.NetworkCost -= alloc.NetworkCostAdjustment
  74. }
  75. return sa
  76. }
  77. // Add sums two SummaryAllocations, adding the given SummaryAllocation to the
  78. // receiving one, thus mutating the receiver. For performance reasons, it
  79. // simply drops Properties, so a SummaryAllocation can only be Added once.
  80. func (sa *SummaryAllocation) Add(that *SummaryAllocation) error {
  81. if sa == nil || that == nil {
  82. return errors.New("cannot Add a nil SummaryAllocation")
  83. }
  84. if sa.Properties == nil {
  85. return errors.New("cannot Add a SummaryAllocation without Properties")
  86. }
  87. // Once Added, a SummaryAllocation has no Properties, preventing it from
  88. // being Added a second time. This saves us from having to compute the
  89. // intersection of two sets of Properties, which is expensive.
  90. sa.Properties = nil
  91. // Sum non-cumulative fields by turning them into cumulative, adding them,
  92. // and then converting them back into averages after minutes have been
  93. // combined (just below).
  94. cpuReqCoreMins := sa.CPUCoreRequestAverage * sa.Minutes()
  95. cpuReqCoreMins += that.CPUCoreRequestAverage * that.Minutes()
  96. cpuUseCoreMins := sa.CPUCoreUsageAverage * sa.Minutes()
  97. cpuUseCoreMins += that.CPUCoreUsageAverage * that.Minutes()
  98. ramReqByteMins := sa.RAMBytesRequestAverage * sa.Minutes()
  99. ramReqByteMins += that.RAMBytesRequestAverage * that.Minutes()
  100. ramUseByteMins := sa.RAMBytesUsageAverage * sa.Minutes()
  101. ramUseByteMins += that.RAMBytesUsageAverage * that.Minutes()
  102. // Expand Start and End to be the "max" of among the given Allocations
  103. if that.Start.Before(sa.Start) {
  104. sa.Start = that.Start
  105. }
  106. if that.End.After(sa.End) {
  107. sa.End = that.End
  108. }
  109. // Convert cumulative request and usage back into rates
  110. if sa.Minutes() > 0 {
  111. sa.CPUCoreRequestAverage = cpuReqCoreMins / sa.Minutes()
  112. sa.CPUCoreUsageAverage = cpuUseCoreMins / sa.Minutes()
  113. sa.RAMBytesRequestAverage = ramReqByteMins / sa.Minutes()
  114. sa.RAMBytesUsageAverage = ramUseByteMins / sa.Minutes()
  115. } else {
  116. sa.CPUCoreRequestAverage = 0.0
  117. sa.CPUCoreUsageAverage = 0.0
  118. sa.RAMBytesRequestAverage = 0.0
  119. sa.RAMBytesUsageAverage = 0.0
  120. }
  121. // Sum all cumulative cost fields
  122. sa.CPUCost += that.CPUCost
  123. sa.ExternalCost += that.ExternalCost
  124. sa.GPUCost += that.GPUCost
  125. sa.LoadBalancerCost += that.LoadBalancerCost
  126. sa.NetworkCost += that.NetworkCost
  127. sa.PVCost += that.PVCost
  128. sa.RAMCost += that.RAMCost
  129. sa.SharedCost += that.SharedCost
  130. return nil
  131. }
  132. // CPUEfficiency is the ratio of usage to request. If there is no request and
  133. // no usage or cost, then efficiency is zero. If there is no request, but there
  134. // is usage or cost, then efficiency is 100%.
  135. func (sa *SummaryAllocation) CPUEfficiency() float64 {
  136. if sa == nil {
  137. return 0.0
  138. }
  139. if sa.CPUCoreRequestAverage > 0 {
  140. return sa.CPUCoreUsageAverage / sa.CPUCoreRequestAverage
  141. }
  142. if sa.CPUCoreUsageAverage == 0.0 || sa.CPUCost == 0.0 {
  143. return 0.0
  144. }
  145. return 1.0
  146. }
  147. func (sa *SummaryAllocation) generateKey(aggregateBy []string, labelConfig *LabelConfig) string {
  148. if sa == nil {
  149. return ""
  150. }
  151. return sa.Properties.GenerateKey(aggregateBy, labelConfig)
  152. }
  153. // IsExternal is true if the given SummaryAllocation represents external costs.
  154. func (sa *SummaryAllocation) IsExternal() bool {
  155. if sa == nil {
  156. return false
  157. }
  158. return strings.Contains(sa.Name, ExternalSuffix)
  159. }
  160. // IsIdle is true if the given SummaryAllocation represents idle costs.
  161. func (sa *SummaryAllocation) IsIdle() bool {
  162. if sa == nil {
  163. return false
  164. }
  165. return strings.Contains(sa.Name, IdleSuffix)
  166. }
  167. // IsUnallocated is true if the given SummaryAllocation represents unallocated
  168. // costs.
  169. func (sa *SummaryAllocation) IsUnallocated() bool {
  170. if sa == nil {
  171. return false
  172. }
  173. return strings.Contains(sa.Name, UnallocatedSuffix)
  174. }
  175. // IsUnmounted is true if the given SummaryAllocation represents unmounted
  176. // volume costs.
  177. func (sa *SummaryAllocation) IsUnmounted() bool {
  178. if sa == nil {
  179. return false
  180. }
  181. return strings.Contains(sa.Name, UnmountedSuffix)
  182. }
  183. // Minutes returns the number of minutes the SummaryAllocation represents, as
  184. // defined by the difference between the end and start times.
  185. func (sa *SummaryAllocation) Minutes() float64 {
  186. if sa == nil {
  187. return 0.0
  188. }
  189. return sa.End.Sub(sa.Start).Minutes()
  190. }
  191. // RAMEfficiency is the ratio of usage to request. If there is no request and
  192. // no usage or cost, then efficiency is zero. If there is no request, but there
  193. // is usage or cost, then efficiency is 100%.
  194. func (sa *SummaryAllocation) RAMEfficiency() float64 {
  195. if sa == nil {
  196. return 0.0
  197. }
  198. if sa.RAMBytesRequestAverage > 0 {
  199. return sa.RAMBytesUsageAverage / sa.RAMBytesRequestAverage
  200. }
  201. if sa.RAMBytesUsageAverage == 0.0 || sa.RAMCost == 0.0 {
  202. return 0.0
  203. }
  204. return 1.0
  205. }
  206. // TotalCost is the total cost of the SummaryAllocation
  207. func (sa *SummaryAllocation) TotalCost() float64 {
  208. if sa == nil {
  209. return 0.0
  210. }
  211. return sa.CPUCost + sa.GPUCost + sa.RAMCost + sa.PVCost + sa.NetworkCost + sa.LoadBalancerCost + sa.SharedCost + sa.ExternalCost
  212. }
  213. // TotalEfficiency is the cost-weighted average of CPU and RAM efficiency. If
  214. // there is no cost at all, then efficiency is zero.
  215. func (sa *SummaryAllocation) TotalEfficiency() float64 {
  216. if sa == nil {
  217. return 0.0
  218. }
  219. if sa.RAMCost+sa.CPUCost > 0 {
  220. ramCostEff := sa.RAMEfficiency() * sa.RAMCost
  221. cpuCostEff := sa.CPUEfficiency() * sa.CPUCost
  222. return (ramCostEff + cpuCostEff) / (sa.CPUCost + sa.RAMCost)
  223. }
  224. return 0.0
  225. }
  226. // SummaryAllocationSet stores a set of SummaryAllocations, each with a unique
  227. // name, that share a window. An AllocationSet is mutable, so treat it like a
  228. // threadsafe map.
  229. type SummaryAllocationSet struct {
  230. sync.RWMutex
  231. externalKeys map[string]bool
  232. idleKeys map[string]bool
  233. SummaryAllocations map[string]*SummaryAllocation `json:"allocations"`
  234. Window Window `json:"window"`
  235. }
  236. // NewSummaryAllocationSet converts an AllocationSet to a SummaryAllocationSet.
  237. // Filter functions, sharing functions, and reconciliation parameters are
  238. // required for unfortunate reasons to do with performance and legacy order-of-
  239. // operations details, as well as the fact that reconciliation has been
  240. // pushed down to the conversion step between Allocation and SummaryAllocation.
  241. func NewSummaryAllocationSet(as *AllocationSet, ffs, sfs []AllocationMatchFunc, reconcile, reconcileNetwork bool) *SummaryAllocationSet {
  242. if as == nil {
  243. return nil
  244. }
  245. // If we can know the exact size of the map, use it. If filters or sharing
  246. // functions are present, we can't know the size, so we make a default map.
  247. var sasMap map[string]*SummaryAllocation
  248. if len(ffs) == 0 && len(sfs) == 0 {
  249. // No filters, so make the map of summary allocations exactly the size
  250. // of the origin allocation set.
  251. sasMap = make(map[string]*SummaryAllocation, len(as.allocations))
  252. } else {
  253. // There are filters, so start with a standard map
  254. sasMap = make(map[string]*SummaryAllocation)
  255. }
  256. sas := &SummaryAllocationSet{
  257. SummaryAllocations: sasMap,
  258. Window: as.Window.Clone(),
  259. }
  260. for _, alloc := range as.allocations {
  261. // First, detect if the allocation should be shared. If so, mark it as
  262. // such, insert it, and continue.
  263. shouldShare := false
  264. for _, sf := range sfs {
  265. if sf(alloc) {
  266. shouldShare = true
  267. break
  268. }
  269. }
  270. if shouldShare {
  271. sa := NewSummaryAllocation(alloc, reconcile, reconcileNetwork)
  272. sa.Share = true
  273. sas.Insert(sa)
  274. continue
  275. }
  276. // If the allocation does not pass any of the given filter functions,
  277. // do not insert it into the set.
  278. shouldFilter := false
  279. for _, ff := range ffs {
  280. if !ff(alloc) {
  281. shouldFilter = true
  282. break
  283. }
  284. }
  285. if shouldFilter {
  286. continue
  287. }
  288. err := sas.Insert(NewSummaryAllocation(alloc, reconcile, reconcileNetwork))
  289. if err != nil {
  290. log.Errorf("SummaryAllocation: error inserting summary of %s", alloc.Name)
  291. }
  292. }
  293. for key := range as.externalKeys {
  294. sas.externalKeys[key] = true
  295. }
  296. for key := range as.idleKeys {
  297. sas.idleKeys[key] = true
  298. }
  299. return sas
  300. }
  301. // Add sums two SummaryAllocationSets, which Adds all SummaryAllocations in the
  302. // given SummaryAllocationSet to thier counterparts in the receiving set. Add
  303. // also expands the Window to include both constituent Windows, in the case
  304. // that Add is being used from accumulating (as opposed to aggregating). For
  305. // performance reasons, the function may return either a new set, or an
  306. // unmodified original, so it should not be assumed that the original sets are
  307. // safeuly usable after calling Add.
  308. func (sas *SummaryAllocationSet) Add(that *SummaryAllocationSet) (*SummaryAllocationSet, error) {
  309. if sas == nil || len(sas.SummaryAllocations) == 0 {
  310. return that, nil
  311. }
  312. if that == nil || len(that.SummaryAllocations) == 0 {
  313. return sas, nil
  314. }
  315. if sas.Window.IsOpen() {
  316. return nil, errors.New("cannot add a SummaryAllocationSet with an open window")
  317. }
  318. // Set start, end to min(start), max(end)
  319. start := *sas.Window.Start()
  320. end := *sas.Window.End()
  321. if that.Window.Start().Before(start) {
  322. start = *that.Window.Start()
  323. }
  324. if that.Window.End().After(end) {
  325. end = *that.Window.End()
  326. }
  327. acc := &SummaryAllocationSet{
  328. SummaryAllocations: make(map[string]*SummaryAllocation, len(sas.SummaryAllocations)),
  329. Window: NewClosedWindow(start, end),
  330. }
  331. sas.RLock()
  332. defer sas.RUnlock()
  333. that.RLock()
  334. defer that.RUnlock()
  335. for _, alloc := range sas.SummaryAllocations {
  336. err := acc.Insert(alloc)
  337. if err != nil {
  338. return nil, err
  339. }
  340. }
  341. for _, alloc := range that.SummaryAllocations {
  342. err := acc.Insert(alloc)
  343. if err != nil {
  344. return nil, err
  345. }
  346. }
  347. return acc, nil
  348. }
  349. // AggregateBy aggregates the Allocations in the given AllocationSet by the given
  350. // AllocationProperty. This will only be legal if the AllocationSet is divisible by the
  351. // given AllocationProperty; e.g. Containers can be divided by Namespace, but not vice-a-versa.
  352. func (sas *SummaryAllocationSet) AggregateBy(aggregateBy []string, options *AllocationAggregationOptions) error {
  353. if sas == nil || len(sas.SummaryAllocations) == 0 {
  354. return nil
  355. }
  356. if sas.Window.IsOpen() {
  357. return errors.New("cannot aggregate a SummaryAllocationSet with an open window")
  358. }
  359. if options == nil {
  360. options = &AllocationAggregationOptions{}
  361. }
  362. if options.LabelConfig == nil {
  363. options.LabelConfig = NewLabelConfig()
  364. }
  365. // Check if we have any work to do; if not, then early return. If
  366. // aggregateBy is nil, we don't aggregate anything. On the other hand,
  367. // an empty slice implies that we should aggregate everything. (See
  368. // generateKey for why that makes sense.)
  369. shouldAggregate := aggregateBy != nil
  370. shouldShare := len(options.SharedHourlyCosts) > 0 || len(options.ShareFuncs) > 0
  371. if !shouldAggregate && !shouldShare {
  372. return nil
  373. }
  374. // The order of operations for aggregating a SummaryAllocationSet is as
  375. // follows:
  376. //
  377. // 1. Partition external, idle, and shared allocations into separate sets.
  378. // Also, create the resultSet into which the results will be aggregated.
  379. //
  380. // 2. Record resource totals for shared costs and unmounted volumes so
  381. // that we can account for them in computing idle coefficients.
  382. //
  383. // 3. Retrieve pre-computed allocation resource totals, which will be used
  384. // to compute idle sharing coefficients.
  385. //
  386. // 4. Compute sharing coefficients per-aggregation, if sharing resources.
  387. //
  388. // 5. Distribute idle allocations according to the idle coefficients.
  389. //
  390. // 6. Record allocation resource totals (after filtration) if filters have
  391. // been applied. (Used for filtering proportional amount of idle.)
  392. //
  393. // 7. Generate aggregation key and insert allocation into the output set
  394. //
  395. // 8. If idle is shared and resources are shared, it's probable that some
  396. // amount of idle cost will be shared with a shared resource.
  397. // Distribute that idle cost, if it exists, among the respective shared
  398. // allocations before sharing them with the aggregated allocations.
  399. //
  400. // 9. Apply idle filtration, which "filters" the idle cost, or scales it
  401. // by the proportion of allocation resources remaining after filters
  402. // have been applied.
  403. //
  404. // 10. Convert shared hourly cost into a cumulative allocation to share,
  405. // and insert it into the share set.
  406. //
  407. // 11. Distribute shared resources according to sharing coefficients.
  408. //
  409. // 12. Insert external allocations into the result set.
  410. //
  411. // 13. Insert any undistributed idle, in the case that idle
  412. // coefficients end up being zero and some idle is not shared.
  413. //
  414. // 14. Combine all idle allocations into a single idle allocation, unless
  415. // the option to keep idle split by cluster or node is enabled.
  416. // 1. Partition external, idle, and shared allocations into separate sets.
  417. // Also, create the resultSet into which the results will be aggregated.
  418. // resultSet will collect the aggregated allocations
  419. resultSet := &SummaryAllocationSet{
  420. Window: sas.Window.Clone(),
  421. }
  422. // externalSet will collect external allocations
  423. externalSet := &SummaryAllocationSet{
  424. Window: sas.Window.Clone(),
  425. }
  426. // idleSet will be shared among resultSet after initial aggregation
  427. // is complete
  428. idleSet := &SummaryAllocationSet{
  429. Window: sas.Window.Clone(),
  430. }
  431. // shareSet will be shared among resultSet after initial aggregation
  432. // is complete
  433. shareSet := &SummaryAllocationSet{
  434. Window: sas.Window.Clone(),
  435. }
  436. sas.Lock()
  437. defer sas.Unlock()
  438. // 2. Record resource totals for shared costs, aggregating by cluster or by
  439. // node (depending on if idle is partitioned by cluster or node) so that we
  440. // can account for them in computing idle coefficients. Do the same for
  441. // unmounted volume costs, which only require a total cost.
  442. sharedResourceTotals := map[string]*AllocationTotals{}
  443. totalUnmountedCost := 0.0
  444. // 1 & 2. Identify set membership and aggregate aforementioned totals.
  445. for _, sa := range sas.SummaryAllocations {
  446. if sa.Share {
  447. var key string
  448. if options.IdleByNode {
  449. key = fmt.Sprintf("%s/%s", sa.Properties.Cluster, sa.Properties.Node)
  450. } else {
  451. key = sa.Properties.Cluster
  452. }
  453. if _, ok := sharedResourceTotals[key]; !ok {
  454. sharedResourceTotals[key] = &AllocationTotals{}
  455. }
  456. sharedResourceTotals[key].CPUCost += sa.CPUCost
  457. sharedResourceTotals[key].GPUCost += sa.GPUCost
  458. sharedResourceTotals[key].LoadBalancerCost += sa.LoadBalancerCost
  459. sharedResourceTotals[key].NetworkCost += sa.NetworkCost
  460. sharedResourceTotals[key].PersistentVolumeCost += sa.PVCost
  461. sharedResourceTotals[key].RAMCost += sa.RAMCost
  462. shareSet.Insert(sa)
  463. delete(sas.SummaryAllocations, sa.Name)
  464. continue
  465. }
  466. // External allocations get aggregated post-hoc (see step 6) and do
  467. // not necessarily contain complete sets of properties, so they are
  468. // moved to a separate AllocationSet.
  469. if sa.IsExternal() {
  470. delete(sas.externalKeys, sa.Name)
  471. delete(sas.SummaryAllocations, sa.Name)
  472. externalSet.Insert(sa)
  473. continue
  474. }
  475. // Idle allocations should be separated into idleSet if they are to be
  476. // shared later on. If they are not to be shared, then add them to the
  477. // resultSet like any other allocation.
  478. if sa.IsIdle() {
  479. delete(sas.idleKeys, sa.Name)
  480. delete(sas.SummaryAllocations, sa.Name)
  481. if options.ShareIdle == ShareEven || options.ShareIdle == ShareWeighted {
  482. idleSet.Insert(sa)
  483. } else {
  484. resultSet.Insert(sa)
  485. }
  486. continue
  487. }
  488. // Track total unmounted cost because it must be taken out of total
  489. // allocated costs for sharing coefficients.
  490. if sa.IsUnmounted() {
  491. totalUnmountedCost += sa.TotalCost()
  492. }
  493. }
  494. // TODO summary: do we need to handle case where len(SummaryAllocations) == 0?
  495. // 3. Retrieve pre-computed allocation resource totals, which will be used
  496. // to compute idle coefficients, based on the ratio of an allocation's per-
  497. // resource cost to the per-resource totals of that allocation's cluster or
  498. // node. Whether to perform this operation based on cluster or node is an
  499. // option. (See IdleByNode documentation; defaults to idle-by-cluster.)
  500. var allocTotals map[string]*AllocationTotals
  501. var ok bool
  502. if options.IdleByNode {
  503. if options.AllocationTotalsStore != nil {
  504. allocTotals, ok = options.AllocationTotalsStore.GetAllocationTotalsByNode(*sas.Window.Start(), *sas.Window.End())
  505. if !ok {
  506. return fmt.Errorf("nil allocation resource totals by node for %s", sas.Window)
  507. }
  508. }
  509. } else {
  510. if options.AllocationTotalsStore != nil {
  511. allocTotals, ok = options.AllocationTotalsStore.GetAllocationTotalsByCluster(*sas.Window.Start(), *sas.Window.End())
  512. if !ok {
  513. return fmt.Errorf("nil allocation resource totals by cluster for %s", sas.Window)
  514. }
  515. }
  516. }
  517. // TODO summary: make sure that we're robust to missing (nil) allocTotals.
  518. // To test, pass nil options.AllocationTotalsStore.
  519. // If filters have been applied, then we need to record allocation resource
  520. // totals after filtration (i.e. the allocations that are present) so that
  521. // we can identify the proportion of idle cost to keep. That is, we should
  522. // only return the idle cost that would be shared with the remaining
  523. // allocations, even if we're keeping idle separate. The totals should be
  524. // recorded by idle-key (cluster or node, depending on the IdleByNode
  525. // option). Instantiating this map is a signal to record the totals.
  526. var allocTotalsAfterFilters map[string]*AllocationTotals
  527. if len(resultSet.idleKeys) > 0 && len(options.FilterFuncs) > 0 {
  528. allocTotalsAfterFilters = make(map[string]*AllocationTotals, len(resultSet.idleKeys))
  529. }
  530. // If we're recording allocTotalsAfterFilters and there are shared costs,
  531. // then record those resource totals here so that idle for those shared
  532. // resources gets included.
  533. if allocTotalsAfterFilters != nil {
  534. for key, rt := range sharedResourceTotals {
  535. if _, ok := allocTotalsAfterFilters[key]; !ok {
  536. allocTotalsAfterFilters[key] = &AllocationTotals{}
  537. }
  538. // Record only those fields required for computing idle
  539. allocTotalsAfterFilters[key].CPUCost += rt.CPUCost
  540. allocTotalsAfterFilters[key].GPUCost += rt.GPUCost
  541. allocTotalsAfterFilters[key].RAMCost += rt.RAMCost
  542. }
  543. }
  544. // Sharing coefficients are recorded by post-aggregation-key (e.g. if
  545. // aggregating by namespace, then the key will be the namespace) and only
  546. // need to be recorded if there are shared resources. Instantiating this
  547. // map is the signal to record sharing coefficients.
  548. var sharingCoeffs map[string]float64
  549. if len(shareSet.SummaryAllocations) > 0 {
  550. sharingCoeffs = map[string]float64{}
  551. }
  552. // Loop over all remaining SummaryAllocations (after filters, sharing, &c.)
  553. // doing the following, in this order:
  554. // 4. Compute sharing coefficients, if there are shared resources
  555. // 5. Distribute idle cost, if sharing idle
  556. // 6. Record allocTotalsAfterFiltration, if filters have been applied
  557. // 7. Aggregate by key
  558. for _, sa := range sas.SummaryAllocations {
  559. // Generate key to use for aggregation-by-key and allocation name
  560. key := sa.generateKey(aggregateBy, options.LabelConfig)
  561. // 4. Incrementally add to sharing coefficients before adding idle
  562. // cost, which would skew the coefficients. These coefficients will be
  563. // later divided by a total, turning them into a coefficient between
  564. // 0.0 and 1.0.
  565. // NOTE: SummaryAllocation does not support ShareEven, so only record
  566. // by cost for cost-weighted distribution.
  567. if sharingCoeffs != nil {
  568. sharingCoeffs[key] += sa.TotalCost()
  569. }
  570. // 5. Distribute idle allocations according to the idle coefficients.
  571. // NOTE: if idle allocation is off (i.e. ShareIdle == ShareNone) then
  572. // all idle allocations will be in the resultSet at this point, so idleSet
  573. // will be empty and we won't enter this block.
  574. if len(idleSet.SummaryAllocations) > 0 {
  575. for _, idle := range idleSet.SummaryAllocations {
  576. // Idle key is either cluster or node, as determined by the
  577. // IdleByNode option.
  578. var key string
  579. // Only share idle allocation with current allocation (sa) if
  580. // the relevant properties match (i.e. cluster and/or node)
  581. if idle.Properties.Cluster != sa.Properties.Cluster {
  582. continue
  583. }
  584. key = idle.Properties.Cluster
  585. if options.IdleByNode {
  586. if idle.Properties.Node != sa.Properties.Node {
  587. continue
  588. }
  589. key = fmt.Sprintf("%s/%s", idle.Properties.Cluster, idle.Properties.Node)
  590. }
  591. cpuCoeff, gpuCoeff, ramCoeff := ComputeIdleCoefficients(options.ShareIdle, key, sa.CPUCost, sa.GPUCost, sa.RAMCost, allocTotals)
  592. sa.CPUCost += idle.CPUCost * cpuCoeff
  593. sa.GPUCost += idle.GPUCost * gpuCoeff
  594. sa.RAMCost += idle.RAMCost * ramCoeff
  595. }
  596. }
  597. // The key becomes the allocation's name, which is used as the key by
  598. // which the allocation is inserted into the set.
  599. sa.Name = key
  600. // If merging unallocated allocations, rename all unallocated
  601. // allocations as simply __unallocated__
  602. if options.MergeUnallocated && sa.IsUnallocated() {
  603. sa.Name = UnallocatedSuffix
  604. }
  605. // 6. Record filtered resource totals for idle allocation filtration,
  606. // only if necessary.
  607. if allocTotalsAfterFilters != nil {
  608. key := sa.Properties.Cluster
  609. if options.IdleByNode {
  610. key = fmt.Sprintf("%s/%s", sa.Properties.Cluster, sa.Properties.Node)
  611. }
  612. if _, ok := allocTotalsAfterFilters[key]; ok {
  613. allocTotalsAfterFilters[key].CPUCost += sa.CPUCost
  614. allocTotalsAfterFilters[key].GPUCost += sa.GPUCost
  615. allocTotalsAfterFilters[key].RAMCost += sa.RAMCost
  616. } else {
  617. allocTotalsAfterFilters[key] = &AllocationTotals{
  618. CPUCost: sa.CPUCost,
  619. GPUCost: sa.GPUCost,
  620. RAMCost: sa.RAMCost,
  621. }
  622. }
  623. }
  624. // 7. Inserting the allocation with the generated key for a name
  625. // performs the actual aggregation step.
  626. resultSet.Insert(sa)
  627. }
  628. // 8. If idle is shared and resources are shared, it's probable that some
  629. // amount of idle cost will be shared with a shared resource. Distribute
  630. // that idle cost, if it exists, among the respective shared allocations
  631. // before sharing them with the aggregated allocations.
  632. if len(idleSet.SummaryAllocations) > 0 && len(shareSet.SummaryAllocations) > 0 {
  633. for _, sa := range shareSet.SummaryAllocations {
  634. for _, idle := range idleSet.SummaryAllocations {
  635. var key string
  636. // Only share idle allocation with current allocation (sa) if
  637. // the relevant property matches (i.e. Cluster or Node,
  638. // depending on which idle sharing option is selected)
  639. if options.IdleByNode {
  640. if idle.Properties.Node != sa.Properties.Node {
  641. continue
  642. }
  643. key = idle.Properties.Node
  644. } else {
  645. if idle.Properties.Cluster != sa.Properties.Cluster {
  646. continue
  647. }
  648. key = idle.Properties.Cluster
  649. }
  650. cpuCoeff, gpuCoeff, ramCoeff := ComputeIdleCoefficients(options.ShareIdle, key, sa.CPUCost, sa.GPUCost, sa.RAMCost, allocTotals)
  651. sa.CPUCost += idle.CPUCost * cpuCoeff
  652. sa.GPUCost += idle.GPUCost * gpuCoeff
  653. sa.RAMCost += idle.RAMCost * ramCoeff
  654. }
  655. }
  656. }
  657. // 9. Apply idle filtration, which "filters" the idle cost, i.e. scales
  658. // idle allocation costs per-resource by the proportion of allocation
  659. // resources remaining after filtering. In effect, this returns only the
  660. // idle costs that would have been shared with the remaining allocations,
  661. // even if idle is kept separated.
  662. if allocTotalsAfterFilters != nil {
  663. for idleKey := range resultSet.idleKeys {
  664. ia := resultSet.SummaryAllocations[idleKey]
  665. var key string
  666. if options.IdleByNode {
  667. key = ia.Properties.Node
  668. } else {
  669. key = ia.Properties.Cluster
  670. }
  671. // Percentage of idle that should remain after filters are applied,
  672. // which equals the proportion of filtered-to-actual cost.
  673. cpuFilterCoeff := 0.0
  674. if allocTotals[key].CPUCost > 0.0 {
  675. cpuFilterCoeff = allocTotalsAfterFilters[key].CPUCost / allocTotals[key].CPUCost
  676. }
  677. gpuFilterCoeff := 0.0
  678. if allocTotals[key].RAMCost > 0.0 {
  679. gpuFilterCoeff = allocTotalsAfterFilters[key].RAMCost / allocTotals[key].RAMCost
  680. }
  681. ramFilterCoeff := 0.0
  682. if allocTotals[key].RAMCost > 0.0 {
  683. ramFilterCoeff = allocTotalsAfterFilters[key].RAMCost / allocTotals[key].RAMCost
  684. }
  685. ia.CPUCost *= cpuFilterCoeff
  686. ia.GPUCost *= gpuFilterCoeff
  687. ia.RAMCost *= ramFilterCoeff
  688. }
  689. }
  690. // 10. Convert shared hourly cost into a cumulative allocation to share,
  691. // and insert it into the share set.
  692. for name, cost := range options.SharedHourlyCosts {
  693. if cost > 0.0 {
  694. hours := sas.Window.Hours()
  695. // If set ends in the future, adjust hours accordingly
  696. diff := time.Since(*sas.Window.End())
  697. if diff < 0.0 {
  698. hours += diff.Hours()
  699. }
  700. totalSharedCost := cost * hours
  701. shareSet.Insert(&SummaryAllocation{
  702. Name: fmt.Sprintf("%s/%s", name, SharedSuffix),
  703. Start: *sas.Window.Start(),
  704. End: *sas.Window.End(),
  705. SharedCost: totalSharedCost,
  706. })
  707. }
  708. }
  709. // 11. Distribute shared resources according to sharing coefficients.
  710. // NOTE: ShareEven is not supported
  711. if len(shareSet.SummaryAllocations) > 0 {
  712. sharingCoeffDenominator := 0.0
  713. for _, rt := range allocTotals {
  714. sharingCoeffDenominator += rt.TotalCost()
  715. }
  716. // Do not include the shared costs, themselves, when determining
  717. // sharing coefficients.
  718. for _, rt := range sharedResourceTotals {
  719. sharingCoeffDenominator -= rt.TotalCost()
  720. }
  721. // Do not include the unmounted costs when determining sharing
		// coefficients because they do not receive shared costs.
  723. sharingCoeffDenominator -= totalUnmountedCost
  724. if sharingCoeffDenominator <= 0.0 {
  725. log.Warningf("SummaryAllocation: sharing coefficient denominator is %f", sharingCoeffDenominator)
  726. } else {
  727. // Compute sharing coeffs by dividing the thus-far accumulated
  728. // numerators by the now-finalized denominator.
  729. for key := range sharingCoeffs {
  730. sharingCoeffs[key] /= sharingCoeffDenominator
  731. }
  732. for key, sa := range resultSet.SummaryAllocations {
  733. // Idle and unmounted allocations, by definition, do not
  734. // receive shared cost
  735. if sa.IsIdle() || sa.IsUnmounted() {
  736. continue
  737. }
  738. sharingCoeff := sharingCoeffs[key]
  739. // Distribute each shared cost with the current allocation on the
  740. // basis of the proportion of the allocation's cost (ShareWeighted)
  741. // or count (ShareEven) to the total aggregated cost or count. This
  742. // condition should hold in spite of filters because the sharing
  743. // coefficient denominator is held constant by pre-computed
  744. // resource totals and the post-aggregation total cost of the
  745. // remaining allocations will, by definition, not be affected.
  746. for _, shared := range shareSet.SummaryAllocations {
  747. sa.SharedCost += shared.TotalCost() * sharingCoeff
  748. }
  749. }
  750. }
  751. }
  752. // 12. Insert external allocations into the result set.
  753. for _, sa := range externalSet.SummaryAllocations {
  754. skip := false
  755. // TODO summary: deal with filters... maybe make an Allocation with the
  756. // same Properties and test the filter func?
  757. // for _, ff := range options.FilterFuncs {
  758. // if !ff(sa) {
  759. // skip = true
  760. // break
  761. // }
  762. // }
  763. if !skip {
  764. key := sa.generateKey(aggregateBy, options.LabelConfig)
  765. sa.Name = key
  766. resultSet.Insert(sa)
  767. }
  768. }
  769. // 13. Distribute remaining, undistributed idle. Undistributed idle is any
  770. // per-resource idle cost for which there can be no idle coefficient
  771. // computed because there is zero usage across all allocations.
  772. for _, ia := range idleSet.SummaryAllocations {
  773. key := ia.Properties.Cluster
  774. if options.IdleByNode {
  775. key = fmt.Sprintf("%s/%s", ia.Properties.Cluster, ia.Properties.Node)
  776. }
  777. rt, ok := allocTotals[key]
  778. if !ok {
  779. log.Warningf("SummaryAllocation: AggregateBy: cannot handle undistributed idle for '%s'", key)
  780. continue
  781. }
  782. hasUndistributableCost := false
  783. if ia.CPUCost > 0.0 && rt.CPUCost == 0.0 {
  784. // There is idle CPU cost, but no allocated CPU cost, so that cost
  785. // is undistributable and must be inserted.
  786. hasUndistributableCost = true
  787. } else {
  788. // Cost was entirely distributed, so zero it out
  789. ia.CPUCost = 0.0
  790. }
  791. if ia.GPUCost > 0.0 && rt.GPUCost == 0.0 {
  792. // There is idle GPU cost, but no allocated GPU cost, so that cost
  793. // is undistributable and must be inserted.
  794. hasUndistributableCost = true
  795. } else {
  796. // Cost was entirely distributed, so zero it out
  797. ia.GPUCost = 0.0
  798. }
  799. if ia.RAMCost > 0.0 && rt.RAMCost == 0.0 {
			// There is idle RAM cost, but no allocated RAM cost, so that cost
			// is undistributable and must be inserted.
  802. hasUndistributableCost = true
  803. } else {
  804. // Cost was entirely distributed, so zero it out
  805. ia.RAMCost = 0.0
  806. }
  807. if hasUndistributableCost {
  808. ia.Name = fmt.Sprintf("%s/%s", key, IdleSuffix)
  809. resultSet.Insert(ia)
  810. }
  811. }
  812. // 14. Combine all idle allocations into a single idle allocation, unless
  813. // the option to keep idle split by cluster or node is enabled.
  814. if !options.SplitIdle {
  815. for _, ia := range resultSet.idleAllocations() {
  816. resultSet.Delete(ia.Name)
  817. ia.Name = IdleSuffix
  818. resultSet.Insert(ia)
  819. }
  820. }
  821. // Replace the existing set's data with the new, aggregated summary data
  822. sas.SummaryAllocations = resultSet.SummaryAllocations
  823. return nil
  824. }
  825. // Delete removes the allocation with the given name from the set
  826. func (sas *SummaryAllocationSet) Delete(name string) {
  827. if sas == nil {
  828. return
  829. }
  830. sas.Lock()
  831. defer sas.Unlock()
  832. delete(sas.externalKeys, name)
  833. delete(sas.idleKeys, name)
  834. delete(sas.SummaryAllocations, name)
  835. }
  836. // Each invokes the given function for each SummaryAllocation in the set
  837. func (sas *SummaryAllocationSet) Each(f func(string, *SummaryAllocation)) {
  838. if sas == nil {
  839. return
  840. }
  841. for k, a := range sas.SummaryAllocations {
  842. f(k, a)
  843. }
  844. }
  845. // IdleAllocations returns a map of the idle allocations in the AllocationSet.
  846. func (sas *SummaryAllocationSet) idleAllocations() map[string]*SummaryAllocation {
  847. idles := map[string]*SummaryAllocation{}
  848. if sas == nil || len(sas.SummaryAllocations) == 0 {
  849. return idles
  850. }
  851. sas.RLock()
  852. defer sas.RUnlock()
  853. for key := range sas.idleKeys {
  854. if sa, ok := sas.SummaryAllocations[key]; ok {
  855. idles[key] = sa
  856. }
  857. }
  858. return idles
  859. }
  860. // Insert aggregates the current entry in the SummaryAllocationSet by the given Allocation,
  861. // but only if the Allocation is valid, i.e. matches the SummaryAllocationSet's window. If
  862. // there is no existing entry, one is created. Nil error response indicates success.
  863. func (sas *SummaryAllocationSet) Insert(sa *SummaryAllocation) error {
  864. if sas == nil {
  865. return fmt.Errorf("cannot insert into nil SummaryAllocationSet")
  866. }
  867. if sa == nil {
  868. return fmt.Errorf("cannot insert a nil SummaryAllocation")
  869. }
  870. sas.Lock()
  871. defer sas.Unlock()
  872. if sas.SummaryAllocations == nil {
  873. sas.SummaryAllocations = map[string]*SummaryAllocation{}
  874. }
  875. if sas.externalKeys == nil {
  876. sas.externalKeys = map[string]bool{}
  877. }
  878. if sas.idleKeys == nil {
  879. sas.idleKeys = map[string]bool{}
  880. }
  881. // Add the given Allocation to the existing entry, if there is one;
  882. // otherwise just set directly into allocations
  883. if _, ok := sas.SummaryAllocations[sa.Name]; ok {
  884. err := sas.SummaryAllocations[sa.Name].Add(sa)
  885. if err != nil {
  886. return fmt.Errorf("SummaryAllocationSet.Insert: error trying to Add: %s", err)
  887. }
  888. } else {
  889. sas.SummaryAllocations[sa.Name] = sa
  890. }
  891. // If the given Allocation is an external one, record that
  892. if sa.IsExternal() {
  893. sas.externalKeys[sa.Name] = true
  894. }
  895. // If the given Allocation is an idle one, record that
  896. if sa.IsIdle() {
  897. sas.idleKeys[sa.Name] = true
  898. }
  899. return nil
  900. }
// SummaryAllocationSetRange is an ordered collection of SummaryAllocationSets
// with an embedded RWMutex, which methods such as Append, AggregateBy and
// Accumulate use to serialize access to the range.
type SummaryAllocationSetRange struct {
	// RWMutex is embedded so that methods can lock the range directly, e.g.
	// sasr.Lock() or sasr.RLock().
	sync.RWMutex
	// Step is the window duration of the sets in the range. It is taken from
	// the first set's window duration when the range is built or appended to.
	Step time.Duration `json:"step"`
	// SummaryAllocationSets holds the sets in the order they were provided to
	// the constructor or appended.
	SummaryAllocationSets []*SummaryAllocationSet `json:"sets"`
	// Window covers the range as a whole: it is widened to the earliest start
	// and latest end of the sets' windows as sets are added.
	Window Window `json:"window"`
}
  908. // NewSummaryAllocationSetRange instantiates a new range composed of the given
  909. // SummaryAllocationSets in the order provided. The expectations about the
  910. // SummaryAllocationSets are as follows:
  911. // - window durations are all equal
  912. // - sets are consecutive (i.e. chronologically sorted)
  913. // - there are no gaps between sets
  914. // - sets do not have overlapping windows
  915. func NewSummaryAllocationSetRange(sass ...*SummaryAllocationSet) *SummaryAllocationSetRange {
  916. var step time.Duration
  917. window := NewWindow(nil, nil)
  918. for _, sas := range sass {
  919. if window.Start() == nil || (sas.Window.Start() != nil && sas.Window.Start().Before(*window.Start())) {
  920. window.start = sas.Window.Start()
  921. }
  922. if window.End() == nil || (sas.Window.End() != nil && sas.Window.End().After(*window.End())) {
  923. window.end = sas.Window.End()
  924. }
  925. if step == 0 {
  926. step = sas.Window.Duration()
  927. } else if step != sas.Window.Duration() {
  928. log.Warningf("instantiating range with step %s using set of step %s is illegal", step, sas.Window.Duration())
  929. }
  930. }
  931. return &SummaryAllocationSetRange{
  932. Step: step,
  933. SummaryAllocationSets: sass,
  934. Window: window,
  935. }
  936. }
  937. // Accumulate sums each AllocationSet in the given range, returning a single cumulative
  938. // AllocationSet for the entire range.
  939. func (sasr *SummaryAllocationSetRange) Accumulate() (*SummaryAllocationSet, error) {
  940. var result *SummaryAllocationSet
  941. var err error
  942. sasr.RLock()
  943. defer sasr.RUnlock()
  944. for _, sas := range sasr.SummaryAllocationSets {
  945. result, err = result.Add(sas)
  946. if err != nil {
  947. return nil, err
  948. }
  949. }
  950. return result, nil
  951. }
  952. // AggregateBy aggregates each AllocationSet in the range by the given
  953. // properties and options.
  954. func (sasr *SummaryAllocationSetRange) AggregateBy(aggregateBy []string, options *AllocationAggregationOptions) error {
  955. sasr.Lock()
  956. defer sasr.Unlock()
  957. for _, sas := range sasr.SummaryAllocationSets {
  958. err := sas.AggregateBy(aggregateBy, options)
  959. if err != nil {
  960. // Wipe out data so that corrupt data cannot be mistakenly used
  961. sasr.SummaryAllocationSets = []*SummaryAllocationSet{}
  962. return err
  963. }
  964. }
  965. return nil
  966. }
  967. // Append appends the given AllocationSet to the end of the range. It does not
  968. // validate whether or not that violates window continuity.
  969. func (sasr *SummaryAllocationSetRange) Append(sas *SummaryAllocationSet) error {
  970. if sasr.Step != 0 && sas.Window.Duration() != sasr.Step {
  971. return fmt.Errorf("cannot append set with duration %s to range of step %s", sas.Window.Duration(), sasr.Step)
  972. }
  973. sasr.Lock()
  974. defer sasr.Unlock()
  975. // Append to list of sets
  976. sasr.SummaryAllocationSets = append(sasr.SummaryAllocationSets, sas)
  977. // Set step, if not set
  978. if sasr.Step == 0 {
  979. sasr.Step = sas.Window.Duration()
  980. }
  981. // Adjust window
  982. if sasr.Window.Start() == nil || (sas.Window.Start() != nil && sas.Window.Start().Before(*sasr.Window.Start())) {
  983. sasr.Window.start = sas.Window.Start()
  984. }
  985. if sasr.Window.End() == nil || (sas.Window.End() != nil && sas.Window.End().After(*sasr.Window.End())) {
  986. sasr.Window.end = sas.Window.End()
  987. }
  988. return nil
  989. }
  990. // Each invokes the given function for each AllocationSet in the range
  991. func (sasr *SummaryAllocationSetRange) Each(f func(int, *SummaryAllocationSet)) {
  992. if sasr == nil {
  993. return
  994. }
  995. for i, as := range sasr.SummaryAllocationSets {
  996. f(i, as)
  997. }
  998. }
  999. // InsertExternalAllocations takes all allocations in the given
  1000. // AllocationSetRange (they should all be considered "external") and inserts
  1001. // them into the receiving SummaryAllocationSetRange.
  1002. // TODO:CLEANUP replace this with a better idea (or get rid of external
  1003. // allocations, as such, altogether)
  1004. func (sasr *SummaryAllocationSetRange) InsertExternalAllocations(that *AllocationSetRange) error {
  1005. if sasr == nil {
  1006. return fmt.Errorf("cannot insert range into nil AllocationSetRange")
  1007. }
  1008. // keys maps window to index in range
  1009. keys := map[string]int{}
  1010. for i, as := range sasr.SummaryAllocationSets {
  1011. if as == nil {
  1012. continue
  1013. }
  1014. keys[as.Window.String()] = i
  1015. }
  1016. // Nothing to merge, so simply return
  1017. if len(keys) == 0 {
  1018. return nil
  1019. }
  1020. var err error
  1021. that.Each(func(j int, thatAS *AllocationSet) {
  1022. if thatAS == nil || err != nil {
  1023. return
  1024. }
  1025. // Find matching AllocationSet in asr
  1026. i, ok := keys[thatAS.Window.String()]
  1027. if !ok {
  1028. err = fmt.Errorf("cannot merge AllocationSet into window that does not exist: %s", thatAS.Window.String())
  1029. return
  1030. }
  1031. sas := sasr.SummaryAllocationSets[i]
  1032. // Insert each Allocation from the given set
  1033. thatAS.Each(func(k string, alloc *Allocation) {
  1034. externalSA := NewSummaryAllocation(alloc, true, true)
  1035. // This error will be returned below
  1036. // TODO:CLEANUP should Each have early-error-return functionality?
  1037. err = sas.Insert(externalSA)
  1038. })
  1039. })
  1040. // err might be nil
  1041. return err
  1042. }