allocation.go 79 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177
71778177917801781178217831784178517861787178817891790179117921793179417951796179717981799180018011802180318041805180618071808180918101811181218131814181518161817181818191820182118221823182418251826182718281829183018311832183318341835183618371838183918401841184218431844184518461847184818491850185118521853185418551856185718581859186018611862186318641865186618671868186918701871187218731874187518761877187818791880188118821883188418851886188718881889189018911892189318941895189618971898189919001901190219031904190519061907190819091910191119121913191419151916191719181919192019211922192319241925192619271928192919301931193219331934193519361937193819391940194119421943194419451946194719481949195019511952195319541955195619571958195919601961196219631964196519661967196819691970197119721973197419751976197719781979198019811982198319841985198619871988198919901991199219931994199519961997199819992000200120022003200420052006200720082009201020112012201320142015201620172018201920202021202220232024202520262027202820292030203120322033203420352036203720382039204020412042204320442045204620472048204920502051205220532054205520562057205820592060206120622063206420652066206720682069207020712072207320742075207620772078207920802081208220832084208520862087208820892090209120922093209420952096209720982099210021012102210321042105210621072108210921102111211221132114211521162117211821192120212121222123212421252126212721282129213021312132213321342135213621372138213921402141214221432144214521462147214821492150215121522153215421552156215721582159216021612162216321642165216621672168216921702171217221732174217521762177217821792180218121822183218421852186218721882189219021912192219321942195219621972198219922002201220222032204220522062207220822092210221122122213221422152216221722182219222022212222222322242225222622272228222922302231223222332234223522362237223822392240224122422243224422452246224722482249225022512252225322542255225622572258225922602261226222632264226522662267226822692270227122722273227422752276227
722782279228022812282228322842285228622872288228922902291229222932294229522962297229822992300230123022303230423052306230723082309231023112312231323142315231623172318231923202321232223232324232523262327232823292330233123322333233423352336233723382339234023412342234323442345234623472348234923502351235223532354235523562357235823592360236123622363236423652366236723682369237023712372237323742375237623772378237923802381238223832384238523862387238823892390239123922393239423952396239723982399240024012402240324042405240624072408240924102411241224132414241524162417241824192420242124222423242424252426242724282429243024312432243324342435243624372438243924402441244224432444244524462447244824492450245124522453245424552456245724582459246024612462246324642465246624672468246924702471
  1. package kubecost
  2. import (
  3. "bytes"
  4. "fmt"
  5. "strings"
  6. "sync"
  7. "time"
  8. "github.com/kubecost/cost-model/pkg/log"
  9. "github.com/kubecost/cost-model/pkg/util"
  10. "github.com/kubecost/cost-model/pkg/util/json"
  11. )
  12. // TODO Clean-up use of IsEmpty; nil checks should be separated for safety.
  13. // TODO Consider making Allocation an interface, which is fulfilled by structs
  14. // like KubernetesAllocation, IdleAllocation, and ExternalAllocation.
  15. // ExternalSuffix indicates an external allocation
  16. const ExternalSuffix = "__external__"
  17. // IdleSuffix indicates an idle allocation property
  18. const IdleSuffix = "__idle__"
  19. // SharedSuffix indicates an shared allocation property
  20. const SharedSuffix = "__shared__"
  21. // UnallocatedSuffix indicates an unallocated allocation property
  22. const UnallocatedSuffix = "__unallocated__"
  23. // UnmountedSuffix indicated allocation to an unmounted PV
  24. const UnmountedSuffix = "__unmounted__"
  25. // ShareWeighted indicates that a shared resource should be shared as a
  26. // proportion of the cost of the remaining allocations.
  27. const ShareWeighted = "__weighted__"
  28. // ShareEven indicates that a shared resource should be shared evenly across
  29. // all remaining allocations.
  30. const ShareEven = "__even__"
  31. // ShareNone indicates that a shareable resource should not be shared
  32. const ShareNone = "__none__"
  33. // Allocation is a unit of resource allocation and cost for a given window
  34. // of time and for a given kubernetes construct with its associated set of
  35. // properties.
  36. // TODO:CLEANUP consider dropping name in favor of just Allocation and an
  37. // Assets-style key() function for AllocationSet.
  38. type Allocation struct {
  39. Name string `json:"name"`
  40. Properties *AllocationProperties `json:"properties,omitempty"`
  41. Window Window `json:"window"`
  42. Start time.Time `json:"start"`
  43. End time.Time `json:"end"`
  44. CPUCoreHours float64 `json:"cpuCoreHours"`
  45. CPUCoreRequestAverage float64 `json:"cpuCoreRequestAverage"`
  46. CPUCoreUsageAverage float64 `json:"cpuCoreUsageAverage"`
  47. CPUCost float64 `json:"cpuCost"`
  48. CPUCostAdjustment float64 `json:"cpuCostAdjustment"`
  49. GPUHours float64 `json:"gpuHours"`
  50. GPUCost float64 `json:"gpuCost"`
  51. GPUCostAdjustment float64 `json:"gpuCostAdjustment"`
  52. NetworkTransferBytes float64 `json:"networkTransferBytes"`
  53. NetworkReceiveBytes float64 `json:"networkReceiveBytes"`
  54. NetworkCost float64 `json:"networkCost"`
  55. NetworkCostAdjustment float64 `json:"networkCostAdjustment"`
  56. LoadBalancerCost float64 `json:"loadBalancerCost"`
  57. LoadBalancerCostAdjustment float64 `json:"loadBalancerCostAdjustment"`
  58. PVs PVAllocations `json:"-"`
  59. PVCostAdjustment float64 `json:"pvCostAdjustment"`
  60. RAMByteHours float64 `json:"ramByteHours"`
  61. RAMBytesRequestAverage float64 `json:"ramByteRequestAverage"`
  62. RAMBytesUsageAverage float64 `json:"ramByteUsageAverage"`
  63. RAMCost float64 `json:"ramCost"`
  64. RAMCostAdjustment float64 `json:"ramCostAdjustment"`
  65. SharedCost float64 `json:"sharedCost"`
  66. ExternalCost float64 `json:"externalCost"`
  67. // RawAllocationOnly is a pointer so if it is not present it will be
  68. // marshalled as null rather than as an object with Go default values.
  69. RawAllocationOnly *RawAllocationOnlyData `json:"rawAllocationOnly"`
  70. }
  71. // RawAllocationOnlyData is information that only belong in "raw" Allocations,
  72. // those which have not undergone aggregation, accumulation, or any other form
  73. // of combination to produce a new Allocation from other Allocations.
  74. //
  75. // Max usage data belongs here because computing the overall maximum from two
  76. // or more Allocations is a non-trivial operation that cannot be defined without
  77. // maintaining a large amount of state. Consider the following example:
  78. // _______________________________________________
  79. //
  80. // A1 Using 3 CPU ---- ----- ------
  81. // A2 Using 2 CPU ---- ----- ----
  82. // A3 Using 1 CPU --- --
  83. // _______________________________________________
  84. // Time ---->
  85. //
  86. // The logical maximum CPU usage is 5, but this cannot be calculated iteratively,
  87. // which is how we calculate aggregations and accumulations of Allocations currently.
  88. // This becomes a problem I could call "maximum sum of overlapping intervals" and is
  89. // essentially a variant of an interval scheduling algorithm.
  90. //
  91. // If we had types to differentiate between regular Allocations and AggregatedAllocations
  92. // then this type would be unnecessary and its fields would go into the regular Allocation
  93. // and not in the AggregatedAllocation.
  94. type RawAllocationOnlyData struct {
  95. CPUCoreUsageMax float64 `json:"cpuCoreUsageMax"`
  96. RAMBytesUsageMax float64 `json:"ramByteUsageMax"`
  97. }
  98. // PVAllocations is a map of Disk Asset Identifiers to the
  99. // usage of them by an Allocation as recorded in a PVAllocation
  100. type PVAllocations map[PVKey]*PVAllocation
  101. // Clone creates a deep copy of a PVAllocations
  102. func (pv *PVAllocations) Clone() PVAllocations {
  103. if pv == nil || *pv == nil {
  104. return nil
  105. }
  106. apv := *pv
  107. clonePV := PVAllocations{}
  108. for k, v := range apv {
  109. clonePV[k] = &PVAllocation{
  110. ByteHours: v.ByteHours,
  111. Cost: v.Cost,
  112. }
  113. }
  114. return clonePV
  115. }
  116. // Add adds contents of that to the calling PVAllocations
  117. func (pv *PVAllocations) Add(that PVAllocations) PVAllocations {
  118. apv := pv.Clone()
  119. if that != nil {
  120. if apv == nil {
  121. apv = PVAllocations{}
  122. }
  123. for pvKey, thatPVAlloc := range that {
  124. apvAlloc, ok := apv[pvKey]
  125. if !ok {
  126. apvAlloc = &PVAllocation{}
  127. }
  128. apvAlloc.Cost += thatPVAlloc.Cost
  129. apvAlloc.ByteHours += thatPVAlloc.ByteHours
  130. apv[pvKey] = apvAlloc
  131. }
  132. }
  133. return apv
  134. }
  135. // PVKey for identifying Disk type assets
  136. type PVKey struct {
  137. Cluster string `json:"cluster"`
  138. Name string `json:"name"`
  139. }
  140. // PVAllocation contains the byte hour usage
  141. // and cost of an Allocation for a single PV
  142. type PVAllocation struct {
  143. ByteHours float64 `json:"byteHours"`
  144. Cost float64 `json:"cost"`
  145. }
  146. // AllocationMatchFunc is a function that can be used to match Allocations by
  147. // returning true for any given Allocation if a condition is met.
  148. type AllocationMatchFunc func(*Allocation) bool
  149. // Add returns the result of summing the two given Allocations, which sums the
  150. // summary fields (e.g. costs, resources) and recomputes efficiency. Neither of
  151. // the two original Allocations are mutated in the process.
  152. func (a *Allocation) Add(that *Allocation) (*Allocation, error) {
  153. if a == nil {
  154. return that.Clone(), nil
  155. }
  156. if that == nil {
  157. return a.Clone(), nil
  158. }
  159. // Note: no need to clone "that", as add only mutates the receiver
  160. agg := a.Clone()
  161. agg.add(that)
  162. return agg, nil
  163. }
  164. // Clone returns a deep copy of the given Allocation
  165. func (a *Allocation) Clone() *Allocation {
  166. if a == nil {
  167. return nil
  168. }
  169. return &Allocation{
  170. Name: a.Name,
  171. Properties: a.Properties.Clone(),
  172. Window: a.Window.Clone(),
  173. Start: a.Start,
  174. End: a.End,
  175. CPUCoreHours: a.CPUCoreHours,
  176. CPUCoreRequestAverage: a.CPUCoreRequestAverage,
  177. CPUCoreUsageAverage: a.CPUCoreUsageAverage,
  178. CPUCost: a.CPUCost,
  179. CPUCostAdjustment: a.CPUCostAdjustment,
  180. GPUHours: a.GPUHours,
  181. GPUCost: a.GPUCost,
  182. GPUCostAdjustment: a.GPUCostAdjustment,
  183. NetworkTransferBytes: a.NetworkTransferBytes,
  184. NetworkReceiveBytes: a.NetworkReceiveBytes,
  185. NetworkCost: a.NetworkCost,
  186. NetworkCostAdjustment: a.NetworkCostAdjustment,
  187. LoadBalancerCost: a.LoadBalancerCost,
  188. LoadBalancerCostAdjustment: a.LoadBalancerCostAdjustment,
  189. PVs: a.PVs.Clone(),
  190. PVCostAdjustment: a.PVCostAdjustment,
  191. RAMByteHours: a.RAMByteHours,
  192. RAMBytesRequestAverage: a.RAMBytesRequestAverage,
  193. RAMBytesUsageAverage: a.RAMBytesUsageAverage,
  194. RAMCost: a.RAMCost,
  195. RAMCostAdjustment: a.RAMCostAdjustment,
  196. SharedCost: a.SharedCost,
  197. ExternalCost: a.ExternalCost,
  198. RawAllocationOnly: a.RawAllocationOnly.Clone(),
  199. }
  200. }
  201. // Clone returns a deep copy of the given RawAllocationOnlyData
  202. func (r *RawAllocationOnlyData) Clone() *RawAllocationOnlyData {
  203. if r == nil {
  204. return nil
  205. }
  206. return &RawAllocationOnlyData{
  207. CPUCoreUsageMax: r.CPUCoreUsageMax,
  208. RAMBytesUsageMax: r.RAMBytesUsageMax,
  209. }
  210. }
  211. // Equal returns true if the values held in the given Allocation precisely
  212. // match those of the receiving Allocation. nil does not match nil. Floating
  213. // point values need to match according to util.IsApproximately, which accounts
  214. // for small, reasonable floating point error margins.
  215. func (a *Allocation) Equal(that *Allocation) bool {
  216. if a == nil || that == nil {
  217. return false
  218. }
  219. if a.Name != that.Name {
  220. return false
  221. }
  222. if !a.Properties.Equal(that.Properties) {
  223. return false
  224. }
  225. if !a.Window.Equal(that.Window) {
  226. return false
  227. }
  228. if !a.Start.Equal(that.Start) {
  229. return false
  230. }
  231. if !a.End.Equal(that.End) {
  232. return false
  233. }
  234. if !util.IsApproximately(a.CPUCoreHours, that.CPUCoreHours) {
  235. return false
  236. }
  237. if !util.IsApproximately(a.CPUCost, that.CPUCost) {
  238. return false
  239. }
  240. if !util.IsApproximately(a.CPUCostAdjustment, that.CPUCostAdjustment) {
  241. return false
  242. }
  243. if !util.IsApproximately(a.GPUHours, that.GPUHours) {
  244. return false
  245. }
  246. if !util.IsApproximately(a.GPUCost, that.GPUCost) {
  247. return false
  248. }
  249. if !util.IsApproximately(a.GPUCostAdjustment, that.GPUCostAdjustment) {
  250. return false
  251. }
  252. if !util.IsApproximately(a.NetworkTransferBytes, that.NetworkTransferBytes) {
  253. return false
  254. }
  255. if !util.IsApproximately(a.NetworkReceiveBytes, that.NetworkReceiveBytes) {
  256. return false
  257. }
  258. if !util.IsApproximately(a.NetworkCost, that.NetworkCost) {
  259. return false
  260. }
  261. if !util.IsApproximately(a.NetworkCostAdjustment, that.NetworkCostAdjustment) {
  262. return false
  263. }
  264. if !util.IsApproximately(a.LoadBalancerCost, that.LoadBalancerCost) {
  265. return false
  266. }
  267. if !util.IsApproximately(a.LoadBalancerCostAdjustment, that.LoadBalancerCostAdjustment) {
  268. return false
  269. }
  270. if !util.IsApproximately(a.PVCostAdjustment, that.PVCostAdjustment) {
  271. return false
  272. }
  273. if !util.IsApproximately(a.RAMByteHours, that.RAMByteHours) {
  274. return false
  275. }
  276. if !util.IsApproximately(a.RAMCost, that.RAMCost) {
  277. return false
  278. }
  279. if !util.IsApproximately(a.RAMCostAdjustment, that.RAMCostAdjustment) {
  280. return false
  281. }
  282. if !util.IsApproximately(a.SharedCost, that.SharedCost) {
  283. return false
  284. }
  285. if !util.IsApproximately(a.ExternalCost, that.ExternalCost) {
  286. return false
  287. }
  288. if a.RawAllocationOnly == nil && that.RawAllocationOnly != nil {
  289. return false
  290. }
  291. if a.RawAllocationOnly != nil && that.RawAllocationOnly == nil {
  292. return false
  293. }
  294. if a.RawAllocationOnly != nil && that.RawAllocationOnly != nil {
  295. if !util.IsApproximately(a.RawAllocationOnly.CPUCoreUsageMax, that.RawAllocationOnly.CPUCoreUsageMax) {
  296. return false
  297. }
  298. if !util.IsApproximately(a.RawAllocationOnly.RAMBytesUsageMax, that.RawAllocationOnly.RAMBytesUsageMax) {
  299. return false
  300. }
  301. }
  302. aPVs := a.PVs
  303. thatPVs := that.PVs
  304. if len(aPVs) == len(thatPVs) {
  305. for k, pv := range aPVs {
  306. tv, ok := thatPVs[k]
  307. if !ok || *tv != *pv {
  308. return false
  309. }
  310. }
  311. } else {
  312. return false
  313. }
  314. return true
  315. }
  316. // TotalCost is the total cost of the Allocation including adjustments
  317. func (a *Allocation) TotalCost() float64 {
  318. return a.CPUTotalCost() + a.GPUTotalCost() + a.RAMTotalCost() + a.PVTotalCost() + a.NetworkTotalCost() + a.LBTotalCost() + a.SharedTotalCost() + a.ExternalCost
  319. }
  320. // CPUTotalCost calculates total CPU cost of Allocation including adjustment
  321. func (a *Allocation) CPUTotalCost() float64 {
  322. return a.CPUCost + a.CPUCostAdjustment
  323. }
  324. // GPUTotalCost calculates total GPU cost of Allocation including adjustment
  325. func (a *Allocation) GPUTotalCost() float64 {
  326. return a.GPUCost + a.GPUCostAdjustment
  327. }
  328. // RAMTotalCost calculates total RAM cost of Allocation including adjustment
  329. func (a *Allocation) RAMTotalCost() float64 {
  330. return a.RAMCost + a.RAMCostAdjustment
  331. }
  332. // PVTotalCost calculates total PV cost of Allocation including adjustment
  333. func (a *Allocation) PVTotalCost() float64 {
  334. return a.PVCost() + a.PVCostAdjustment
  335. }
  336. // NetworkTotalCost calculates total Network cost of Allocation including adjustment
  337. func (a *Allocation) NetworkTotalCost() float64 {
  338. return a.NetworkCost + a.NetworkCostAdjustment
  339. }
  340. // LBTotalCost calculates total LB cost of Allocation including adjustment
  341. func (a *Allocation) LBTotalCost() float64 {
  342. return a.LoadBalancerCost + a.LoadBalancerCostAdjustment
  343. }
  344. // SharedTotalCost calculates total shared cost of Allocation including adjustment
  345. func (a *Allocation) SharedTotalCost() float64 {
  346. return a.SharedCost
  347. }
  348. // PVCost calculate cumulative cost of all PVs that Allocation is attached to
  349. func (a *Allocation) PVCost() float64 {
  350. cost := 0.0
  351. for _, pv := range a.PVs {
  352. cost += pv.Cost
  353. }
  354. return cost
  355. }
  356. // PVByteHours calculate cumulative ByteHours of all PVs that Allocation is attached to
  357. func (a *Allocation) PVByteHours() float64 {
  358. byteHours := 0.0
  359. for _, pv := range a.PVs {
  360. byteHours += pv.ByteHours
  361. }
  362. return byteHours
  363. }
  364. // CPUEfficiency is the ratio of usage to request. If there is no request and
  365. // no usage or cost, then efficiency is zero. If there is no request, but there
  366. // is usage or cost, then efficiency is 100%.
  367. func (a *Allocation) CPUEfficiency() float64 {
  368. if a.CPUCoreRequestAverage > 0 {
  369. return a.CPUCoreUsageAverage / a.CPUCoreRequestAverage
  370. }
  371. if a.CPUCoreUsageAverage == 0.0 || a.CPUCost == 0.0 {
  372. return 0.0
  373. }
  374. return 1.0
  375. }
  376. // RAMEfficiency is the ratio of usage to request. If there is no request and
  377. // no usage or cost, then efficiency is zero. If there is no request, but there
  378. // is usage or cost, then efficiency is 100%.
  379. func (a *Allocation) RAMEfficiency() float64 {
  380. if a.RAMBytesRequestAverage > 0 {
  381. return a.RAMBytesUsageAverage / a.RAMBytesRequestAverage
  382. }
  383. if a.RAMBytesUsageAverage == 0.0 || a.RAMCost == 0.0 {
  384. return 0.0
  385. }
  386. return 1.0
  387. }
  388. // TotalEfficiency is the cost-weighted average of CPU and RAM efficiency. If
  389. // there is no cost at all, then efficiency is zero.
  390. func (a *Allocation) TotalEfficiency() float64 {
  391. if a.RAMTotalCost()+a.CPUTotalCost() > 0 {
  392. ramCostEff := a.RAMEfficiency() * a.RAMTotalCost()
  393. cpuCostEff := a.CPUEfficiency() * a.CPUTotalCost()
  394. return (ramCostEff + cpuCostEff) / (a.CPUTotalCost() + a.RAMTotalCost())
  395. }
  396. return 0.0
  397. }
  398. // CPUCores converts the Allocation's CPUCoreHours into average CPUCores
  399. func (a *Allocation) CPUCores() float64 {
  400. if a.Minutes() <= 0.0 {
  401. return 0.0
  402. }
  403. return a.CPUCoreHours / (a.Minutes() / 60.0)
  404. }
  405. // RAMBytes converts the Allocation's RAMByteHours into average RAMBytes
  406. func (a *Allocation) RAMBytes() float64 {
  407. if a.Minutes() <= 0.0 {
  408. return 0.0
  409. }
  410. return a.RAMByteHours / (a.Minutes() / 60.0)
  411. }
  412. // GPUs converts the Allocation's GPUHours into average GPUs
  413. func (a *Allocation) GPUs() float64 {
  414. if a.Minutes() <= 0.0 {
  415. return 0.0
  416. }
  417. return a.GPUHours / (a.Minutes() / 60.0)
  418. }
  419. // PVBytes converts the Allocation's PVByteHours into average PVBytes
  420. func (a *Allocation) PVBytes() float64 {
  421. if a.Minutes() <= 0.0 {
  422. return 0.0
  423. }
  424. return a.PVByteHours() / (a.Minutes() / 60.0)
  425. }
  426. // ResetAdjustments sets all cost adjustment fields to zero
  427. func (a *Allocation) ResetAdjustments() {
  428. a.CPUCostAdjustment = 0.0
  429. a.GPUCostAdjustment = 0.0
  430. a.RAMCostAdjustment = 0.0
  431. a.PVCostAdjustment = 0.0
  432. a.NetworkCostAdjustment = 0.0
  433. a.LoadBalancerCostAdjustment = 0.0
  434. }
  435. // MarshalJSON implements json.Marshaler interface
  436. func (a *Allocation) MarshalJSON() ([]byte, error) {
  437. buffer := bytes.NewBufferString("{")
  438. jsonEncodeString(buffer, "name", a.Name, ",")
  439. jsonEncode(buffer, "properties", a.Properties, ",")
  440. jsonEncode(buffer, "window", a.Window, ",")
  441. jsonEncodeString(buffer, "start", a.Start.Format(time.RFC3339), ",")
  442. jsonEncodeString(buffer, "end", a.End.Format(time.RFC3339), ",")
  443. jsonEncodeFloat64(buffer, "minutes", a.Minutes(), ",")
  444. jsonEncodeFloat64(buffer, "cpuCores", a.CPUCores(), ",")
  445. jsonEncodeFloat64(buffer, "cpuCoreRequestAverage", a.CPUCoreRequestAverage, ",")
  446. jsonEncodeFloat64(buffer, "cpuCoreUsageAverage", a.CPUCoreUsageAverage, ",")
  447. jsonEncodeFloat64(buffer, "cpuCoreHours", a.CPUCoreHours, ",")
  448. jsonEncodeFloat64(buffer, "cpuCost", a.CPUCost, ",")
  449. jsonEncodeFloat64(buffer, "cpuCostAdjustment", a.CPUCostAdjustment, ",")
  450. jsonEncodeFloat64(buffer, "cpuEfficiency", a.CPUEfficiency(), ",")
  451. jsonEncodeFloat64(buffer, "gpuCount", a.GPUs(), ",")
  452. jsonEncodeFloat64(buffer, "gpuHours", a.GPUHours, ",")
  453. jsonEncodeFloat64(buffer, "gpuCost", a.GPUCost, ",")
  454. jsonEncodeFloat64(buffer, "gpuCostAdjustment", a.GPUCostAdjustment, ",")
  455. jsonEncodeFloat64(buffer, "networkTransferBytes", a.NetworkTransferBytes, ",")
  456. jsonEncodeFloat64(buffer, "networkReceiveBytes", a.NetworkReceiveBytes, ",")
  457. jsonEncodeFloat64(buffer, "networkCost", a.NetworkCost, ",")
  458. jsonEncodeFloat64(buffer, "networkCostAdjustment", a.NetworkCostAdjustment, ",")
  459. jsonEncodeFloat64(buffer, "loadBalancerCost", a.LoadBalancerCost, ",")
  460. jsonEncodeFloat64(buffer, "loadBalancerCostAdjustment", a.LoadBalancerCostAdjustment, ",")
  461. jsonEncodeFloat64(buffer, "pvBytes", a.PVBytes(), ",")
  462. jsonEncodeFloat64(buffer, "pvByteHours", a.PVByteHours(), ",")
  463. jsonEncodeFloat64(buffer, "pvCost", a.PVCost(), ",")
  464. jsonEncode(buffer, "pvs", a.PVs, ",") // Todo Sean: this does not work properly
  465. jsonEncodeFloat64(buffer, "pvCostAdjustment", a.PVCostAdjustment, ",")
  466. jsonEncodeFloat64(buffer, "ramBytes", a.RAMBytes(), ",")
  467. jsonEncodeFloat64(buffer, "ramByteRequestAverage", a.RAMBytesRequestAverage, ",")
  468. jsonEncodeFloat64(buffer, "ramByteUsageAverage", a.RAMBytesUsageAverage, ",")
  469. jsonEncodeFloat64(buffer, "ramByteHours", a.RAMByteHours, ",")
  470. jsonEncodeFloat64(buffer, "ramCost", a.RAMCost, ",")
  471. jsonEncodeFloat64(buffer, "ramCostAdjustment", a.RAMCostAdjustment, ",")
  472. jsonEncodeFloat64(buffer, "ramEfficiency", a.RAMEfficiency(), ",")
  473. jsonEncodeFloat64(buffer, "sharedCost", a.SharedCost, ",")
  474. jsonEncodeFloat64(buffer, "externalCost", a.ExternalCost, ",")
  475. jsonEncodeFloat64(buffer, "totalCost", a.TotalCost(), ",")
  476. jsonEncodeFloat64(buffer, "totalEfficiency", a.TotalEfficiency(), ",")
  477. jsonEncode(buffer, "rawAllocationOnly", a.RawAllocationOnly, "")
  478. buffer.WriteString("}")
  479. return buffer.Bytes(), nil
  480. }
  481. // Resolution returns the duration of time covered by the Allocation
  482. func (a *Allocation) Resolution() time.Duration {
  483. return a.End.Sub(a.Start)
  484. }
  485. // IsAggregated is true if the given Allocation has been aggregated, which we
  486. // define by a lack of AllocationProperties.
  487. func (a *Allocation) IsAggregated() bool {
  488. return a == nil || a.Properties == nil
  489. }
  490. // IsExternal is true if the given Allocation represents external costs.
  491. func (a *Allocation) IsExternal() bool {
  492. return strings.Contains(a.Name, ExternalSuffix)
  493. }
  494. // IsIdle is true if the given Allocation represents idle costs.
  495. func (a *Allocation) IsIdle() bool {
  496. return strings.Contains(a.Name, IdleSuffix)
  497. }
  498. // IsUnallocated is true if the given Allocation represents unallocated costs.
  499. func (a *Allocation) IsUnallocated() bool {
  500. return strings.Contains(a.Name, UnallocatedSuffix)
  501. }
  502. // IsUnmounted is true if the given Allocation represents unmounted volume costs.
  503. func (a *Allocation) IsUnmounted() bool {
  504. return strings.Contains(a.Name, UnmountedSuffix)
  505. }
  506. // Minutes returns the number of minutes the Allocation represents, as defined
  507. // by the difference between the end and start times.
  508. func (a *Allocation) Minutes() float64 {
  509. return a.End.Sub(a.Start).Minutes()
  510. }
  511. // Share adds the TotalCost of the given Allocation to the SharedCost of the
  512. // receiving Allocation. No Start, End, Window, or AllocationProperties are considered.
  513. // Neither Allocation is mutated; a new Allocation is always returned.
  514. func (a *Allocation) Share(that *Allocation) (*Allocation, error) {
  515. if that == nil {
  516. return a.Clone(), nil
  517. }
  518. if a == nil {
  519. return nil, fmt.Errorf("cannot share with nil Allocation")
  520. }
  521. agg := a.Clone()
  522. agg.SharedCost += that.TotalCost()
  523. return agg, nil
  524. }
  525. // String represents the given Allocation as a string
  526. func (a *Allocation) String() string {
  527. return fmt.Sprintf("%s%s=%.2f", a.Name, NewWindow(&a.Start, &a.End), a.TotalCost())
  528. }
  529. func (a *Allocation) add(that *Allocation) {
  530. if a == nil {
  531. log.Warningf("Allocation.AggregateBy: trying to add a nil receiver")
  532. return
  533. }
  534. // Preserve string properties that are matching between the two allocations
  535. a.Properties = a.Properties.Intersection(that.Properties)
  536. // Expand the window to encompass both Allocations
  537. a.Window = a.Window.Expand(that.Window)
  538. // Sum non-cumulative fields by turning them into cumulative, adding them,
  539. // and then converting them back into averages after minutes have been
  540. // combined (just below).
  541. cpuReqCoreMins := a.CPUCoreRequestAverage * a.Minutes()
  542. cpuReqCoreMins += that.CPUCoreRequestAverage * that.Minutes()
  543. cpuUseCoreMins := a.CPUCoreUsageAverage * a.Minutes()
  544. cpuUseCoreMins += that.CPUCoreUsageAverage * that.Minutes()
  545. ramReqByteMins := a.RAMBytesRequestAverage * a.Minutes()
  546. ramReqByteMins += that.RAMBytesRequestAverage * that.Minutes()
  547. ramUseByteMins := a.RAMBytesUsageAverage * a.Minutes()
  548. ramUseByteMins += that.RAMBytesUsageAverage * that.Minutes()
  549. // Expand Start and End to be the "max" of among the given Allocations
  550. if that.Start.Before(a.Start) {
  551. a.Start = that.Start
  552. }
  553. if that.End.After(a.End) {
  554. a.End = that.End
  555. }
  556. // Convert cumulative request and usage back into rates
  557. // TODO:TEST write a unit test that fails if this is done incorrectly
  558. if a.Minutes() > 0 {
  559. a.CPUCoreRequestAverage = cpuReqCoreMins / a.Minutes()
  560. a.CPUCoreUsageAverage = cpuUseCoreMins / a.Minutes()
  561. a.RAMBytesRequestAverage = ramReqByteMins / a.Minutes()
  562. a.RAMBytesUsageAverage = ramUseByteMins / a.Minutes()
  563. } else {
  564. a.CPUCoreRequestAverage = 0.0
  565. a.CPUCoreUsageAverage = 0.0
  566. a.RAMBytesRequestAverage = 0.0
  567. a.RAMBytesUsageAverage = 0.0
  568. }
  569. // Sum all cumulative resource fields
  570. a.CPUCoreHours += that.CPUCoreHours
  571. a.GPUHours += that.GPUHours
  572. a.RAMByteHours += that.RAMByteHours
  573. a.NetworkTransferBytes += that.NetworkTransferBytes
  574. a.NetworkReceiveBytes += that.NetworkReceiveBytes
  575. // Sum all cumulative cost fields
  576. a.CPUCost += that.CPUCost
  577. a.GPUCost += that.GPUCost
  578. a.RAMCost += that.RAMCost
  579. a.NetworkCost += that.NetworkCost
  580. a.LoadBalancerCost += that.LoadBalancerCost
  581. a.SharedCost += that.SharedCost
  582. a.ExternalCost += that.ExternalCost
  583. // Sum PVAllocations
  584. a.PVs = a.PVs.Add(that.PVs)
  585. // Sum all cumulative adjustment fields
  586. a.CPUCostAdjustment += that.CPUCostAdjustment
  587. a.RAMCostAdjustment += that.RAMCostAdjustment
  588. a.GPUCostAdjustment += that.GPUCostAdjustment
  589. a.PVCostAdjustment += that.PVCostAdjustment
  590. a.NetworkCostAdjustment += that.NetworkCostAdjustment
  591. a.LoadBalancerCostAdjustment += that.LoadBalancerCostAdjustment
  592. // Any data that is in a "raw allocation only" is not valid in any
  593. // sort of cumulative Allocation (like one that is added).
  594. a.RawAllocationOnly = nil
  595. }
// AllocationSet stores a set of Allocations, each with a unique name, that share
// a window. An AllocationSet is mutable, so treat it like a threadsafe map.
type AllocationSet struct {
	// Embedded lock; AggregateBy takes the write lock while it rewrites the
	// set.
	sync.RWMutex
	// allocations holds the set's Allocations; AggregateBy adds and removes
	// entries using each Allocation's Name as the key.
	allocations map[string]*Allocation
	// externalKeys is keyed by Allocation name; AggregateBy removes an entry
	// when it moves an external allocation out of the set.
	// NOTE(review): presumably a set of the names of external allocations,
	// maintained by Insert — confirm.
	externalKeys map[string]bool
	// idleKeys is keyed by Allocation name; AggregateBy iterates it to look up
	// the set's idle allocations.
	// NOTE(review): presumably maintained by Insert — confirm.
	idleKeys   map[string]bool
	FromSource string // stores the name of the source used to compute the data
	// Window is the time window shared by the set's Allocations.
	Window Window
	// Warnings and Errors carry messages attached to the set.
	// NOTE(review): producers/consumers are not visible here — confirm usage.
	Warnings []string
	Errors   []string
}
  608. // NewAllocationSet instantiates a new AllocationSet and, optionally, inserts
  609. // the given list of Allocations
  610. func NewAllocationSet(start, end time.Time, allocs ...*Allocation) *AllocationSet {
  611. as := &AllocationSet{
  612. allocations: map[string]*Allocation{},
  613. externalKeys: map[string]bool{},
  614. idleKeys: map[string]bool{},
  615. Window: NewWindow(&start, &end),
  616. }
  617. for _, a := range allocs {
  618. as.Insert(a)
  619. }
  620. return as
  621. }
// AllocationAggregationOptions provide advanced functionality to AggregateBy,
// including filtering results, sharing allocations and overhead costs, and
// controlling how idle allocations are handled.
type AllocationAggregationOptions struct {
	// FilterFuncs is a list of match functions such that, if any function
	// returns false for an allocation, the allocation is dropped from the
	// results.
	FilterFuncs []AllocationMatchFunc
	// IdleByNode selects the grouping used to match idle costs to
	// allocations: by Properties.ProviderID when true, by Properties.Cluster
	// when false.
	IdleByNode bool
	// LabelConfig is passed to key generation; AggregateBy replaces a nil
	// value with NewLabelConfig().
	LabelConfig *LabelConfig
	// MergeUnallocated, when true, renames every aggregated allocation that
	// is unallocated to UnallocatedSuffix so that they merge into one entry.
	MergeUnallocated bool
	// SharedHourlyCosts maps a name to an hourly cost; each positive entry is
	// converted into a shared allocation covering the set's window.
	SharedHourlyCosts map[string]float64
	// ShareFuncs is a list of match functions such that, if any function
	// returns true for an allocation, the allocation is treated as a shared
	// resource.
	ShareFuncs []AllocationMatchFunc
	// ShareIdle selects how idle costs are shared; AggregateBy compares it
	// against ShareEven, ShareWeighted and ShareNone.
	ShareIdle string
	// ShareSplit selects how shared costs are split among aggregated
	// allocations: evenly when equal to ShareEven, otherwise weighted by
	// total cost.
	ShareSplit string
	// SplitIdle, when false, causes all remaining idle allocations to be
	// combined into a single allocation named IdleSuffix.
	SplitIdle bool
}
  639. // AggregateBy aggregates the Allocations in the given AllocationSet by the given
  640. // AllocationProperty. This will only be legal if the AllocationSet is divisible by the
  641. // given AllocationProperty; e.g. Containers can be divided by Namespace, but not vice-a-versa.
  642. func (as *AllocationSet) AggregateBy(aggregateBy []string, options *AllocationAggregationOptions) error {
  643. // The order of operations for aggregating allocations is as follows:
  644. // 1. Partition external, idle, and shared allocations into separate sets.
  645. // Also, create the aggSet into which the results will be aggregated.
  646. // 2. Compute sharing coefficients for idle and shared resources
  647. // a) if idle allocation is to be shared, compute idle coefficients
  648. // b) if idle allocation is NOT shared, but filters are present, compute
  649. // idle filtration coefficients for the purpose of only returning the
  650. // portion of idle allocation that would have been shared with the
  651. // unfiltered results. (See unit tests 5.a,b,c)
  652. // c) generate shared allocation for then given shared overhead, which
  653. // must happen after (2a) and (2b)
  654. // d) if there are shared resources, compute share coefficients
  655. // 3. Drop any allocation that fails any of the filters
  656. // 4. Distribute idle allocations according to the idle coefficients
  657. // 5. Generate aggregation key and insert allocation into the output set
  658. // 6. If idle is shared and resources are shared, some idle might be shared
  659. // with a shared resource. Distribute that to the shared resources
  660. // prior to sharing them with the aggregated results.
  661. // 7. Apply idle filtration coefficients from step (2b)
  662. // 8. Distribute shared allocations according to the share coefficients.
  663. // 9. If there are external allocations that can be aggregated into
  664. // the output (i.e. they can be used to generate a valid key for
  665. // the given properties) then aggregate; otherwise... ignore them?
  666. // 10. If the merge idle option is enabled, merge any remaining idle
  667. // allocations into a single idle allocation
  668. if as.IsEmpty() {
  669. return nil
  670. }
  671. if options == nil {
  672. options = &AllocationAggregationOptions{}
  673. }
  674. if options.LabelConfig == nil {
  675. options.LabelConfig = NewLabelConfig()
  676. }
  677. // If aggregateBy is nil, we don't aggregate anything. On the other hand,
  678. // an empty slice implies that we should aggregate everything. See
  679. // generateKey for why that makes sense.
  680. shouldAggregate := aggregateBy != nil
  681. shouldFilter := len(options.FilterFuncs) > 0
  682. shouldShare := len(options.SharedHourlyCosts) > 0 || len(options.ShareFuncs) > 0
  683. if !shouldAggregate && !shouldFilter && !shouldShare {
  684. // There is nothing for AggregateBy to do, so simply return nil
  685. return nil
  686. }
  687. // aggSet will collect the aggregated allocations
  688. aggSet := &AllocationSet{
  689. Window: as.Window.Clone(),
  690. }
  691. // externalSet will collect external allocations
  692. externalSet := &AllocationSet{
  693. Window: as.Window.Clone(),
  694. }
  695. // idleSet will be shared among aggSet after initial aggregation
  696. // is complete
  697. idleSet := &AllocationSet{
  698. Window: as.Window.Clone(),
  699. }
  700. // shareSet will be shared among aggSet after initial aggregation
  701. // is complete
  702. shareSet := &AllocationSet{
  703. Window: as.Window.Clone(),
  704. }
  705. as.Lock()
  706. defer as.Unlock()
  707. // (1) Loop and find all of the external, idle, and shared allocations. Add
  708. // them to their respective sets, removing them from the set of allocations
  709. // to aggregate.
  710. for _, alloc := range as.allocations {
  711. // External allocations get aggregated post-hoc (see step 6) and do
  712. // not necessarily contain complete sets of properties, so they are
  713. // moved to a separate AllocationSet.
  714. if alloc.IsExternal() {
  715. delete(as.externalKeys, alloc.Name)
  716. delete(as.allocations, alloc.Name)
  717. externalSet.Insert(alloc)
  718. continue
  719. }
  720. // Idle allocations should be separated into idleSet if they are to be
  721. // shared later on. If they are not to be shared, then add them to the
  722. // aggSet like any other allocation.
  723. if alloc.IsIdle() {
  724. delete(as.idleKeys, alloc.Name)
  725. delete(as.allocations, alloc.Name)
  726. if options.ShareIdle == ShareEven || options.ShareIdle == ShareWeighted {
  727. idleSet.Insert(alloc)
  728. } else {
  729. aggSet.Insert(alloc)
  730. }
  731. continue
  732. }
  733. // Shared allocations must be identified and separated prior to
  734. // aggregation and filtering. That is, if any of the ShareFuncs return
  735. // true for the allocation, then move it to shareSet.
  736. for _, sf := range options.ShareFuncs {
  737. if sf(alloc) {
  738. delete(as.idleKeys, alloc.Name)
  739. delete(as.allocations, alloc.Name)
  740. shareSet.Insert(alloc)
  741. break
  742. }
  743. }
  744. }
  745. // It's possible that no more un-shared, non-idle, non-external allocations
  746. // remain at this point. This always results in an emptySet, so return early.
  747. if len(as.allocations) == 0 {
  748. emptySet := &AllocationSet{
  749. Window: as.Window.Clone(),
  750. }
  751. as.allocations = emptySet.allocations
  752. return nil
  753. }
  754. // (2) In order to correctly share idle and shared costs, we first compute
  755. // sharing coefficients, which represent the proportion of each cost to
  756. // share with each allocation. Idle allocations are shared per-cluster or per-node,
  757. // per-allocation, and per-resource, while shared resources are shared per-
  758. // allocation only.
  759. //
  760. // For an idleCoefficient example, the entries:
  761. // [cluster1][cluster1/namespace1/pod1/container1][cpu] = 0.166667
  762. // [cluster1][cluster1/namespace1/pod1/container1][gpu] = 0.166667
  763. // [cluster1][cluster1/namespace1/pod1/container1][ram] = 0.687500
  764. // mean that the allocation "cluster1/namespace1/pod1/container1" will
  765. // receive 16.67% of cluster1's idle CPU and GPU costs and 68.75% of its
  766. // RAM costs.
  767. //
  768. // For a shareCoefficient example, the entries:
  769. // [namespace2] = 0.666667
  770. // [__filtered__] = 0.333333
  771. // mean that the post-aggregation allocation "namespace2" will receive
  772. // 66.67% of the shared resource costs, while the remaining 33.33% will
  773. // be filtered out, as they were shared with allocations that did not pass
  774. // one of the given filters.
  775. //
  776. // In order to maintain stable results when multiple operations are being
  777. // carried out (e.g. sharing idle, sharing resources, and filtering) these
  778. // coefficients are computed for the full set of allocations prior to
  779. // adding shared overhead and prior to applying filters.
  780. var err error
  781. // (2a) If there are idle costs to be shared, compute the coefficients for
  782. // sharing them among the non-idle, non-aggregated allocations (including
  783. // the shared allocations).
  784. var idleCoefficients map[string]map[string]map[string]float64
  785. if idleSet.Length() > 0 && options.ShareIdle != ShareNone {
  786. idleCoefficients, err = computeIdleCoeffs(options, as, shareSet)
  787. if err != nil {
  788. log.Warningf("AllocationSet.AggregateBy: compute idle coeff: %s", err)
  789. return fmt.Errorf("error computing idle coefficients: %s", err)
  790. }
  791. }
  792. // (2b) If idle costs are not to be shared, but there are filters, then we
  793. // need to track the amount of each idle allocation to "filter" in order to
  794. // maintain parity with the results when idle is shared. That is, we want
  795. // to return only the idle costs that would have been shared with the given
  796. // results, even if the filter had not been applied.
  797. //
  798. // For example, consider these results from aggregating by namespace with
  799. // two clusters:
  800. //
  801. // namespace1: 25.00
  802. // namespace2: 30.00
  803. // namespace3: 15.00
  804. // idle: 30.00
  805. //
  806. // When we then filter by cluster==cluster1, namespaces 2 and 3 are
  807. // reduced by the amount that existed on cluster2. Then, idle must also be
  808. // reduced by the relevant amount:
  809. //
  810. // namespace1: 25.00
  811. // namespace2: 15.00
  812. // idle: 20.00
  813. //
  814. // Note that this can happen for any field, not just cluster, so we again
  815. // need to track this on a per-cluster or per-node, per-allocation, per-resource basis.
  816. var idleFiltrationCoefficients map[string]map[string]map[string]float64
  817. if len(options.FilterFuncs) > 0 && options.ShareIdle == ShareNone {
  818. idleFiltrationCoefficients, err = computeIdleCoeffs(options, as, shareSet)
  819. if err != nil {
  820. return fmt.Errorf("error computing idle filtration coefficients: %s", err)
  821. }
  822. }
  823. // (2c) Convert SharedHourlyCosts to Allocations in the shareSet. This must
  824. // come after idle coefficients are computes so that allocations generated
  825. // by shared overhead do not skew the idle coefficient computation.
  826. for name, cost := range options.SharedHourlyCosts {
  827. if cost > 0.0 {
  828. hours := as.Resolution().Hours()
  829. // If set ends in the future, adjust hours accordingly
  830. diff := time.Since(as.End())
  831. if diff < 0.0 {
  832. hours += diff.Hours()
  833. }
  834. totalSharedCost := cost * hours
  835. shareSet.Insert(&Allocation{
  836. Name: fmt.Sprintf("%s/%s", name, SharedSuffix),
  837. Start: as.Start(),
  838. End: as.End(),
  839. SharedCost: totalSharedCost,
  840. Properties: &AllocationProperties{Cluster: SharedSuffix}, // The allocation needs to belong to a cluster,but it really doesn't matter which one, so just make it clear.
  841. })
  842. }
  843. }
  844. // (2d) Compute share coefficients for shared resources. These are computed
  845. // after idle coefficients, and are computed for the aggregated allocations
  846. // of the main allocation set. See above for details and an example.
  847. var shareCoefficients map[string]float64
  848. if shareSet.Length() > 0 {
  849. shareCoefficients, err = computeShareCoeffs(aggregateBy, options, as)
  850. if err != nil {
  851. return fmt.Errorf("error computing share coefficients: %s", err)
  852. }
  853. }
  854. // (3-5) Filter, distribute idle cost, and aggregate (in that order)
  855. for _, alloc := range as.allocations {
  856. idleId, err := alloc.getIdleId(options)
  857. if err != nil {
  858. log.DedupedWarningf(3, "AllocationSet.AggregateBy: missing idleId for allocation: %s", alloc.Name)
  859. }
  860. skip := false
  861. // (3) If any of the filter funcs fail, immediately skip the allocation.
  862. for _, ff := range options.FilterFuncs {
  863. if !ff(alloc) {
  864. skip = true
  865. break
  866. }
  867. }
  868. if skip {
  869. // If we are tracking idle filtration coefficients, delete the
  870. // entry corresponding to the filtered allocation. (Deleting the
  871. // entry will result in that proportional amount being removed
  872. // from the idle allocation at the end of the process.)
  873. if idleFiltrationCoefficients != nil {
  874. if ifcc, ok := idleFiltrationCoefficients[idleId]; ok {
  875. delete(ifcc, alloc.Name)
  876. }
  877. }
  878. continue
  879. }
  880. // (4) Distribute idle allocations according to the idle coefficients
  881. // NOTE: if idle allocation is off (i.e. ShareIdle == ShareNone) then
  882. // all idle allocations will be in the aggSet at this point, so idleSet
  883. // will be empty and we won't enter this block.
  884. if idleSet.Length() > 0 {
  885. // Distribute idle allocations by coefficient per-idleId, per-allocation
  886. for _, idleAlloc := range idleSet.allocations {
  887. // Only share idle if the idleId matches; i.e. the allocation
  888. // is from the same idleId as the idle costs
  889. iaidleId, err := idleAlloc.getIdleId(options)
  890. if err != nil {
  891. log.Errorf("AllocationSet.AggregateBy: Idle allocation is missing idleId %s", idleAlloc.Name)
  892. return err
  893. }
  894. if iaidleId != idleId {
  895. continue
  896. }
  897. // Make sure idle coefficients exist
  898. if _, ok := idleCoefficients[idleId]; !ok {
  899. log.Warningf("AllocationSet.AggregateBy: error getting idle coefficient: no idleId '%s' for '%s'", idleId, alloc.Name)
  900. continue
  901. }
  902. if _, ok := idleCoefficients[idleId][alloc.Name]; !ok {
  903. log.Warningf("AllocationSet.AggregateBy: error getting idle coefficient for '%s'", alloc.Name)
  904. continue
  905. }
  906. alloc.CPUCoreHours += idleAlloc.CPUCoreHours * idleCoefficients[idleId][alloc.Name]["cpu"]
  907. alloc.GPUHours += idleAlloc.GPUHours * idleCoefficients[idleId][alloc.Name]["gpu"]
  908. alloc.RAMByteHours += idleAlloc.RAMByteHours * idleCoefficients[idleId][alloc.Name]["ram"]
  909. idleCPUCost := idleAlloc.CPUCost * idleCoefficients[idleId][alloc.Name]["cpu"]
  910. idleGPUCost := idleAlloc.GPUCost * idleCoefficients[idleId][alloc.Name]["gpu"]
  911. idleRAMCost := idleAlloc.RAMCost * idleCoefficients[idleId][alloc.Name]["ram"]
  912. alloc.CPUCost += idleCPUCost
  913. alloc.GPUCost += idleGPUCost
  914. alloc.RAMCost += idleRAMCost
  915. }
  916. }
  917. // (5) generate key to use for aggregation-by-key and allocation name
  918. key := alloc.generateKey(aggregateBy, options.LabelConfig)
  919. alloc.Name = key
  920. if options.MergeUnallocated && alloc.IsUnallocated() {
  921. alloc.Name = UnallocatedSuffix
  922. }
  923. // Inserting the allocation with the generated key for a name will
  924. // perform the actual basic aggregation step.
  925. aggSet.Insert(alloc)
  926. }
  927. // (6) If idle is shared and resources are shared, it's possible that some
  928. // amount of idle cost will be shared with a shared resource. Distribute
  929. // that idle allocation, if it exists, to the respective shared allocations
  930. // before sharing with the aggregated allocations.
  931. if idleSet.Length() > 0 && shareSet.Length() > 0 {
  932. for _, alloc := range shareSet.allocations {
  933. idleId, err := alloc.getIdleId(options)
  934. if err != nil {
  935. log.DedupedWarningf(3, "AllocationSet.AggregateBy: missing idleId for allocation: %s", alloc.Name)
  936. }
  937. // Distribute idle allocations by coefficient per-idleId, per-allocation
  938. for _, idleAlloc := range idleSet.allocations {
  939. // Only share idle if the idleId matches; i.e. the allocation
  940. // is from the same idleId as the idle costs
  941. iaidleId, _ := idleAlloc.getIdleId(options)
  942. if iaidleId != idleId {
  943. continue
  944. }
  945. // Make sure idle coefficients exist
  946. if _, ok := idleCoefficients[idleId]; !ok {
  947. log.Warningf("AllocationSet.AggregateBy: error getting idle coefficient: no idleId '%s' for '%s'", idleId, alloc.Name)
  948. continue
  949. }
  950. if _, ok := idleCoefficients[idleId][alloc.Name]; !ok {
  951. log.Warningf("AllocationSet.AggregateBy: error getting idle coefficient for '%s'", alloc.Name)
  952. continue
  953. }
  954. alloc.CPUCoreHours += idleAlloc.CPUCoreHours * idleCoefficients[idleId][alloc.Name]["cpu"]
  955. alloc.GPUHours += idleAlloc.GPUHours * idleCoefficients[idleId][alloc.Name]["gpu"]
  956. alloc.RAMByteHours += idleAlloc.RAMByteHours * idleCoefficients[idleId][alloc.Name]["ram"]
  957. idleCPUCost := idleAlloc.CPUCost * idleCoefficients[idleId][alloc.Name]["cpu"]
  958. idleGPUCost := idleAlloc.GPUCost * idleCoefficients[idleId][alloc.Name]["gpu"]
  959. idleRAMCost := idleAlloc.RAMCost * idleCoefficients[idleId][alloc.Name]["ram"]
  960. alloc.CPUCost += idleCPUCost
  961. alloc.GPUCost += idleGPUCost
  962. alloc.RAMCost += idleRAMCost
  963. }
  964. }
  965. }
  966. // groupingIdleFiltrationCoeffs is used to track per-resource idle
  967. // coefficients on a cluster-by-cluster or node-by-node basis depending
  968. // on the IdleByNode option. It is, essentailly, an aggregation of
  969. // idleFiltrationCoefficients after they have been
  970. // filtered above (in step 3)
  971. var groupingIdleFiltrationCoeffs map[string]map[string]float64
  972. if idleFiltrationCoefficients != nil {
  973. groupingIdleFiltrationCoeffs = map[string]map[string]float64{}
  974. for idleId, m := range idleFiltrationCoefficients {
  975. if _, ok := groupingIdleFiltrationCoeffs[idleId]; !ok {
  976. groupingIdleFiltrationCoeffs[idleId] = map[string]float64{
  977. "cpu": 0.0,
  978. "gpu": 0.0,
  979. "ram": 0.0,
  980. }
  981. }
  982. for _, n := range m {
  983. for resource, val := range n {
  984. groupingIdleFiltrationCoeffs[idleId][resource] += val
  985. }
  986. }
  987. }
  988. }
  989. // (7) If we have both un-shared idle allocations and idle filtration
  990. // coefficients then apply those. See step (2b) for an example.
  991. if len(aggSet.idleKeys) > 0 && groupingIdleFiltrationCoeffs != nil {
  992. for idleKey := range aggSet.idleKeys {
  993. idleAlloc := aggSet.Get(idleKey)
  994. iaidleId, err := idleAlloc.getIdleId(options)
  995. if err != nil {
  996. log.Errorf("AllocationSet.AggregateBy: Idle allocation is missing idleId %s", idleAlloc.Name)
  997. return err
  998. }
  999. if resourceCoeffs, ok := groupingIdleFiltrationCoeffs[iaidleId]; ok {
  1000. idleAlloc.CPUCost *= resourceCoeffs["cpu"]
  1001. idleAlloc.CPUCoreHours *= resourceCoeffs["cpu"]
  1002. idleAlloc.RAMCost *= resourceCoeffs["ram"]
  1003. idleAlloc.RAMByteHours *= resourceCoeffs["ram"]
  1004. }
  1005. }
  1006. }
  1007. // (8) Distribute shared allocations according to the share coefficients.
  1008. if shareSet.Length() > 0 {
  1009. for _, alloc := range aggSet.allocations {
  1010. for _, sharedAlloc := range shareSet.allocations {
  1011. if _, ok := shareCoefficients[alloc.Name]; !ok {
  1012. if !alloc.IsIdle() {
  1013. log.Warningf("AllocationSet.AggregateBy: error getting share coefficienct for '%s'", alloc.Name)
  1014. }
  1015. continue
  1016. }
  1017. alloc.SharedCost += sharedAlloc.TotalCost() * shareCoefficients[alloc.Name]
  1018. }
  1019. }
  1020. }
  1021. // (9) Aggregate external allocations into aggregated allocations. This may
  1022. // not be possible for every external allocation, but attempt to find an
  1023. // exact key match, given each external allocation's proerties, and
  1024. // aggregate if an exact match is found.
  1025. for _, alloc := range externalSet.allocations {
  1026. skip := false
  1027. for _, ff := range options.FilterFuncs {
  1028. if !ff(alloc) {
  1029. skip = true
  1030. break
  1031. }
  1032. }
  1033. if !skip {
  1034. key := alloc.generateKey(aggregateBy, options.LabelConfig)
  1035. alloc.Name = key
  1036. aggSet.Insert(alloc)
  1037. }
  1038. }
  1039. // (10) Combine all idle allocations into a single "__idle__" allocation
  1040. if !options.SplitIdle {
  1041. for _, idleAlloc := range aggSet.IdleAllocations() {
  1042. aggSet.Delete(idleAlloc.Name)
  1043. idleAlloc.Name = IdleSuffix
  1044. aggSet.Insert(idleAlloc)
  1045. }
  1046. }
  1047. as.allocations = aggSet.allocations
  1048. return nil
  1049. }
  1050. func computeShareCoeffs(aggregateBy []string, options *AllocationAggregationOptions, as *AllocationSet) (map[string]float64, error) {
  1051. // Compute coeffs by totalling per-allocation, then dividing by the total.
  1052. coeffs := map[string]float64{}
  1053. // Compute totals for all allocations
  1054. total := 0.0
  1055. // ShareEven counts each aggregation with even weight, whereas ShareWeighted
  1056. // counts each aggregation proportionally to its respective costs
  1057. shareType := options.ShareSplit
  1058. // Record allocation values first, then normalize by totals to get percentages
  1059. for _, alloc := range as.allocations {
  1060. if alloc.IsIdle() {
  1061. // Skip idle allocations in coefficient calculation
  1062. continue
  1063. }
  1064. if alloc.IsUnmounted() {
  1065. // Skip unmounted allocations in coefficient calculation
  1066. continue
  1067. }
  1068. // Determine the post-aggregation key under which the allocation will
  1069. // be shared.
  1070. name := alloc.generateKey(aggregateBy, options.LabelConfig)
  1071. // If the current allocation will be filtered out in step 3, contribute
  1072. // its share of the shared coefficient to a "__filtered__" bin, which
  1073. // will ultimately be dropped. This step ensures that the shared cost
  1074. // of a non-filtered allocation will be conserved even when the filter
  1075. // is removed. (Otherwise, all the shared cost will get redistributed
  1076. // over the unfiltered results, inflating their shared costs.)
  1077. filtered := false
  1078. for _, ff := range options.FilterFuncs {
  1079. if !ff(alloc) {
  1080. filtered = true
  1081. break
  1082. }
  1083. }
  1084. if filtered {
  1085. name = "__filtered__"
  1086. }
  1087. if shareType == ShareEven {
  1088. // Even distribution is not additive - set to 1.0 for everything
  1089. coeffs[name] = 1.0
  1090. // Total for even distribution is always the number of coefficients
  1091. total = float64(len(coeffs))
  1092. } else {
  1093. // Both are additive for weighted distribution, where each
  1094. // cumulative coefficient will be divided by the total.
  1095. coeffs[name] += alloc.TotalCost()
  1096. total += alloc.TotalCost()
  1097. }
  1098. }
  1099. // Normalize coefficients by totals
  1100. for a := range coeffs {
  1101. if coeffs[a] > 0 && total > 0 {
  1102. coeffs[a] /= total
  1103. } else {
  1104. log.Warningf("ETL: invalid values for shared coefficients: %d, %d", coeffs[a], total)
  1105. coeffs[a] = 0.0
  1106. }
  1107. }
  1108. return coeffs, nil
  1109. }
  1110. func computeIdleCoeffs(options *AllocationAggregationOptions, as *AllocationSet, shareSet *AllocationSet) (map[string]map[string]map[string]float64, error) {
  1111. types := []string{"cpu", "gpu", "ram"}
  1112. // Compute idle coefficients, then save them in AllocationAggregationOptions
  1113. coeffs := map[string]map[string]map[string]float64{}
  1114. // Compute totals per resource for CPU, GPU, RAM, and PV
  1115. totals := map[string]map[string]float64{}
  1116. // ShareEven counts each allocation with even weight, whereas ShareWeighted
  1117. // counts each allocation proportionally to its respective costs
  1118. shareType := options.ShareIdle
  1119. // Record allocation values first, then normalize by totals to get percentages
  1120. for _, alloc := range as.allocations {
  1121. if alloc.IsIdle() {
  1122. // Skip idle allocations in coefficient calculation
  1123. continue
  1124. }
  1125. idleId, err := alloc.getIdleId(options)
  1126. if err != nil {
  1127. log.DedupedWarningf(3, "Missing Idle Key for %s", alloc.Name)
  1128. }
  1129. // get the name key for the allocation
  1130. name := alloc.Name
  1131. // Create key based tables if they don't exist
  1132. if _, ok := coeffs[idleId]; !ok {
  1133. coeffs[idleId] = map[string]map[string]float64{}
  1134. }
  1135. if _, ok := totals[idleId]; !ok {
  1136. totals[idleId] = map[string]float64{}
  1137. }
  1138. if _, ok := coeffs[idleId][name]; !ok {
  1139. coeffs[idleId][name] = map[string]float64{}
  1140. }
  1141. if shareType == ShareEven {
  1142. for _, r := range types {
  1143. // Not additive - hard set to 1.0
  1144. coeffs[idleId][name][r] = 1.0
  1145. // totals are additive
  1146. totals[idleId][r] += 1.0
  1147. }
  1148. } else {
  1149. coeffs[idleId][name]["cpu"] += alloc.CPUTotalCost()
  1150. coeffs[idleId][name]["gpu"] += alloc.GPUTotalCost()
  1151. coeffs[idleId][name]["ram"] += alloc.RAMTotalCost()
  1152. totals[idleId]["cpu"] += alloc.CPUTotalCost()
  1153. totals[idleId]["gpu"] += alloc.GPUTotalCost()
  1154. totals[idleId]["ram"] += alloc.RAMTotalCost()
  1155. }
  1156. }
  1157. // Do the same for shared allocations
  1158. for _, alloc := range shareSet.allocations {
  1159. if alloc.IsIdle() {
  1160. // Skip idle allocations in coefficient calculation
  1161. continue
  1162. }
  1163. // idleId will be providerId or cluster
  1164. idleId, err := alloc.getIdleId(options)
  1165. if err != nil {
  1166. log.DedupedWarningf(3, "Missing Idle Key in share set for %s", alloc.Name)
  1167. }
  1168. // get the name key for the allocation
  1169. name := alloc.Name
  1170. // Create idleId based tables if they don't exist
  1171. if _, ok := coeffs[idleId]; !ok {
  1172. coeffs[idleId] = map[string]map[string]float64{}
  1173. }
  1174. if _, ok := totals[idleId]; !ok {
  1175. totals[idleId] = map[string]float64{}
  1176. }
  1177. if _, ok := coeffs[idleId][name]; !ok {
  1178. coeffs[idleId][name] = map[string]float64{}
  1179. }
  1180. if shareType == ShareEven {
  1181. for _, r := range types {
  1182. // Not additive - hard set to 1.0
  1183. coeffs[idleId][name][r] = 1.0
  1184. // totals are additive
  1185. totals[idleId][r] += 1.0
  1186. }
  1187. } else {
  1188. coeffs[idleId][name]["cpu"] += alloc.CPUTotalCost()
  1189. coeffs[idleId][name]["gpu"] += alloc.GPUTotalCost()
  1190. coeffs[idleId][name]["ram"] += alloc.RAMTotalCost()
  1191. totals[idleId]["cpu"] += alloc.CPUTotalCost()
  1192. totals[idleId]["gpu"] += alloc.GPUTotalCost()
  1193. totals[idleId]["ram"] += alloc.RAMTotalCost()
  1194. }
  1195. }
  1196. // Normalize coefficients by totals
  1197. for c := range coeffs {
  1198. for a := range coeffs[c] {
  1199. for _, r := range types {
  1200. if coeffs[c][a][r] > 0 && totals[c][r] > 0 {
  1201. coeffs[c][a][r] /= totals[c][r]
  1202. }
  1203. }
  1204. }
  1205. }
  1206. return coeffs, nil
  1207. }
  1208. // getIdleId returns the providerId or cluster of an Allocation depending on the IdleByNode
  1209. // option in the AllocationAggregationOptions and an error if the respective field is missing
  1210. func (a *Allocation) getIdleId(options *AllocationAggregationOptions) (string, error) {
  1211. var idleId string
  1212. if options.IdleByNode {
  1213. // Key allocations to ProviderId to match against node
  1214. idleId = a.Properties.ProviderID
  1215. if idleId == "" {
  1216. return idleId, fmt.Errorf("ProviderId is not set")
  1217. }
  1218. } else {
  1219. // key the allocations by cluster id
  1220. idleId = a.Properties.Cluster
  1221. if idleId == "" {
  1222. return idleId, fmt.Errorf("ClusterProp is not set")
  1223. }
  1224. }
  1225. return idleId, nil
  1226. }
  1227. func (a *Allocation) generateKey(aggregateBy []string, labelConfig *LabelConfig) string {
  1228. if a == nil {
  1229. return ""
  1230. }
  1231. if labelConfig == nil {
  1232. labelConfig = NewLabelConfig()
  1233. }
  1234. // Names will ultimately be joined into a single name, which uniquely
  1235. // identifies allocations.
  1236. names := []string{}
  1237. for _, agg := range aggregateBy {
  1238. switch true {
  1239. case agg == AllocationClusterProp:
  1240. names = append(names, a.Properties.Cluster)
  1241. case agg == AllocationNodeProp:
  1242. names = append(names, a.Properties.Node)
  1243. case agg == AllocationNamespaceProp:
  1244. names = append(names, a.Properties.Namespace)
  1245. case agg == AllocationControllerKindProp:
  1246. controllerKind := a.Properties.ControllerKind
  1247. if controllerKind == "" {
  1248. // Indicate that allocation has no controller
  1249. controllerKind = UnallocatedSuffix
  1250. }
  1251. names = append(names, controllerKind)
  1252. case agg == AllocationDaemonSetProp || agg == AllocationStatefulSetProp || agg == AllocationDeploymentProp || agg == AllocationJobProp:
  1253. controller := a.Properties.Controller
  1254. if agg != a.Properties.ControllerKind || controller == "" {
  1255. // The allocation does not have the specified controller kind
  1256. controller = UnallocatedSuffix
  1257. }
  1258. names = append(names, controller)
  1259. case agg == AllocationControllerProp:
  1260. controller := a.Properties.Controller
  1261. if controller == "" {
  1262. // Indicate that allocation has no controller
  1263. controller = UnallocatedSuffix
  1264. } else if a.Properties.ControllerKind != "" {
  1265. controller = fmt.Sprintf("%s:%s", a.Properties.ControllerKind, controller)
  1266. }
  1267. names = append(names, controller)
  1268. case agg == AllocationPodProp:
  1269. names = append(names, a.Properties.Pod)
  1270. case agg == AllocationContainerProp:
  1271. names = append(names, a.Properties.Container)
  1272. case agg == AllocationServiceProp:
  1273. services := a.Properties.Services
  1274. if len(services) == 0 {
  1275. // Indicate that allocation has no services
  1276. names = append(names, UnallocatedSuffix)
  1277. } else {
  1278. // This just uses the first service
  1279. for _, service := range services {
  1280. names = append(names, service)
  1281. break
  1282. }
  1283. }
  1284. case strings.HasPrefix(agg, "label:"):
  1285. labels := a.Properties.Labels
  1286. if labels == nil {
  1287. names = append(names, UnallocatedSuffix)
  1288. } else {
  1289. labelName := strings.TrimPrefix(agg, "label:")
  1290. if labelValue, ok := labels[labelName]; ok {
  1291. names = append(names, fmt.Sprintf("%s=%s", labelName, labelValue))
  1292. } else {
  1293. names = append(names, UnallocatedSuffix)
  1294. }
  1295. }
  1296. case strings.HasPrefix(agg, "annotation:"):
  1297. annotations := a.Properties.Annotations
  1298. if annotations == nil {
  1299. names = append(names, UnallocatedSuffix)
  1300. } else {
  1301. annotationName := strings.TrimPrefix(agg, "annotation:")
  1302. if annotationValue, ok := annotations[annotationName]; ok {
  1303. names = append(names, fmt.Sprintf("%s=%s", annotationName, annotationValue))
  1304. } else {
  1305. names = append(names, UnallocatedSuffix)
  1306. }
  1307. }
  1308. case agg == AllocationDepartmentProp:
  1309. labels := a.Properties.Labels
  1310. if labels == nil {
  1311. names = append(names, UnallocatedSuffix)
  1312. } else {
  1313. labelName := labelConfig.DepartmentLabel
  1314. if labelValue, ok := labels[labelName]; ok {
  1315. names = append(names, labelValue)
  1316. } else {
  1317. names = append(names, UnallocatedSuffix)
  1318. }
  1319. }
  1320. case agg == AllocationEnvironmentProp:
  1321. labels := a.Properties.Labels
  1322. if labels == nil {
  1323. names = append(names, UnallocatedSuffix)
  1324. } else {
  1325. labelName := labelConfig.EnvironmentLabel
  1326. if labelValue, ok := labels[labelName]; ok {
  1327. names = append(names, labelValue)
  1328. } else {
  1329. names = append(names, UnallocatedSuffix)
  1330. }
  1331. }
  1332. case agg == AllocationOwnerProp:
  1333. labels := a.Properties.Labels
  1334. if labels == nil {
  1335. names = append(names, UnallocatedSuffix)
  1336. } else {
  1337. labelName := labelConfig.OwnerLabel
  1338. if labelValue, ok := labels[labelName]; ok {
  1339. names = append(names, labelValue)
  1340. } else {
  1341. names = append(names, UnallocatedSuffix)
  1342. }
  1343. }
  1344. case agg == AllocationProductProp:
  1345. labels := a.Properties.Labels
  1346. if labels == nil {
  1347. names = append(names, UnallocatedSuffix)
  1348. } else {
  1349. labelName := labelConfig.ProductLabel
  1350. if labelValue, ok := labels[labelName]; ok {
  1351. names = append(names, labelValue)
  1352. } else {
  1353. names = append(names, UnallocatedSuffix)
  1354. }
  1355. }
  1356. case agg == AllocationTeamProp:
  1357. labels := a.Properties.Labels
  1358. if labels == nil {
  1359. names = append(names, UnallocatedSuffix)
  1360. } else {
  1361. labelName := labelConfig.TeamLabel
  1362. if labelValue, ok := labels[labelName]; ok {
  1363. names = append(names, labelValue)
  1364. } else {
  1365. names = append(names, UnallocatedSuffix)
  1366. }
  1367. }
  1368. default:
  1369. // This case should never be reached, as input up until this point
  1370. // should be checked and rejected if invalid. But if we do get a
  1371. // value we don't recognize, log a warning.
  1372. log.Warningf("AggregateBy: illegal aggregation parameter: %s", agg)
  1373. }
  1374. }
  1375. return strings.Join(names, "/")
  1376. }
  1377. // TODO:CLEANUP get rid of this
  1378. // Helper function to check for slice membership. Not sure if repeated elsewhere in our codebase.
  1379. func indexOf(v string, arr []string) int {
  1380. for i, s := range arr {
  1381. // This is caseless equivalence
  1382. if strings.EqualFold(v, s) {
  1383. return i
  1384. }
  1385. }
  1386. return -1
  1387. }
  1388. // Clone returns a new AllocationSet with a deep copy of the given
  1389. // AllocationSet's allocations.
  1390. func (as *AllocationSet) Clone() *AllocationSet {
  1391. if as == nil {
  1392. return nil
  1393. }
  1394. as.RLock()
  1395. defer as.RUnlock()
  1396. allocs := map[string]*Allocation{}
  1397. for k, v := range as.allocations {
  1398. allocs[k] = v.Clone()
  1399. }
  1400. externalKeys := map[string]bool{}
  1401. for k, v := range as.externalKeys {
  1402. externalKeys[k] = v
  1403. }
  1404. idleKeys := map[string]bool{}
  1405. for k, v := range as.idleKeys {
  1406. idleKeys[k] = v
  1407. }
  1408. return &AllocationSet{
  1409. allocations: allocs,
  1410. externalKeys: externalKeys,
  1411. idleKeys: idleKeys,
  1412. Window: as.Window.Clone(),
  1413. }
  1414. }
  1415. // ComputeIdleAllocations computes the idle allocations for the AllocationSet,
  1416. // given a set of Assets. Ideally, assetSet should contain only Nodes, but if
  1417. // it contains other Assets, they will be ignored; only CPU, GPU and RAM are
  1418. // considered for idle allocation. If the Nodes have adjustments, then apply
  1419. // the adjustments proportionally to each of the resources so that total
  1420. // allocation with idle reflects the adjusted node costs. One idle allocation
  1421. // per-cluster will be computed and returned, keyed by cluster_id.
  1422. func (as *AllocationSet) ComputeIdleAllocations(assetSet *AssetSet) (map[string]*Allocation, error) {
  1423. if as == nil {
  1424. return nil, fmt.Errorf("cannot compute idle allocation for nil AllocationSet")
  1425. }
  1426. if assetSet == nil {
  1427. return nil, fmt.Errorf("cannot compute idle allocation with nil AssetSet")
  1428. }
  1429. if !as.Window.Equal(assetSet.Window) {
  1430. return nil, fmt.Errorf("cannot compute idle allocation for sets with mismatched windows: %s != %s", as.Window, assetSet.Window)
  1431. }
  1432. window := as.Window
  1433. // Build a map of cumulative cluster asset costs, per resource; i.e.
  1434. // cluster-to-{cpu|gpu|ram}-to-cost.
  1435. assetClusterResourceCosts := map[string]map[string]float64{}
  1436. assetSet.Each(func(key string, a Asset) {
  1437. if node, ok := a.(*Node); ok {
  1438. if _, ok := assetClusterResourceCosts[node.Properties().Cluster]; !ok {
  1439. assetClusterResourceCosts[node.Properties().Cluster] = map[string]float64{}
  1440. }
  1441. // adjustmentRate is used to scale resource costs proportionally
  1442. // by the adjustment. This is necessary because we only get one
  1443. // adjustment per Node, not one per-resource-per-Node.
  1444. //
  1445. // e.g. total cost = $90, adjustment = -$10 => 0.9
  1446. // e.g. total cost = $150, adjustment = -$300 => 0.3333
  1447. // e.g. total cost = $150, adjustment = $50 => 1.5
  1448. adjustmentRate := 1.0
  1449. if node.TotalCost()-node.Adjustment() == 0 {
  1450. // If (totalCost - adjustment) is 0.0 then adjustment cancels
  1451. // the entire node cost and we should make everything 0
  1452. // without dividing by 0.
  1453. adjustmentRate = 0.0
  1454. log.DedupedWarningf(5, "Compute Idle Allocations: Node Cost Adjusted to $0.00 for %s", node.properties.Name)
  1455. } else if node.Adjustment() != 0.0 {
  1456. // adjustmentRate is the ratio of cost-with-adjustment (i.e. TotalCost)
  1457. // to cost-without-adjustment (i.e. TotalCost - Adjustment).
  1458. adjustmentRate = node.TotalCost() / (node.TotalCost() - node.Adjustment())
  1459. }
  1460. cpuCost := node.CPUCost * (1.0 - node.Discount) * adjustmentRate
  1461. gpuCost := node.GPUCost * (1.0 - node.Discount) * adjustmentRate
  1462. ramCost := node.RAMCost * (1.0 - node.Discount) * adjustmentRate
  1463. assetClusterResourceCosts[node.Properties().Cluster]["cpu"] += cpuCost
  1464. assetClusterResourceCosts[node.Properties().Cluster]["gpu"] += gpuCost
  1465. assetClusterResourceCosts[node.Properties().Cluster]["ram"] += ramCost
  1466. }
  1467. })
  1468. // Determine start, end on a per-cluster basis
  1469. clusterStarts := map[string]time.Time{}
  1470. clusterEnds := map[string]time.Time{}
  1471. // Subtract allocated costs from asset costs, leaving only the remaining
  1472. // idle costs.
  1473. as.Each(func(name string, a *Allocation) {
  1474. cluster := a.Properties.Cluster
  1475. if cluster == "" {
  1476. // Failed to find allocation's cluster
  1477. return
  1478. }
  1479. if _, ok := assetClusterResourceCosts[cluster]; !ok {
  1480. // Failed to find assets for allocation's cluster
  1481. return
  1482. }
  1483. // Set cluster (start, end) if they are either not currently set,
  1484. // or if the detected (start, end) of the current allocation falls
  1485. // before or after, respectively, the current values.
  1486. if s, ok := clusterStarts[cluster]; !ok || a.Start.Before(s) {
  1487. clusterStarts[cluster] = a.Start
  1488. }
  1489. if e, ok := clusterEnds[cluster]; !ok || a.End.After(e) {
  1490. clusterEnds[cluster] = a.End
  1491. }
  1492. assetClusterResourceCosts[cluster]["cpu"] -= a.CPUTotalCost()
  1493. assetClusterResourceCosts[cluster]["gpu"] -= a.GPUTotalCost()
  1494. assetClusterResourceCosts[cluster]["ram"] -= a.RAMTotalCost()
  1495. })
  1496. // Turn remaining un-allocated asset costs into idle allocations
  1497. idleAllocs := map[string]*Allocation{}
  1498. for cluster, resources := range assetClusterResourceCosts {
  1499. // Default start and end to the (start, end) of the given window, but
  1500. // use the actual, detected (start, end) pair if they are available.
  1501. start := *window.Start()
  1502. if s, ok := clusterStarts[cluster]; ok && window.Contains(s) {
  1503. start = s
  1504. }
  1505. end := *window.End()
  1506. if e, ok := clusterEnds[cluster]; ok && window.Contains(e) {
  1507. end = e
  1508. }
  1509. idleAlloc := &Allocation{
  1510. Name: fmt.Sprintf("%s/%s", cluster, IdleSuffix),
  1511. Window: window.Clone(),
  1512. Properties: &AllocationProperties{Cluster: cluster},
  1513. Start: start,
  1514. End: end,
  1515. CPUCost: resources["cpu"],
  1516. GPUCost: resources["gpu"],
  1517. RAMCost: resources["ram"],
  1518. }
  1519. // Do not continue if multiple idle allocations are computed for a
  1520. // single cluster.
  1521. if _, ok := idleAllocs[cluster]; ok {
  1522. return nil, fmt.Errorf("duplicate idle allocations for cluster %s", cluster)
  1523. }
  1524. idleAllocs[cluster] = idleAlloc
  1525. }
  1526. return idleAllocs, nil
  1527. }
  1528. // ComputeIdleAllocationsByNode computes the idle allocations for the AllocationSet,
  1529. // given a set of Assets. Ideally, assetSet should contain only Nodes, but if
  1530. // it contains other Assets, they will be ignored; only CPU, GPU and RAM are
  1531. // considered for idle allocation. If the Nodes have adjustments, then apply
  1532. // the adjustments proportionally to each of the resources so that total
  1533. // allocation with idle reflects the adjusted node costs. One idle allocation
  1534. // per-node will be computed and returned, keyed by cluster_id.
  1535. func (as *AllocationSet) ComputeIdleAllocationsByNode(assetSet *AssetSet) (map[string]*Allocation, error) {
  1536. if as == nil {
  1537. return nil, fmt.Errorf("cannot compute idle allocation for nil AllocationSet")
  1538. }
  1539. if assetSet == nil {
  1540. return nil, fmt.Errorf("cannot compute idle allocation with nil AssetSet")
  1541. }
  1542. if !as.Window.Equal(assetSet.Window) {
  1543. return nil, fmt.Errorf("cannot compute idle allocation for sets with mismatched windows: %s != %s", as.Window, assetSet.Window)
  1544. }
  1545. window := as.Window
  1546. // Build a map of cumulative cluster asset costs, per resource; i.e.
  1547. // cluster-to-{cpu|gpu|ram}-to-cost.
  1548. assetNodeResourceCosts := map[string]map[string]float64{}
  1549. nodesByProviderId := map[string]*Node{}
  1550. assetSet.Each(func(key string, a Asset) {
  1551. if node, ok := a.(*Node); ok {
  1552. if _, ok := assetNodeResourceCosts[node.Properties().ProviderID]; ok || node.Properties().ProviderID == "" {
  1553. log.DedupedWarningf(5, "Compute Idle Allocations By Node: Node missing providerId: %s", node.properties.Name)
  1554. return
  1555. }
  1556. nodesByProviderId[node.Properties().ProviderID] = node
  1557. assetNodeResourceCosts[node.Properties().ProviderID] = map[string]float64{}
  1558. // adjustmentRate is used to scale resource costs proportionally
  1559. // by the adjustment. This is necessary because we only get one
  1560. // adjustment per Node, not one per-resource-per-Node.
  1561. //
  1562. // e.g. total cost = $90, adjustment = -$10 => 0.9
  1563. // e.g. total cost = $150, adjustment = -$300 => 0.3333
  1564. // e.g. total cost = $150, adjustment = $50 => 1.5
  1565. adjustmentRate := 1.0
  1566. if node.TotalCost()-node.Adjustment() == 0 {
  1567. // If (totalCost - adjustment) is 0.0 then adjustment cancels
  1568. // the entire node cost and we should make everything 0
  1569. // without dividing by 0.
  1570. adjustmentRate = 0.0
  1571. log.DedupedWarningf(5, "Compute Idle Allocations: Node Cost Adjusted to $0.00 for %s", node.properties.Name)
  1572. } else if node.Adjustment() != 0.0 {
  1573. // adjustmentRate is the ratio of cost-with-adjustment (i.e. TotalCost)
  1574. // to cost-without-adjustment (i.e. TotalCost - Adjustment).
  1575. adjustmentRate = node.TotalCost() / (node.TotalCost() - node.Adjustment())
  1576. }
  1577. cpuCost := node.CPUCost * (1.0 - node.Discount) * adjustmentRate
  1578. gpuCost := node.GPUCost * (1.0 - node.Discount) * adjustmentRate
  1579. ramCost := node.RAMCost * (1.0 - node.Discount) * adjustmentRate
  1580. assetNodeResourceCosts[node.Properties().ProviderID]["cpu"] += cpuCost
  1581. assetNodeResourceCosts[node.Properties().ProviderID]["gpu"] += gpuCost
  1582. assetNodeResourceCosts[node.Properties().ProviderID]["ram"] += ramCost
  1583. }
  1584. })
  1585. // Determine start, end on a per-cluster basis
  1586. nodeStarts := map[string]time.Time{}
  1587. nodeEnds := map[string]time.Time{}
  1588. // Subtract allocated costs from asset costs, leaving only the remaining
  1589. // idle costs.
  1590. as.Each(func(name string, a *Allocation) {
  1591. providerId := a.Properties.ProviderID
  1592. if providerId == "" {
  1593. // Failed to find allocation's node
  1594. log.DedupedWarningf(5, "Compute Idle Allocations By Node: Allocation missing providerId: %s", a.Name)
  1595. return
  1596. }
  1597. if _, ok := assetNodeResourceCosts[providerId]; !ok {
  1598. // Failed to find assets for allocation's node
  1599. return
  1600. }
  1601. // Set cluster (start, end) if they are either not currently set,
  1602. // or if the detected (start, end) of the current allocation falls
  1603. // before or after, respectively, the current values.
  1604. if s, ok := nodeStarts[providerId]; !ok || a.Start.Before(s) {
  1605. nodeStarts[providerId] = a.Start
  1606. }
  1607. if e, ok := nodeEnds[providerId]; !ok || a.End.After(e) {
  1608. nodeEnds[providerId] = a.End
  1609. }
  1610. assetNodeResourceCosts[providerId]["cpu"] -= a.CPUTotalCost()
  1611. assetNodeResourceCosts[providerId]["gpu"] -= a.GPUTotalCost()
  1612. assetNodeResourceCosts[providerId]["ram"] -= a.RAMTotalCost()
  1613. })
  1614. // Turn remaining un-allocated asset costs into idle allocations
  1615. idleAllocs := map[string]*Allocation{}
  1616. for providerId, resources := range assetNodeResourceCosts {
  1617. // Default start and end to the (start, end) of the given window, but
  1618. // use the actual, detected (start, end) pair if they are available.
  1619. start := *window.Start()
  1620. if s, ok := nodeStarts[providerId]; ok && window.Contains(s) {
  1621. start = s
  1622. }
  1623. end := *window.End()
  1624. if e, ok := nodeEnds[providerId]; ok && window.Contains(e) {
  1625. end = e
  1626. }
  1627. node := nodesByProviderId[providerId]
  1628. idleAlloc := &Allocation{
  1629. Name: fmt.Sprintf("%s/%s", node.properties.Name, IdleSuffix),
  1630. Window: window.Clone(),
  1631. Properties: &AllocationProperties{
  1632. Cluster: node.properties.Cluster,
  1633. Node: node.properties.Name,
  1634. ProviderID: providerId,
  1635. },
  1636. Start: start,
  1637. End: end,
  1638. CPUCost: resources["cpu"],
  1639. GPUCost: resources["gpu"],
  1640. RAMCost: resources["ram"],
  1641. }
  1642. // Do not continue if multiple idle allocations are computed for a
  1643. // single node.
  1644. if _, ok := idleAllocs[providerId]; ok {
  1645. return nil, fmt.Errorf("duplicate idle allocations for node Provider ID: %s", providerId)
  1646. }
  1647. idleAllocs[providerId] = idleAlloc
  1648. }
  1649. return idleAllocs, nil
  1650. }
  1651. // Delete removes the allocation with the given name from the set
  1652. func (as *AllocationSet) Delete(name string) {
  1653. if as == nil {
  1654. return
  1655. }
  1656. as.Lock()
  1657. defer as.Unlock()
  1658. delete(as.externalKeys, name)
  1659. delete(as.idleKeys, name)
  1660. delete(as.allocations, name)
  1661. }
  1662. // Each invokes the given function for each Allocation in the set
  1663. func (as *AllocationSet) Each(f func(string, *Allocation)) {
  1664. if as == nil {
  1665. return
  1666. }
  1667. for k, a := range as.allocations {
  1668. f(k, a)
  1669. }
  1670. }
  1671. // End returns the End time of the AllocationSet window
  1672. func (as *AllocationSet) End() time.Time {
  1673. if as == nil {
  1674. log.Warningf("AllocationSet: calling End on nil AllocationSet")
  1675. return time.Unix(0, 0)
  1676. }
  1677. if as.Window.End() == nil {
  1678. log.Warningf("AllocationSet: AllocationSet with illegal window: End is nil; len(as.allocations)=%d", len(as.allocations))
  1679. return time.Unix(0, 0)
  1680. }
  1681. return *as.Window.End()
  1682. }
  1683. // Get returns the Allocation at the given key in the AllocationSet
  1684. func (as *AllocationSet) Get(key string) *Allocation {
  1685. as.RLock()
  1686. defer as.RUnlock()
  1687. if alloc, ok := as.allocations[key]; ok {
  1688. return alloc
  1689. }
  1690. return nil
  1691. }
  1692. // ExternalAllocations returns a map of the external allocations in the set.
  1693. // Returns clones of the actual Allocations, so mutability is not a problem.
  1694. func (as *AllocationSet) ExternalAllocations() map[string]*Allocation {
  1695. externals := map[string]*Allocation{}
  1696. if as.IsEmpty() {
  1697. return externals
  1698. }
  1699. as.RLock()
  1700. defer as.RUnlock()
  1701. for key := range as.externalKeys {
  1702. if alloc, ok := as.allocations[key]; ok {
  1703. externals[key] = alloc.Clone()
  1704. }
  1705. }
  1706. return externals
  1707. }
  1708. // ExternalCost returns the total aggregated external costs of the set
  1709. func (as *AllocationSet) ExternalCost() float64 {
  1710. if as.IsEmpty() {
  1711. return 0.0
  1712. }
  1713. as.RLock()
  1714. defer as.RUnlock()
  1715. externalCost := 0.0
  1716. for _, alloc := range as.allocations {
  1717. externalCost += alloc.ExternalCost
  1718. }
  1719. return externalCost
  1720. }
  1721. // IdleAllocations returns a map of the idle allocations in the AllocationSet.
  1722. // Returns clones of the actual Allocations, so mutability is not a problem.
  1723. func (as *AllocationSet) IdleAllocations() map[string]*Allocation {
  1724. idles := map[string]*Allocation{}
  1725. if as.IsEmpty() {
  1726. return idles
  1727. }
  1728. as.RLock()
  1729. defer as.RUnlock()
  1730. for key := range as.idleKeys {
  1731. if alloc, ok := as.allocations[key]; ok {
  1732. idles[key] = alloc.Clone()
  1733. }
  1734. }
  1735. return idles
  1736. }
  1737. // Insert aggregates the current entry in the AllocationSet by the given Allocation,
  1738. // but only if the Allocation is valid, i.e. matches the AllocationSet's window. If
  1739. // there is no existing entry, one is created. Nil error response indicates success.
  1740. func (as *AllocationSet) Insert(that *Allocation) error {
  1741. return as.insert(that)
  1742. }
  1743. func (as *AllocationSet) insert(that *Allocation) error {
  1744. if as == nil {
  1745. return fmt.Errorf("cannot insert into nil AllocationSet")
  1746. }
  1747. as.Lock()
  1748. defer as.Unlock()
  1749. if as.allocations == nil {
  1750. as.allocations = map[string]*Allocation{}
  1751. }
  1752. if as.externalKeys == nil {
  1753. as.externalKeys = map[string]bool{}
  1754. }
  1755. if as.idleKeys == nil {
  1756. as.idleKeys = map[string]bool{}
  1757. }
  1758. // Add the given Allocation to the existing entry, if there is one;
  1759. // otherwise just set directly into allocations
  1760. if _, ok := as.allocations[that.Name]; !ok {
  1761. as.allocations[that.Name] = that
  1762. } else {
  1763. as.allocations[that.Name].add(that)
  1764. }
  1765. // If the given Allocation is an external one, record that
  1766. if that.IsExternal() {
  1767. as.externalKeys[that.Name] = true
  1768. }
  1769. // If the given Allocation is an idle one, record that
  1770. if that.IsIdle() {
  1771. as.idleKeys[that.Name] = true
  1772. }
  1773. return nil
  1774. }
  1775. // IsEmpty returns true if the AllocationSet is nil, or if it contains
  1776. // zero allocations.
  1777. func (as *AllocationSet) IsEmpty() bool {
  1778. if as == nil || len(as.allocations) == 0 {
  1779. return true
  1780. }
  1781. as.RLock()
  1782. defer as.RUnlock()
  1783. return as.allocations == nil || len(as.allocations) == 0
  1784. }
  1785. // Length returns the number of Allocations in the set
  1786. func (as *AllocationSet) Length() int {
  1787. if as == nil {
  1788. return 0
  1789. }
  1790. as.RLock()
  1791. defer as.RUnlock()
  1792. return len(as.allocations)
  1793. }
  1794. // Map clones and returns a map of the AllocationSet's Allocations
  1795. func (as *AllocationSet) Map() map[string]*Allocation {
  1796. if as.IsEmpty() {
  1797. return map[string]*Allocation{}
  1798. }
  1799. return as.Clone().allocations
  1800. }
  1801. // MarshalJSON JSON-encodes the AllocationSet
  1802. func (as *AllocationSet) MarshalJSON() ([]byte, error) {
  1803. as.RLock()
  1804. defer as.RUnlock()
  1805. return json.Marshal(as.allocations)
  1806. }
  1807. // Resolution returns the AllocationSet's window duration
  1808. func (as *AllocationSet) Resolution() time.Duration {
  1809. return as.Window.Duration()
  1810. }
  1811. // Set uses the given Allocation to overwrite the existing entry in the
  1812. // AllocationSet under the Allocation's name.
  1813. func (as *AllocationSet) Set(alloc *Allocation) error {
  1814. if as.IsEmpty() {
  1815. as.Lock()
  1816. as.allocations = map[string]*Allocation{}
  1817. as.externalKeys = map[string]bool{}
  1818. as.idleKeys = map[string]bool{}
  1819. as.Unlock()
  1820. }
  1821. as.Lock()
  1822. defer as.Unlock()
  1823. as.allocations[alloc.Name] = alloc
  1824. // If the given Allocation is an external one, record that
  1825. if alloc.IsExternal() {
  1826. as.externalKeys[alloc.Name] = true
  1827. }
  1828. // If the given Allocation is an idle one, record that
  1829. if alloc.IsIdle() {
  1830. as.idleKeys[alloc.Name] = true
  1831. }
  1832. return nil
  1833. }
  1834. // Start returns the Start time of the AllocationSet window
  1835. func (as *AllocationSet) Start() time.Time {
  1836. if as == nil {
  1837. log.Warningf("AllocationSet: calling Start on nil AllocationSet")
  1838. return time.Unix(0, 0)
  1839. }
  1840. if as.Window.Start() == nil {
  1841. log.Warningf("AllocationSet: AllocationSet with illegal window: Start is nil; len(as.allocations)=%d", len(as.allocations))
  1842. return time.Unix(0, 0)
  1843. }
  1844. return *as.Window.Start()
  1845. }
  1846. // String represents the given Allocation as a string
  1847. func (as *AllocationSet) String() string {
  1848. if as == nil {
  1849. return "<nil>"
  1850. }
  1851. return fmt.Sprintf("AllocationSet{length: %d; window: %s; totalCost: %.2f}",
  1852. as.Length(), as.Window, as.TotalCost())
  1853. }
  1854. // TotalCost returns the sum of all TotalCosts of the allocations contained
  1855. func (as *AllocationSet) TotalCost() float64 {
  1856. if as.IsEmpty() {
  1857. return 0.0
  1858. }
  1859. as.RLock()
  1860. defer as.RUnlock()
  1861. tc := 0.0
  1862. for _, a := range as.allocations {
  1863. tc += a.TotalCost()
  1864. }
  1865. return tc
  1866. }
  1867. // UTCOffset returns the AllocationSet's configured UTCOffset.
  1868. func (as *AllocationSet) UTCOffset() time.Duration {
  1869. _, zone := as.Start().Zone()
  1870. return time.Duration(zone) * time.Second
  1871. }
  1872. func (as *AllocationSet) accumulate(that *AllocationSet) (*AllocationSet, error) {
  1873. if as.IsEmpty() {
  1874. return that.Clone(), nil
  1875. }
  1876. if that.IsEmpty() {
  1877. return as.Clone(), nil
  1878. }
  1879. // Set start, end to min(start), max(end)
  1880. start := as.Start()
  1881. end := as.End()
  1882. if that.Start().Before(start) {
  1883. start = that.Start()
  1884. }
  1885. if that.End().After(end) {
  1886. end = that.End()
  1887. }
  1888. acc := NewAllocationSet(start, end)
  1889. as.RLock()
  1890. defer as.RUnlock()
  1891. that.RLock()
  1892. defer that.RUnlock()
  1893. for _, alloc := range as.allocations {
  1894. err := acc.insert(alloc)
  1895. if err != nil {
  1896. return nil, err
  1897. }
  1898. }
  1899. for _, alloc := range that.allocations {
  1900. err := acc.insert(alloc)
  1901. if err != nil {
  1902. return nil, err
  1903. }
  1904. }
  1905. return acc, nil
  1906. }
  1907. // AllocationSetRange is a thread-safe slice of AllocationSets. It is meant to
  1908. // be used such that the AllocationSets held are consecutive and coherent with
  1909. // respect to using the same aggregation properties, UTC offset, and
  1910. // resolution. However these rules are not necessarily enforced, so use wisely.
  1911. type AllocationSetRange struct {
  1912. sync.RWMutex
  1913. allocations []*AllocationSet
  1914. FromStore string // stores the name of the store used to retrieve the data
  1915. }
  1916. // NewAllocationSetRange instantiates a new range composed of the given
  1917. // AllocationSets in the order provided.
  1918. func NewAllocationSetRange(allocs ...*AllocationSet) *AllocationSetRange {
  1919. return &AllocationSetRange{
  1920. allocations: allocs,
  1921. }
  1922. }
  1923. // Accumulate sums each AllocationSet in the given range, returning a single cumulative
  1924. // AllocationSet for the entire range.
  1925. func (asr *AllocationSetRange) Accumulate() (*AllocationSet, error) {
  1926. var allocSet *AllocationSet
  1927. var err error
  1928. asr.RLock()
  1929. defer asr.RUnlock()
  1930. for _, as := range asr.allocations {
  1931. allocSet, err = allocSet.accumulate(as)
  1932. if err != nil {
  1933. return nil, err
  1934. }
  1935. }
  1936. return allocSet, nil
  1937. }
  1938. // TODO accumulate into lower-resolution chunks of the given resolution
  1939. // func (asr *AllocationSetRange) AccumulateBy(resolution time.Duration) *AllocationSetRange
  1940. // AggregateBy aggregates each AllocationSet in the range by the given
  1941. // properties and options.
  1942. func (asr *AllocationSetRange) AggregateBy(aggregateBy []string, options *AllocationAggregationOptions) error {
  1943. aggRange := &AllocationSetRange{allocations: []*AllocationSet{}}
  1944. asr.Lock()
  1945. defer asr.Unlock()
  1946. for _, as := range asr.allocations {
  1947. err := as.AggregateBy(aggregateBy, options)
  1948. if err != nil {
  1949. return err
  1950. }
  1951. aggRange.allocations = append(aggRange.allocations, as)
  1952. }
  1953. asr.allocations = aggRange.allocations
  1954. return nil
  1955. }
  1956. // Append appends the given AllocationSet to the end of the range. It does not
  1957. // validate whether or not that violates window continuity.
  1958. func (asr *AllocationSetRange) Append(that *AllocationSet) {
  1959. asr.Lock()
  1960. defer asr.Unlock()
  1961. asr.allocations = append(asr.allocations, that)
  1962. }
  1963. // Each invokes the given function for each AllocationSet in the range
  1964. func (asr *AllocationSetRange) Each(f func(int, *AllocationSet)) {
  1965. if asr == nil {
  1966. return
  1967. }
  1968. for i, as := range asr.allocations {
  1969. f(i, as)
  1970. }
  1971. }
  1972. // Get retrieves the AllocationSet at the given index of the range.
  1973. func (asr *AllocationSetRange) Get(i int) (*AllocationSet, error) {
  1974. if i < 0 || i >= len(asr.allocations) {
  1975. return nil, fmt.Errorf("AllocationSetRange: index out of range: %d", i)
  1976. }
  1977. asr.RLock()
  1978. defer asr.RUnlock()
  1979. return asr.allocations[i], nil
  1980. }
  1981. // InsertRange merges the given AllocationSetRange into the receiving one by
  1982. // lining up sets with matching windows, then inserting each allocation from
  1983. // the given ASR into the respective set in the receiving ASR. If the given
  1984. // ASR contains an AllocationSet from a window that does not exist in the
  1985. // receiving ASR, then an error is returned. However, the given ASR does not
  1986. // need to cover the full range of the receiver.
  1987. func (asr *AllocationSetRange) InsertRange(that *AllocationSetRange) error {
  1988. if asr == nil {
  1989. return fmt.Errorf("cannot insert range into nil AllocationSetRange")
  1990. }
  1991. // keys maps window to index in asr
  1992. keys := map[string]int{}
  1993. asr.Each(func(i int, as *AllocationSet) {
  1994. if as == nil {
  1995. return
  1996. }
  1997. keys[as.Window.String()] = i
  1998. })
  1999. // Nothing to merge, so simply return
  2000. if len(keys) == 0 {
  2001. return nil
  2002. }
  2003. var err error
  2004. that.Each(func(j int, thatAS *AllocationSet) {
  2005. if thatAS == nil || err != nil {
  2006. return
  2007. }
  2008. // Find matching AllocationSet in asr
  2009. i, ok := keys[thatAS.Window.String()]
  2010. if !ok {
  2011. err = fmt.Errorf("cannot merge AllocationSet into window that does not exist: %s", thatAS.Window.String())
  2012. return
  2013. }
  2014. as, err := asr.Get(i)
  2015. if err != nil {
  2016. err = fmt.Errorf("AllocationSetRange index does not exist: %d", i)
  2017. return
  2018. }
  2019. // Insert each Allocation from the given set
  2020. thatAS.Each(func(k string, alloc *Allocation) {
  2021. err = as.Insert(alloc)
  2022. if err != nil {
  2023. err = fmt.Errorf("error inserting allocation: %s", err)
  2024. return
  2025. }
  2026. })
  2027. })
  2028. // err might be nil
  2029. return err
  2030. }
  2031. // Length returns the length of the range, which is zero if nil
  2032. func (asr *AllocationSetRange) Length() int {
  2033. if asr == nil || asr.allocations == nil {
  2034. return 0
  2035. }
  2036. asr.RLock()
  2037. defer asr.RUnlock()
  2038. return len(asr.allocations)
  2039. }
  2040. // MarshalJSON JSON-encodes the range
  2041. func (asr *AllocationSetRange) MarshalJSON() ([]byte, error) {
  2042. asr.RLock()
  2043. defer asr.RUnlock()
  2044. return json.Marshal(asr.allocations)
  2045. }
  2046. // Slice copies the underlying slice of AllocationSets, maintaining order,
  2047. // and returns the copied slice.
  2048. func (asr *AllocationSetRange) Slice() []*AllocationSet {
  2049. if asr == nil || asr.allocations == nil {
  2050. return nil
  2051. }
  2052. asr.RLock()
  2053. defer asr.RUnlock()
  2054. copy := []*AllocationSet{}
  2055. for _, as := range asr.allocations {
  2056. copy = append(copy, as.Clone())
  2057. }
  2058. return copy
  2059. }
  2060. // String represents the given AllocationSetRange as a string
  2061. func (asr *AllocationSetRange) String() string {
  2062. if asr == nil {
  2063. return "<nil>"
  2064. }
  2065. return fmt.Sprintf("AllocationSetRange{length: %d}", asr.Length())
  2066. }
  2067. // UTCOffset returns the detected UTCOffset of the AllocationSets within the
  2068. // range. Defaults to 0 if the range is nil or empty. Does not warn if there
  2069. // are sets with conflicting UTCOffsets (just returns the first).
  2070. func (asr *AllocationSetRange) UTCOffset() time.Duration {
  2071. if asr.Length() == 0 {
  2072. return 0
  2073. }
  2074. as, err := asr.Get(0)
  2075. if err != nil {
  2076. return 0
  2077. }
  2078. return as.UTCOffset()
  2079. }
  2080. // Window returns the full window that the AllocationSetRange spans, from the
  2081. // start of the first AllocationSet to the end of the last one.
  2082. func (asr *AllocationSetRange) Window() Window {
  2083. if asr == nil || asr.Length() == 0 {
  2084. return NewWindow(nil, nil)
  2085. }
  2086. start := asr.allocations[0].Start()
  2087. end := asr.allocations[asr.Length()-1].End()
  2088. return NewWindow(&start, &end)
  2089. }
  2090. // Start returns the earliest start of all Allocations in the AllocationSetRange.
  2091. // It returns an error if there are no allocations.
  2092. func (asr *AllocationSetRange) Start() (time.Time, error) {
  2093. start := time.Time{}
  2094. firstStartNotSet := true
  2095. asr.Each(func(i int, as *AllocationSet) {
  2096. as.Each(func(s string, a *Allocation) {
  2097. if firstStartNotSet {
  2098. start = a.Start
  2099. firstStartNotSet = false
  2100. }
  2101. if a.Start.Before(start) {
  2102. start = a.Start
  2103. }
  2104. })
  2105. })
  2106. if firstStartNotSet {
  2107. return start, fmt.Errorf("had no data to compute a start from")
  2108. }
  2109. return start, nil
  2110. }
  2111. // End returns the latest end of all Allocations in the AllocationSetRange.
  2112. // It returns an error if there are no allocations.
  2113. func (asr *AllocationSetRange) End() (time.Time, error) {
  2114. end := time.Time{}
  2115. firstEndNotSet := true
  2116. asr.Each(func(i int, as *AllocationSet) {
  2117. as.Each(func(s string, a *Allocation) {
  2118. if firstEndNotSet {
  2119. end = a.End
  2120. firstEndNotSet = false
  2121. }
  2122. if a.End.After(end) {
  2123. end = a.End
  2124. }
  2125. })
  2126. })
  2127. if firstEndNotSet {
  2128. return end, fmt.Errorf("had no data to compute an end from")
  2129. }
  2130. return end, nil
  2131. }
  2132. // Minutes returns the duration, in minutes, between the earliest start
  2133. // and the latest end of all allocations in the AllocationSetRange.
  2134. func (asr *AllocationSetRange) Minutes() float64 {
  2135. start, err := asr.Start()
  2136. if err != nil {
  2137. return 0
  2138. }
  2139. end, err := asr.End()
  2140. if err != nil {
  2141. return 0
  2142. }
  2143. duration := end.Sub(start)
  2144. return duration.Minutes()
  2145. }