allocation.go 79 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177
71778177917801781178217831784178517861787178817891790179117921793179417951796179717981799180018011802180318041805180618071808180918101811181218131814181518161817181818191820182118221823182418251826182718281829183018311832183318341835183618371838183918401841184218431844184518461847184818491850185118521853185418551856185718581859186018611862186318641865186618671868186918701871187218731874187518761877187818791880188118821883188418851886188718881889189018911892189318941895189618971898189919001901190219031904190519061907190819091910191119121913191419151916191719181919192019211922192319241925192619271928192919301931193219331934193519361937193819391940194119421943194419451946194719481949195019511952195319541955195619571958195919601961196219631964196519661967196819691970197119721973197419751976197719781979198019811982198319841985198619871988198919901991199219931994199519961997199819992000200120022003200420052006200720082009201020112012201320142015201620172018201920202021202220232024202520262027202820292030203120322033203420352036203720382039204020412042204320442045204620472048204920502051205220532054205520562057205820592060206120622063206420652066206720682069207020712072207320742075207620772078207920802081208220832084208520862087208820892090209120922093209420952096209720982099210021012102210321042105210621072108210921102111211221132114211521162117211821192120212121222123212421252126212721282129213021312132213321342135213621372138213921402141214221432144214521462147214821492150215121522153215421552156215721582159216021612162216321642165216621672168216921702171217221732174217521762177217821792180218121822183218421852186218721882189219021912192219321942195219621972198219922002201220222032204220522062207220822092210221122122213221422152216221722182219222022212222222322242225222622272228222922302231223222332234223522362237223822392240224122422243224422452246224722482249225022512252225322542255225622572258225922602261226222632264226522662267226822692270227122722273227422752276227
722782279228022812282228322842285228622872288228922902291229222932294229522962297229822992300230123022303230423052306230723082309231023112312231323142315231623172318231923202321232223232324232523262327232823292330233123322333233423352336233723382339234023412342234323442345234623472348234923502351235223532354235523562357235823592360236123622363236423652366236723682369237023712372237323742375237623772378237923802381238223832384238523862387238823892390239123922393239423952396239723982399240024012402240324042405240624072408240924102411241224132414241524162417241824192420242124222423242424252426242724282429243024312432243324342435243624372438243924402441244224432444244524462447244824492450245124522453245424552456245724582459
  1. package kubecost
  2. import (
  3. "bytes"
  4. "fmt"
  5. "sort"
  6. "strings"
  7. "sync"
  8. "time"
  9. "github.com/kubecost/cost-model/pkg/log"
  10. "github.com/kubecost/cost-model/pkg/util"
  11. "github.com/kubecost/cost-model/pkg/util/json"
  12. )
  13. // TODO Clean-up use of IsEmpty; nil checks should be separated for safety.
  14. // TODO Consider making Allocation an interface, which is fulfilled by structs
  15. // like KubernetesAllocation, IdleAllocation, and ExternalAllocation.
  16. // ExternalSuffix indicates an external allocation
  17. const ExternalSuffix = "__external__"
  18. // IdleSuffix indicates an idle allocation property
  19. const IdleSuffix = "__idle__"
  20. // SharedSuffix indicates an shared allocation property
  21. const SharedSuffix = "__shared__"
  22. // UnallocatedSuffix indicates an unallocated allocation property
  23. const UnallocatedSuffix = "__unallocated__"
  24. // UnmountedSuffix indicated allocation to an unmounted PV
  25. const UnmountedSuffix = "__unmounted__"
  26. // ShareWeighted indicates that a shared resource should be shared as a
  27. // proportion of the cost of the remaining allocations.
  28. const ShareWeighted = "__weighted__"
  29. // ShareEven indicates that a shared resource should be shared evenly across
  30. // all remaining allocations.
  31. const ShareEven = "__even__"
  32. // ShareNone indicates that a shareable resource should not be shared
  33. const ShareNone = "__none__"
  34. // Allocation is a unit of resource allocation and cost for a given window
  35. // of time and for a given kubernetes construct with its associated set of
  36. // properties.
  37. // TODO:CLEANUP consider dropping name in favor of just Allocation and an
  38. // Assets-style key() function for AllocationSet.
  39. type Allocation struct {
  40. Name string `json:"name"`
  41. Properties *AllocationProperties `json:"properties,omitempty"`
  42. Window Window `json:"window"`
  43. Start time.Time `json:"start"`
  44. End time.Time `json:"end"`
  45. CPUCoreHours float64 `json:"cpuCoreHours"`
  46. CPUCoreRequestAverage float64 `json:"cpuCoreRequestAverage"`
  47. CPUCoreUsageAverage float64 `json:"cpuCoreUsageAverage"`
  48. CPUCost float64 `json:"cpuCost"`
  49. CPUCostAdjustment float64 `json:"cpuCostAdjustment"`
  50. GPUHours float64 `json:"gpuHours"`
  51. GPUCost float64 `json:"gpuCost"`
  52. GPUCostAdjustment float64 `json:"gpuCostAdjustment"`
  53. NetworkCost float64 `json:"networkCost"`
  54. NetworkCostAdjustment float64 `json:"networkCostAdjustment"`
  55. LoadBalancerCost float64 `json:"loadBalancerCost"`
  56. LoadBalancerCostAdjustment float64 `json:"loadBalancerCostAdjustment"`
  57. PVs PVAllocations `json:"-"`
  58. PVCostAdjustment float64 `json:"pvCostAdjustment"`
  59. RAMByteHours float64 `json:"ramByteHours"`
  60. RAMBytesRequestAverage float64 `json:"ramByteRequestAverage"`
  61. RAMBytesUsageAverage float64 `json:"ramByteUsageAverage"`
  62. RAMCost float64 `json:"ramCost"`
  63. RAMCostAdjustment float64 `json:"ramCostAdjustment"`
  64. SharedCost float64 `json:"sharedCost"`
  65. SharedCostAdjustment float64 `json:"sharedCostAdjustment"`
  66. ExternalCost float64 `json:"externalCost"`
  67. // RawAllocationOnly is a pointer so if it is not present it will be
  68. // marshalled as null rather than as an object with Go default values.
  69. RawAllocationOnly *RawAllocationOnlyData `json:"rawAllocationOnly"`
  70. }
  71. // RawAllocationOnlyData is information that only belong in "raw" Allocations,
  72. // those which have not undergone aggregation, accumulation, or any other form
  73. // of combination to produce a new Allocation from other Allocations.
  74. //
  75. // Max usage data belongs here because computing the overall maximum from two
  76. // or more Allocations is a non-trivial operation that cannot be defined without
  77. // maintaining a large amount of state. Consider the following example:
  78. // _______________________________________________
  79. //
  80. // A1 Using 3 CPU ---- ----- ------
  81. // A2 Using 2 CPU ---- ----- ----
  82. // A3 Using 1 CPU --- --
  83. // _______________________________________________
  84. // Time ---->
  85. //
  86. // The logical maximum CPU usage is 5, but this cannot be calculated iteratively,
  87. // which is how we calculate aggregations and accumulations of Allocations currently.
  88. // This becomes a problem I could call "maximum sum of overlapping intervals" and is
  89. // essentially a variant of an interval scheduling algorithm.
  90. //
  91. // If we had types to differentiate between regular Allocations and AggregatedAllocations
  92. // then this type would be unnecessary and its fields would go into the regular Allocation
  93. // and not in the AggregatedAllocation.
  94. type RawAllocationOnlyData struct {
  95. CPUCoreUsageMax float64 `json:"cpuCoreUsageMax"`
  96. RAMBytesUsageMax float64 `json:"ramByteUsageMax"`
  97. }
  98. // PVAllocations is a map of Disk Asset Identifiers to the
  99. // usage of them by an Allocation as recorded in a PVAllocation
  100. type PVAllocations map[PVKey]*PVAllocation
  101. // Clone creates a deep copy of a PVAllocations
  102. func (pv *PVAllocations) Clone() PVAllocations {
  103. if pv == nil || *pv == nil {
  104. return nil
  105. }
  106. apv := *pv
  107. clonePV := PVAllocations{}
  108. for k, v := range apv {
  109. clonePV[k] = &PVAllocation{
  110. ByteHours: v.ByteHours,
  111. Cost: v.Cost,
  112. }
  113. }
  114. return clonePV
  115. }
  116. // Add adds contents of that to the calling PVAllocations
  117. func (pv *PVAllocations) Add(that PVAllocations) PVAllocations {
  118. apv := pv.Clone()
  119. if that != nil {
  120. if apv == nil {
  121. apv = PVAllocations{}
  122. }
  123. for pvKey, thatPVAlloc := range that {
  124. apvAlloc, ok := apv[pvKey]
  125. if !ok {
  126. apvAlloc = &PVAllocation{}
  127. }
  128. apvAlloc.Cost += thatPVAlloc.Cost
  129. apvAlloc.ByteHours += thatPVAlloc.ByteHours
  130. apv[pvKey] = apvAlloc
  131. }
  132. }
  133. return apv
  134. }
  135. // PVKey for identifying Disk type assets
  136. type PVKey struct {
  137. Cluster string `json:"cluster"`
  138. Name string `json:"name"`
  139. }
  140. // PVAllocation contains the byte hour usage
  141. // and cost of an Allocation for a single PV
  142. type PVAllocation struct {
  143. ByteHours float64 `json:"byteHours"`
  144. Cost float64 `json:"cost"`
  145. }
  146. // AllocationMatchFunc is a function that can be used to match Allocations by
  147. // returning true for any given Allocation if a condition is met.
  148. type AllocationMatchFunc func(*Allocation) bool
  149. // Add returns the result of summing the two given Allocations, which sums the
  150. // summary fields (e.g. costs, resources) and recomputes efficiency. Neither of
  151. // the two original Allocations are mutated in the process.
  152. func (a *Allocation) Add(that *Allocation) (*Allocation, error) {
  153. if a == nil {
  154. return that.Clone(), nil
  155. }
  156. if that == nil {
  157. return a.Clone(), nil
  158. }
  159. // Note: no need to clone "that", as add only mutates the receiver
  160. agg := a.Clone()
  161. agg.add(that)
  162. return agg, nil
  163. }
  164. // Clone returns a deep copy of the given Allocation
  165. func (a *Allocation) Clone() *Allocation {
  166. if a == nil {
  167. return nil
  168. }
  169. return &Allocation{
  170. Name: a.Name,
  171. Properties: a.Properties.Clone(),
  172. Window: a.Window.Clone(),
  173. Start: a.Start,
  174. End: a.End,
  175. CPUCoreHours: a.CPUCoreHours,
  176. CPUCoreRequestAverage: a.CPUCoreRequestAverage,
  177. CPUCoreUsageAverage: a.CPUCoreUsageAverage,
  178. CPUCost: a.CPUCost,
  179. CPUCostAdjustment: a.CPUCostAdjustment,
  180. GPUHours: a.GPUHours,
  181. GPUCost: a.GPUCost,
  182. GPUCostAdjustment: a.GPUCostAdjustment,
  183. NetworkCost: a.NetworkCost,
  184. NetworkCostAdjustment: a.NetworkCostAdjustment,
  185. LoadBalancerCost: a.LoadBalancerCost,
  186. LoadBalancerCostAdjustment: a.LoadBalancerCostAdjustment,
  187. PVs: a.PVs.Clone(),
  188. PVCostAdjustment: a.PVCostAdjustment,
  189. RAMByteHours: a.RAMByteHours,
  190. RAMBytesRequestAverage: a.RAMBytesRequestAverage,
  191. RAMBytesUsageAverage: a.RAMBytesUsageAverage,
  192. RAMCost: a.RAMCost,
  193. RAMCostAdjustment: a.RAMCostAdjustment,
  194. SharedCost: a.SharedCost,
  195. SharedCostAdjustment: a.SharedCostAdjustment,
  196. ExternalCost: a.ExternalCost,
  197. RawAllocationOnly: a.RawAllocationOnly.Clone(),
  198. }
  199. }
  200. // Clone returns a deep copy of the given RawAllocationOnlyData
  201. func (r *RawAllocationOnlyData) Clone() *RawAllocationOnlyData {
  202. if r == nil {
  203. return nil
  204. }
  205. return &RawAllocationOnlyData{
  206. CPUCoreUsageMax: r.CPUCoreUsageMax,
  207. RAMBytesUsageMax: r.RAMBytesUsageMax,
  208. }
  209. }
  210. // Equal returns true if the values held in the given Allocation precisely
  211. // match those of the receiving Allocation. nil does not match nil. Floating
  212. // point values need to match according to util.IsApproximately, which accounts
  213. // for small, reasonable floating point error margins.
  214. func (a *Allocation) Equal(that *Allocation) bool {
  215. if a == nil || that == nil {
  216. return false
  217. }
  218. if a.Name != that.Name {
  219. return false
  220. }
  221. if !a.Properties.Equal(that.Properties) {
  222. return false
  223. }
  224. if !a.Window.Equal(that.Window) {
  225. return false
  226. }
  227. if !a.Start.Equal(that.Start) {
  228. return false
  229. }
  230. if !a.End.Equal(that.End) {
  231. return false
  232. }
  233. if !util.IsApproximately(a.CPUCoreHours, that.CPUCoreHours) {
  234. return false
  235. }
  236. if !util.IsApproximately(a.CPUCost, that.CPUCost) {
  237. return false
  238. }
  239. if !util.IsApproximately(a.CPUCostAdjustment, that.CPUCostAdjustment) {
  240. return false
  241. }
  242. if !util.IsApproximately(a.GPUHours, that.GPUHours) {
  243. return false
  244. }
  245. if !util.IsApproximately(a.GPUCost, that.GPUCost) {
  246. return false
  247. }
  248. if !util.IsApproximately(a.GPUCostAdjustment, that.GPUCostAdjustment) {
  249. return false
  250. }
  251. if !util.IsApproximately(a.NetworkCost, that.NetworkCost) {
  252. return false
  253. }
  254. if !util.IsApproximately(a.NetworkCostAdjustment, that.NetworkCostAdjustment) {
  255. return false
  256. }
  257. if !util.IsApproximately(a.LoadBalancerCost, that.LoadBalancerCost) {
  258. return false
  259. }
  260. if !util.IsApproximately(a.LoadBalancerCostAdjustment, that.LoadBalancerCostAdjustment) {
  261. return false
  262. }
  263. if !util.IsApproximately(a.PVCostAdjustment, that.PVCostAdjustment) {
  264. return false
  265. }
  266. if !util.IsApproximately(a.RAMByteHours, that.RAMByteHours) {
  267. return false
  268. }
  269. if !util.IsApproximately(a.RAMCost, that.RAMCost) {
  270. return false
  271. }
  272. if !util.IsApproximately(a.RAMCostAdjustment, that.RAMCostAdjustment) {
  273. return false
  274. }
  275. if !util.IsApproximately(a.SharedCost, that.SharedCost) {
  276. return false
  277. }
  278. if !util.IsApproximately(a.SharedCostAdjustment, that.SharedCostAdjustment) {
  279. return false
  280. }
  281. if !util.IsApproximately(a.ExternalCost, that.ExternalCost) {
  282. return false
  283. }
  284. if a.RawAllocationOnly == nil && that.RawAllocationOnly != nil {
  285. return false
  286. }
  287. if a.RawAllocationOnly != nil && that.RawAllocationOnly == nil {
  288. return false
  289. }
  290. if a.RawAllocationOnly != nil && that.RawAllocationOnly != nil {
  291. if !util.IsApproximately(a.RawAllocationOnly.CPUCoreUsageMax, that.RawAllocationOnly.CPUCoreUsageMax) {
  292. return false
  293. }
  294. if !util.IsApproximately(a.RawAllocationOnly.RAMBytesUsageMax, that.RawAllocationOnly.RAMBytesUsageMax) {
  295. return false
  296. }
  297. }
  298. aPVs := a.PVs
  299. thatPVs := that.PVs
  300. if len(aPVs) == len(thatPVs) {
  301. for k, pv := range aPVs {
  302. tv, ok := thatPVs[k]
  303. if !ok || *tv != *pv {
  304. return false
  305. }
  306. }
  307. } else {
  308. return false
  309. }
  310. return true
  311. }
  312. // TotalCost is the total cost of the Allocation including adjustments
  313. func (a *Allocation) TotalCost() float64 {
  314. return a.CPUTotalCost() + a.GPUTotalCost() + a.RAMTotalCost() + a.PVTotalCost() + a.NetworkTotalCost() + a.LBTotalCost() + a.SharedTotalCost() + a.ExternalCost
  315. }
  316. // CPUTotalCost calculates total CPU cost of Allocation including adjustment
  317. func (a *Allocation) CPUTotalCost() float64 {
  318. return a.CPUCost + a.CPUCostAdjustment
  319. }
  320. // GPUTotalCost calculates total GPU cost of Allocation including adjustment
  321. func (a *Allocation) GPUTotalCost() float64 {
  322. return a.GPUCost + a.GPUCostAdjustment
  323. }
  324. // RAMTotalCost calculates total RAM cost of Allocation including adjustment
  325. func (a *Allocation) RAMTotalCost() float64 {
  326. return a.RAMCost + a.RAMCostAdjustment
  327. }
  328. // PVTotalCost calculates total PV cost of Allocation including adjustment
  329. func (a *Allocation) PVTotalCost() float64 {
  330. return a.PVCost() + a.PVCostAdjustment
  331. }
  332. // NetworkTotalCost calculates total Network cost of Allocation including adjustment
  333. func (a *Allocation) NetworkTotalCost() float64 {
  334. return a.NetworkCost + a.NetworkCostAdjustment
  335. }
  336. // LBTotalCost calculates total LB cost of Allocation including adjustment
  337. func (a *Allocation) LBTotalCost() float64 {
  338. return a.LoadBalancerCost + a.LoadBalancerCostAdjustment
  339. }
  340. // SharedTotalCost calculates total shared cost of Allocation including adjustment
  341. func (a *Allocation) SharedTotalCost() float64 {
  342. return a.SharedCost + a.SharedCostAdjustment
  343. }
  344. // PVCost calculate cumulative cost of all PVs that Allocation is attached to
  345. func (a *Allocation) PVCost() float64 {
  346. cost := 0.0
  347. for _, pv := range a.PVs {
  348. cost += pv.Cost
  349. }
  350. return cost
  351. }
  352. // PVByteHours calculate cumulative ByteHours of all PVs that Allocation is attached to
  353. func (a *Allocation) PVByteHours() float64 {
  354. byteHours := 0.0
  355. for _, pv := range a.PVs {
  356. byteHours += pv.ByteHours
  357. }
  358. return byteHours
  359. }
  360. // CPUEfficiency is the ratio of usage to request. If there is no request and
  361. // no usage or cost, then efficiency is zero. If there is no request, but there
  362. // is usage or cost, then efficiency is 100%.
  363. func (a *Allocation) CPUEfficiency() float64 {
  364. if a.CPUCoreRequestAverage > 0 {
  365. return a.CPUCoreUsageAverage / a.CPUCoreRequestAverage
  366. }
  367. if a.CPUCoreUsageAverage == 0.0 || a.CPUCost == 0.0 {
  368. return 0.0
  369. }
  370. return 1.0
  371. }
  372. // RAMEfficiency is the ratio of usage to request. If there is no request and
  373. // no usage or cost, then efficiency is zero. If there is no request, but there
  374. // is usage or cost, then efficiency is 100%.
  375. func (a *Allocation) RAMEfficiency() float64 {
  376. if a.RAMBytesRequestAverage > 0 {
  377. return a.RAMBytesUsageAverage / a.RAMBytesRequestAverage
  378. }
  379. if a.RAMBytesUsageAverage == 0.0 || a.RAMCost == 0.0 {
  380. return 0.0
  381. }
  382. return 1.0
  383. }
  384. // TotalEfficiency is the cost-weighted average of CPU and RAM efficiency. If
  385. // there is no cost at all, then efficiency is zero.
  386. func (a *Allocation) TotalEfficiency() float64 {
  387. if a.RAMTotalCost()+a.CPUTotalCost() > 0 {
  388. ramCostEff := a.RAMEfficiency() * a.RAMTotalCost()
  389. cpuCostEff := a.CPUEfficiency() * a.CPUTotalCost()
  390. return (ramCostEff + cpuCostEff) / (a.CPUTotalCost() + a.RAMTotalCost())
  391. }
  392. return 0.0
  393. }
  394. // CPUCores converts the Allocation's CPUCoreHours into average CPUCores
  395. func (a *Allocation) CPUCores() float64 {
  396. if a.Minutes() <= 0.0 {
  397. return 0.0
  398. }
  399. return a.CPUCoreHours / (a.Minutes() / 60.0)
  400. }
  401. // RAMBytes converts the Allocation's RAMByteHours into average RAMBytes
  402. func (a *Allocation) RAMBytes() float64 {
  403. if a.Minutes() <= 0.0 {
  404. return 0.0
  405. }
  406. return a.RAMByteHours / (a.Minutes() / 60.0)
  407. }
  408. // GPUs converts the Allocation's GPUHours into average GPUs
  409. func (a *Allocation) GPUs() float64 {
  410. if a.Minutes() <= 0.0 {
  411. return 0.0
  412. }
  413. return a.GPUHours / (a.Minutes() / 60.0)
  414. }
  415. // PVBytes converts the Allocation's PVByteHours into average PVBytes
  416. func (a *Allocation) PVBytes() float64 {
  417. if a.Minutes() <= 0.0 {
  418. return 0.0
  419. }
  420. return a.PVByteHours() / (a.Minutes() / 60.0)
  421. }
  422. // MarshalJSON implements json.Marshaler interface
  423. func (a *Allocation) MarshalJSON() ([]byte, error) {
  424. buffer := bytes.NewBufferString("{")
  425. jsonEncodeString(buffer, "name", a.Name, ",")
  426. jsonEncode(buffer, "properties", a.Properties, ",")
  427. jsonEncode(buffer, "window", a.Window, ",")
  428. jsonEncodeString(buffer, "start", a.Start.Format(time.RFC3339), ",")
  429. jsonEncodeString(buffer, "end", a.End.Format(time.RFC3339), ",")
  430. jsonEncodeFloat64(buffer, "minutes", a.Minutes(), ",")
  431. jsonEncodeFloat64(buffer, "cpuCores", a.CPUCores(), ",")
  432. jsonEncodeFloat64(buffer, "cpuCoreRequestAverage", a.CPUCoreRequestAverage, ",")
  433. jsonEncodeFloat64(buffer, "cpuCoreUsageAverage", a.CPUCoreUsageAverage, ",")
  434. jsonEncodeFloat64(buffer, "cpuCoreHours", a.CPUCoreHours, ",")
  435. jsonEncodeFloat64(buffer, "cpuCost", a.CPUCost, ",")
  436. jsonEncodeFloat64(buffer, "cpuCostAdjustment", a.CPUCostAdjustment, ",")
  437. jsonEncodeFloat64(buffer, "cpuEfficiency", a.CPUEfficiency(), ",")
  438. jsonEncodeFloat64(buffer, "gpuCount", a.GPUs(), ",")
  439. jsonEncodeFloat64(buffer, "gpuHours", a.GPUHours, ",")
  440. jsonEncodeFloat64(buffer, "gpuCost", a.GPUCost, ",")
  441. jsonEncodeFloat64(buffer, "gpuCostAdjustment", a.GPUCostAdjustment, ",")
  442. jsonEncodeFloat64(buffer, "networkCost", a.NetworkCost, ",")
  443. jsonEncodeFloat64(buffer, "networkCostAdjustment", a.NetworkCostAdjustment, ",")
  444. jsonEncodeFloat64(buffer, "loadBalancerCost", a.LoadBalancerCost, ",")
  445. jsonEncodeFloat64(buffer, "loadBalancerCostAdjustment", a.LoadBalancerCostAdjustment, ",")
  446. jsonEncodeFloat64(buffer, "pvBytes", a.PVBytes(), ",")
  447. jsonEncodeFloat64(buffer, "pvByteHours", a.PVByteHours(), ",")
  448. jsonEncodeFloat64(buffer, "pvCost", a.PVCost(), ",")
  449. jsonEncode(buffer, "pvs", a.PVs, ",") // Todo Sean: this does not work properly
  450. jsonEncodeFloat64(buffer, "pvCostAdjustment", a.PVCostAdjustment, ",")
  451. jsonEncodeFloat64(buffer, "ramBytes", a.RAMBytes(), ",")
  452. jsonEncodeFloat64(buffer, "ramByteRequestAverage", a.RAMBytesRequestAverage, ",")
  453. jsonEncodeFloat64(buffer, "ramByteUsageAverage", a.RAMBytesUsageAverage, ",")
  454. jsonEncodeFloat64(buffer, "ramByteHours", a.RAMByteHours, ",")
  455. jsonEncodeFloat64(buffer, "ramCost", a.RAMCost, ",")
  456. jsonEncodeFloat64(buffer, "ramCostAdjustment", a.RAMCostAdjustment, ",")
  457. jsonEncodeFloat64(buffer, "ramEfficiency", a.RAMEfficiency(), ",")
  458. jsonEncodeFloat64(buffer, "sharedCost", a.SharedCost, ",")
  459. jsonEncodeFloat64(buffer, "sharedCostAdjustment", a.SharedCostAdjustment, ",")
  460. jsonEncodeFloat64(buffer, "externalCost", a.ExternalCost, ",")
  461. jsonEncodeFloat64(buffer, "totalCost", a.TotalCost(), ",")
  462. jsonEncodeFloat64(buffer, "totalEfficiency", a.TotalEfficiency(), ",")
  463. jsonEncode(buffer, "rawAllocationOnly", a.RawAllocationOnly, "")
  464. buffer.WriteString("}")
  465. return buffer.Bytes(), nil
  466. }
  467. // Resolution returns the duration of time covered by the Allocation
  468. func (a *Allocation) Resolution() time.Duration {
  469. return a.End.Sub(a.Start)
  470. }
  471. // IsAggregated is true if the given Allocation has been aggregated, which we
  472. // define by a lack of AllocationProperties.
  473. func (a *Allocation) IsAggregated() bool {
  474. return a == nil || a.Properties == nil
  475. }
  476. // IsExternal is true if the given Allocation represents external costs.
  477. func (a *Allocation) IsExternal() bool {
  478. return strings.Contains(a.Name, ExternalSuffix)
  479. }
  480. // IsIdle is true if the given Allocation represents idle costs.
  481. func (a *Allocation) IsIdle() bool {
  482. return strings.Contains(a.Name, IdleSuffix)
  483. }
  484. // IsUnallocated is true if the given Allocation represents unallocated costs.
  485. func (a *Allocation) IsUnallocated() bool {
  486. return strings.Contains(a.Name, UnallocatedSuffix)
  487. }
  488. // Minutes returns the number of minutes the Allocation represents, as defined
  489. // by the difference between the end and start times.
  490. func (a *Allocation) Minutes() float64 {
  491. return a.End.Sub(a.Start).Minutes()
  492. }
  493. // Share adds the TotalCost of the given Allocation to the SharedCost of the
  494. // receiving Allocation. No Start, End, Window, or AllocationProperties are considered.
  495. // Neither Allocation is mutated; a new Allocation is always returned.
  496. func (a *Allocation) Share(that *Allocation) (*Allocation, error) {
  497. if that == nil {
  498. return a.Clone(), nil
  499. }
  500. if a == nil {
  501. return nil, fmt.Errorf("cannot share with nil Allocation")
  502. }
  503. agg := a.Clone()
  504. agg.SharedCost += that.TotalCost()
  505. return agg, nil
  506. }
  507. // String represents the given Allocation as a string
  508. func (a *Allocation) String() string {
  509. return fmt.Sprintf("%s%s=%.2f", a.Name, NewWindow(&a.Start, &a.End), a.TotalCost())
  510. }
  511. func (a *Allocation) add(that *Allocation) {
  512. if a == nil {
  513. log.Warningf("Allocation.AggregateBy: trying to add a nil receiver")
  514. return
  515. }
  516. // Preserve string properties that are matching between the two allocations
  517. a.Properties = a.Properties.Intersection(that.Properties)
  518. // Expand the window to encompass both Allocations
  519. a.Window = a.Window.Expand(that.Window)
  520. // Sum non-cumulative fields by turning them into cumulative, adding them,
  521. // and then converting them back into averages after minutes have been
  522. // combined (just below).
  523. cpuReqCoreMins := a.CPUCoreRequestAverage * a.Minutes()
  524. cpuReqCoreMins += that.CPUCoreRequestAverage * that.Minutes()
  525. cpuUseCoreMins := a.CPUCoreUsageAverage * a.Minutes()
  526. cpuUseCoreMins += that.CPUCoreUsageAverage * that.Minutes()
  527. ramReqByteMins := a.RAMBytesRequestAverage * a.Minutes()
  528. ramReqByteMins += that.RAMBytesRequestAverage * that.Minutes()
  529. ramUseByteMins := a.RAMBytesUsageAverage * a.Minutes()
  530. ramUseByteMins += that.RAMBytesUsageAverage * that.Minutes()
  531. // Expand Start and End to be the "max" of among the given Allocations
  532. if that.Start.Before(a.Start) {
  533. a.Start = that.Start
  534. }
  535. if that.End.After(a.End) {
  536. a.End = that.End
  537. }
  538. // Convert cumulative request and usage back into rates
  539. // TODO:TEST write a unit test that fails if this is done incorrectly
  540. if a.Minutes() > 0 {
  541. a.CPUCoreRequestAverage = cpuReqCoreMins / a.Minutes()
  542. a.CPUCoreUsageAverage = cpuUseCoreMins / a.Minutes()
  543. a.RAMBytesRequestAverage = ramReqByteMins / a.Minutes()
  544. a.RAMBytesUsageAverage = ramUseByteMins / a.Minutes()
  545. } else {
  546. a.CPUCoreRequestAverage = 0.0
  547. a.CPUCoreUsageAverage = 0.0
  548. a.RAMBytesRequestAverage = 0.0
  549. a.RAMBytesUsageAverage = 0.0
  550. }
  551. // Sum all cumulative resource fields
  552. a.CPUCoreHours += that.CPUCoreHours
  553. a.GPUHours += that.GPUHours
  554. a.RAMByteHours += that.RAMByteHours
  555. // Sum all cumulative cost fields
  556. a.CPUCost += that.CPUCost
  557. a.GPUCost += that.GPUCost
  558. a.RAMCost += that.RAMCost
  559. a.NetworkCost += that.NetworkCost
  560. a.LoadBalancerCost += that.LoadBalancerCost
  561. a.SharedCost += that.SharedCost
  562. a.ExternalCost += that.ExternalCost
  563. // Sum PVAllocations
  564. a.PVs = a.PVs.Add(that.PVs)
  565. // Sum all cumulative adjustment fields
  566. a.CPUCostAdjustment += that.CPUCostAdjustment
  567. a.RAMCostAdjustment += that.RAMCostAdjustment
  568. a.GPUCostAdjustment += that.GPUCostAdjustment
  569. a.PVCostAdjustment += that.PVCostAdjustment
  570. a.NetworkCostAdjustment += that.NetworkCostAdjustment
  571. a.LoadBalancerCostAdjustment += that.LoadBalancerCostAdjustment
  572. a.SharedCostAdjustment += that.SharedCostAdjustment
  573. // Any data that is in a "raw allocation only" is not valid in any
  574. // sort of cumulative Allocation (like one that is added).
  575. a.RawAllocationOnly = nil
  576. }
  577. // AllocationSet stores a set of Allocations, each with a unique name, that share
  578. // a window. An AllocationSet is mutable, so treat it like a threadsafe map.
  579. type AllocationSet struct {
  580. sync.RWMutex
  581. allocations map[string]*Allocation
  582. externalKeys map[string]bool
  583. idleKeys map[string]bool
  584. Window Window
  585. Warnings []string
  586. Errors []string
  587. }
  588. // NewAllocationSet instantiates a new AllocationSet and, optionally, inserts
  589. // the given list of Allocations
  590. func NewAllocationSet(start, end time.Time, allocs ...*Allocation) *AllocationSet {
  591. as := &AllocationSet{
  592. allocations: map[string]*Allocation{},
  593. externalKeys: map[string]bool{},
  594. idleKeys: map[string]bool{},
  595. Window: NewWindow(&start, &end),
  596. }
  597. for _, a := range allocs {
  598. as.Insert(a)
  599. }
  600. return as
  601. }
  602. // AllocationAggregationOptions provide advanced functionality to AggregateBy, including
  603. // filtering results and sharing allocations. FilterFuncs are a list of match
  604. // functions such that, if any function fails, the allocation is ignored.
  605. // ShareFuncs are a list of match functions such that, if any function
  606. // succeeds, the allocation is marked as a shared resource. ShareIdle is a
  607. // simple flag for sharing idle resources.
  608. type AllocationAggregationOptions struct {
  609. FilterFuncs []AllocationMatchFunc
  610. SplitIdle bool
  611. IdleByNode bool
  612. MergeUnallocated bool
  613. ShareFuncs []AllocationMatchFunc
  614. ShareIdle string
  615. ShareSplit string
  616. SharedHourlyCosts map[string]float64
  617. }
  618. // AggregateBy aggregates the Allocations in the given AllocationSet by the given
  619. // AllocationProperty. This will only be legal if the AllocationSet is divisible by the
  620. // given AllocationProperty; e.g. Containers can be divided by Namespace, but not vice-a-versa.
  621. func (as *AllocationSet) AggregateBy(aggregateBy []string, options *AllocationAggregationOptions) error {
  622. // The order of operations for aggregating allocations is as follows:
  623. // 1. Partition external, idle, and shared allocations into separate sets.
  624. // Also, create the aggSet into which the results will be aggregated.
  625. // 2. Compute sharing coefficients for idle and shared resources
  626. // a) if idle allocation is to be shared, compute idle coefficients
  627. // b) if idle allocation is NOT shared, but filters are present, compute
  628. // idle filtration coefficients for the purpose of only returning the
  629. // portion of idle allocation that would have been shared with the
  630. // unfiltered results. (See unit tests 5.a,b,c)
  631. // c) generate shared allocation for then given shared overhead, which
  632. // must happen after (2a) and (2b)
  633. // d) if there are shared resources, compute share coefficients
  634. // 3. Drop any allocation that fails any of the filters
  635. // 4. Distribute idle allocations according to the idle coefficients
  636. // 5. Generate aggregation key and insert allocation into the output set
  637. // 6. If idle is shared and resources are shared, some idle might be shared
  638. // with a shared resource. Distribute that to the shared resources
  639. // prior to sharing them with the aggregated results.
  640. // 7. Apply idle filtration coefficients from step (2b)
  641. // 8. Distribute shared allocations according to the share coefficients.
  642. // 9. If there are external allocations that can be aggregated into
  643. // the output (i.e. they can be used to generate a valid key for
  644. // the given properties) then aggregate; otherwise... ignore them?
  645. // 10. If the merge idle option is enabled, merge any remaining idle
  646. // allocations into a single idle allocation
  647. if options == nil {
  648. options = &AllocationAggregationOptions{}
  649. }
  650. if as.IsEmpty() {
  651. return nil
  652. }
  653. // aggSet will collect the aggregated allocations
  654. aggSet := &AllocationSet{
  655. Window: as.Window.Clone(),
  656. }
  657. // externalSet will collect external allocations
  658. externalSet := &AllocationSet{
  659. Window: as.Window.Clone(),
  660. }
  661. // idleSet will be shared among aggSet after initial aggregation
  662. // is complete
  663. idleSet := &AllocationSet{
  664. Window: as.Window.Clone(),
  665. }
  666. // shareSet will be shared among aggSet after initial aggregation
  667. // is complete
  668. shareSet := &AllocationSet{
  669. Window: as.Window.Clone(),
  670. }
  671. as.Lock()
  672. defer as.Unlock()
  673. // (1) Loop and find all of the external, idle, and shared allocations. Add
  674. // them to their respective sets, removing them from the set of allocations
  675. // to aggregate.
  676. for _, alloc := range as.allocations {
  677. // External allocations get aggregated post-hoc (see step 6) and do
  678. // not necessarily contain complete sets of properties, so they are
  679. // moved to a separate AllocationSet.
  680. if alloc.IsExternal() {
  681. delete(as.externalKeys, alloc.Name)
  682. delete(as.allocations, alloc.Name)
  683. externalSet.Insert(alloc)
  684. continue
  685. }
  686. // Idle allocations should be separated into idleSet if they are to be
  687. // shared later on. If they are not to be shared, then add them to the
  688. // aggSet like any other allocation.
  689. if alloc.IsIdle() {
  690. delete(as.idleKeys, alloc.Name)
  691. delete(as.allocations, alloc.Name)
  692. if options.ShareIdle == ShareEven || options.ShareIdle == ShareWeighted {
  693. idleSet.Insert(alloc)
  694. } else {
  695. aggSet.Insert(alloc)
  696. }
  697. continue
  698. }
  699. // Shared allocations must be identified and separated prior to
  700. // aggregation and filtering. That is, if any of the ShareFuncs return
  701. // true for the allocation, then move it to shareSet.
  702. for _, sf := range options.ShareFuncs {
  703. if sf(alloc) {
  704. delete(as.idleKeys, alloc.Name)
  705. delete(as.allocations, alloc.Name)
  706. shareSet.Insert(alloc)
  707. break
  708. }
  709. }
  710. }
  711. // It's possible that no more un-shared, non-idle, non-external allocations
  712. // remain at this point. This always results in an emptySet, so return early.
  713. if len(as.allocations) == 0 {
  714. emptySet := &AllocationSet{
  715. Window: as.Window.Clone(),
  716. }
  717. as.allocations = emptySet.allocations
  718. return nil
  719. }
  720. // (2) In order to correctly share idle and shared costs, we first compute
  721. // sharing coefficients, which represent the proportion of each cost to
  722. // share with each allocation. Idle allocations are shared per-cluster or per-node,
  723. // per-allocation, and per-resource, while shared resources are shared per-
  724. // allocation only.
  725. //
  726. // For an idleCoefficient example, the entries:
  727. // [cluster1][cluster1/namespace1/pod1/container1][cpu] = 0.166667
  728. // [cluster1][cluster1/namespace1/pod1/container1][gpu] = 0.166667
  729. // [cluster1][cluster1/namespace1/pod1/container1][ram] = 0.687500
  730. // mean that the allocation "cluster1/namespace1/pod1/container1" will
  731. // receive 16.67% of cluster1's idle CPU and GPU costs and 68.75% of its
  732. // RAM costs.
  733. //
  734. // For a shareCoefficient example, the entries:
  735. // [namespace2] = 0.666667
  736. // [__filtered__] = 0.333333
  737. // mean that the post-aggregation allocation "namespace2" will receive
  738. // 66.67% of the shared resource costs, while the remaining 33.33% will
  739. // be filtered out, as they were shared with allocations that did not pass
  740. // one of the given filters.
  741. //
  742. // In order to maintain stable results when multiple operations are being
  743. // carried out (e.g. sharing idle, sharing resources, and filtering) these
  744. // coefficients are computed for the full set of allocations prior to
  745. // adding shared overhead and prior to applying filters.
  746. var err error
  747. // (2a) If there are idle costs to be shared, compute the coefficients for
  748. // sharing them among the non-idle, non-aggregated allocations (including
  749. // the shared allocations).
  750. var idleCoefficients map[string]map[string]map[string]float64
  751. if idleSet.Length() > 0 && options.ShareIdle != ShareNone {
  752. idleCoefficients, err = computeIdleCoeffs(options, as, shareSet)
  753. if err != nil {
  754. log.Warningf("AllocationSet.AggregateBy: compute idle coeff: %s", err)
  755. return fmt.Errorf("error computing idle coefficients: %s", err)
  756. }
  757. }
  758. // (2b) If idle costs are not to be shared, but there are filters, then we
  759. // need to track the amount of each idle allocation to "filter" in order to
  760. // maintain parity with the results when idle is shared. That is, we want
  761. // to return only the idle costs that would have been shared with the given
  762. // results, even if the filter had not been applied.
  763. //
  764. // For example, consider these results from aggregating by namespace with
  765. // two clusters:
  766. //
  767. // namespace1: 25.00
  768. // namespace2: 30.00
  769. // namespace3: 15.00
  770. // idle: 30.00
  771. //
  772. // When we then filter by cluster==cluster1, namespaces 2 and 3 are
  773. // reduced by the amount that existed on cluster2. Then, idle must also be
  774. // reduced by the relevant amount:
  775. //
  776. // namespace1: 25.00
  777. // namespace2: 15.00
  778. // idle: 20.00
  779. //
  780. // Note that this can happen for any field, not just cluster, so we again
  781. // need to track this on a per-cluster or per-node, per-allocation, per-resource basis.
  782. var idleFiltrationCoefficients map[string]map[string]map[string]float64
  783. if len(options.FilterFuncs) > 0 && options.ShareIdle == ShareNone {
  784. idleFiltrationCoefficients, err = computeIdleCoeffs(options, as, shareSet)
  785. if err != nil {
  786. return fmt.Errorf("error computing idle filtration coefficients: %s", err)
  787. }
  788. }
  789. // (2c) Convert SharedHourlyCosts to Allocations in the shareSet. This must
  790. // come after idle coefficients are computes so that allocations generated
  791. // by shared overhead do not skew the idle coefficient computation.
  792. for name, cost := range options.SharedHourlyCosts {
  793. if cost > 0.0 {
  794. hours := as.Resolution().Hours()
  795. // If set ends in the future, adjust hours accordingly
  796. diff := time.Now().Sub(as.End())
  797. if diff < 0.0 {
  798. hours += diff.Hours()
  799. }
  800. totalSharedCost := cost * hours
  801. shareSet.Insert(&Allocation{
  802. Name: fmt.Sprintf("%s/%s", name, SharedSuffix),
  803. Start: as.Start(),
  804. End: as.End(),
  805. SharedCost: totalSharedCost,
  806. Properties: &AllocationProperties{Cluster: SharedSuffix}, // The allocation needs to belong to a cluster,but it really doesn't matter which one, so just make it clear.
  807. })
  808. }
  809. }
  810. // (2d) Compute share coefficients for shared resources. These are computed
  811. // after idle coefficients, and are computed for the aggregated allocations
  812. // of the main allocation set. See above for details and an example.
  813. var shareCoefficients map[string]float64
  814. if shareSet.Length() > 0 {
  815. shareCoefficients, err = computeShareCoeffs(aggregateBy, options, as)
  816. if err != nil {
  817. return fmt.Errorf("error computing share coefficients: %s", err)
  818. }
  819. }
  820. // (3-5) Filter, distribute idle cost, and aggregate (in that order)
  821. for _, alloc := range as.allocations {
  822. idleKey, err := alloc.getIdleKey(options)
  823. if err != nil {
  824. log.DedupedInfof(5,"AllocationSet.AggregateBy: missing idleKey for allocation: %s", alloc.Name)
  825. continue
  826. }
  827. skip := false
  828. // (3) If any of the filter funcs fail, immediately skip the allocation.
  829. for _, ff := range options.FilterFuncs {
  830. if !ff(alloc) {
  831. skip = true
  832. break
  833. }
  834. }
  835. if skip {
  836. // If we are tracking idle filtration coefficients, delete the
  837. // entry corresponding to the filtered allocation. (Deleting the
  838. // entry will result in that proportional amount being removed
  839. // from the idle allocation at the end of the process.)
  840. if idleFiltrationCoefficients != nil {
  841. if ifcc, ok := idleFiltrationCoefficients[idleKey]; ok {
  842. delete(ifcc, alloc.Name)
  843. }
  844. }
  845. continue
  846. }
  847. // (4) Distribute idle allocations according to the idle coefficients
  848. // NOTE: if idle allocation is off (i.e. ShareIdle == ShareNone) then
  849. // all idle allocations will be in the aggSet at this point, so idleSet
  850. // will be empty and we won't enter this block.
  851. if idleSet.Length() > 0 {
  852. // Distribute idle allocations by coefficient per-idleKey, per-allocation
  853. for _, idleAlloc := range idleSet.allocations {
  854. // Only share idle if the idleKey matches; i.e. the allocation
  855. // is from the same idleKey as the idle costs
  856. iaIdleKey, err := idleAlloc.getIdleKey(options)
  857. if err != nil {
  858. return err
  859. }
  860. if iaIdleKey != idleKey {
  861. continue
  862. }
  863. // Make sure idle coefficients exist
  864. if _, ok := idleCoefficients[idleKey]; !ok {
  865. log.Warningf("AllocationSet.AggregateBy: error getting idle coefficient: no idleKey '%s' for '%s'", idleKey, alloc.Name)
  866. continue
  867. }
  868. if _, ok := idleCoefficients[idleKey][alloc.Name]; !ok {
  869. log.Warningf("AllocationSet.AggregateBy: error getting idle coefficient for '%s'", alloc.Name)
  870. continue
  871. }
  872. alloc.CPUCoreHours += idleAlloc.CPUCoreHours * idleCoefficients[idleKey][alloc.Name]["cpu"]
  873. alloc.GPUHours += idleAlloc.GPUHours * idleCoefficients[idleKey][alloc.Name]["gpu"]
  874. alloc.RAMByteHours += idleAlloc.RAMByteHours * idleCoefficients[idleKey][alloc.Name]["ram"]
  875. idleCPUCost := idleAlloc.CPUCost * idleCoefficients[idleKey][alloc.Name]["cpu"]
  876. idleGPUCost := idleAlloc.GPUCost * idleCoefficients[idleKey][alloc.Name]["gpu"]
  877. idleRAMCost := idleAlloc.RAMCost * idleCoefficients[idleKey][alloc.Name]["ram"]
  878. alloc.CPUCost += idleCPUCost
  879. alloc.GPUCost += idleGPUCost
  880. alloc.RAMCost += idleRAMCost
  881. }
  882. }
  883. // (5) generate key to use for aggregation-by-key and allocation name
  884. key := alloc.generateKey(aggregateBy)
  885. alloc.Name = key
  886. if options.MergeUnallocated && alloc.IsUnallocated() {
  887. alloc.Name = UnallocatedSuffix
  888. }
  889. // Inserting the allocation with the generated key for a name will
  890. // perform the actual basic aggregation step.
  891. aggSet.Insert(alloc)
  892. }
  893. // (6) If idle is shared and resources are shared, it's possible that some
  894. // amount of idle cost will be shared with a shared resource. Distribute
  895. // that idle allocation, if it exists, to the respective shared allocations
  896. // before sharing with the aggregated allocations.
  897. if idleSet.Length() > 0 && shareSet.Length() > 0 {
  898. for _, alloc := range shareSet.allocations {
  899. idleKey, err := alloc.getIdleKey(options)
  900. if err != nil {
  901. log.DedupedWarningf(5, "AllocationSet.AggregateBy: missing idleKey for allocation: %s", alloc.Name)
  902. continue
  903. }
  904. // Distribute idle allocations by coefficient per-idleKey, per-allocation
  905. for _, idleAlloc := range idleSet.allocations {
  906. // Only share idle if the idleKey matches; i.e. the allocation
  907. // is from the same idleKey as the idle costs
  908. iaIdleKey, err := idleAlloc.getIdleKey(options)
  909. if err != nil {
  910. return nil
  911. }
  912. if iaIdleKey != idleKey {
  913. continue
  914. }
  915. // Make sure idle coefficients exist
  916. if _, ok := idleCoefficients[idleKey]; !ok {
  917. log.Warningf("AllocationSet.AggregateBy: error getting idle coefficient: no idleKey '%s' for '%s'", idleKey, alloc.Name)
  918. continue
  919. }
  920. if _, ok := idleCoefficients[idleKey][alloc.Name]; !ok {
  921. log.Warningf("AllocationSet.AggregateBy: error getting idle coefficient for '%s'", alloc.Name)
  922. continue
  923. }
  924. alloc.CPUCoreHours += idleAlloc.CPUCoreHours * idleCoefficients[idleKey][alloc.Name]["cpu"]
  925. alloc.GPUHours += idleAlloc.GPUHours * idleCoefficients[idleKey][alloc.Name]["gpu"]
  926. alloc.RAMByteHours += idleAlloc.RAMByteHours * idleCoefficients[idleKey][alloc.Name]["ram"]
  927. idleCPUCost := idleAlloc.CPUCost * idleCoefficients[idleKey][alloc.Name]["cpu"]
  928. idleGPUCost := idleAlloc.GPUCost * idleCoefficients[idleKey][alloc.Name]["gpu"]
  929. idleRAMCost := idleAlloc.RAMCost * idleCoefficients[idleKey][alloc.Name]["ram"]
  930. alloc.CPUCost += idleCPUCost
  931. alloc.GPUCost += idleGPUCost
  932. alloc.RAMCost += idleRAMCost
  933. }
  934. }
  935. }
  936. // groupingIdleFiltrationCoeffs is used to track per-resource idle
  937. // coefficients on a cluster-by-cluster or node-by-node basis depending
  938. // on the IdleByNode option. It is, essentailly, an aggregation of
  939. // idleFiltrationCoefficients after they have been
  940. // filtered above (in step 3)
  941. var groupingIdleFiltrationCoeffs map[string]map[string]float64
  942. if idleFiltrationCoefficients != nil {
  943. groupingIdleFiltrationCoeffs = map[string]map[string]float64{}
  944. for idleKey, m := range idleFiltrationCoefficients {
  945. if _, ok := groupingIdleFiltrationCoeffs[idleKey]; !ok {
  946. groupingIdleFiltrationCoeffs[idleKey] = map[string]float64{
  947. "cpu": 0.0,
  948. "gpu": 0.0,
  949. "ram": 0.0,
  950. }
  951. }
  952. for _, n := range m {
  953. for resource, val := range n {
  954. groupingIdleFiltrationCoeffs[idleKey][resource] += val
  955. }
  956. }
  957. }
  958. }
  959. // (7) If we have both un-shared idle allocations and idle filtration
  960. // coefficients then apply those. See step (2b) for an example.
  961. if len(aggSet.idleKeys) > 0 && groupingIdleFiltrationCoeffs != nil {
  962. for idleKey := range aggSet.idleKeys {
  963. idleAlloc := aggSet.Get(idleKey)
  964. iaIdleKey, err := idleAlloc.getIdleKey(options)
  965. if err != nil {
  966. log.Warningf("AllocationSet.AggregateBy: idle allocation without IdleKey: %s", idleAlloc)
  967. continue
  968. }
  969. if resourceCoeffs, ok := groupingIdleFiltrationCoeffs[iaIdleKey]; ok {
  970. idleAlloc.CPUCost *= resourceCoeffs["cpu"]
  971. idleAlloc.CPUCoreHours *= resourceCoeffs["cpu"]
  972. idleAlloc.RAMCost *= resourceCoeffs["ram"]
  973. idleAlloc.RAMByteHours *= resourceCoeffs["ram"]
  974. }
  975. }
  976. }
  977. // (8) Distribute shared allocations according to the share coefficients.
  978. if shareSet.Length() > 0 {
  979. for _, alloc := range aggSet.allocations {
  980. for _, sharedAlloc := range shareSet.allocations {
  981. if _, ok := shareCoefficients[alloc.Name]; !ok {
  982. log.Warningf("AllocationSet.AggregateBy: error getting share coefficienct for '%s'", alloc.Name)
  983. continue
  984. }
  985. alloc.SharedCost += sharedAlloc.TotalCost() * shareCoefficients[alloc.Name]
  986. }
  987. }
  988. }
  989. // (9) Aggregate external allocations into aggregated allocations. This may
  990. // not be possible for every external allocation, but attempt to find an
  991. // exact key match, given each external allocation's proerties, and
  992. // aggregate if an exact match is found.
  993. for _, alloc := range externalSet.allocations {
  994. skip := false
  995. for _, ff := range options.FilterFuncs {
  996. if !ff(alloc) {
  997. skip = true
  998. break
  999. }
  1000. }
  1001. if !skip {
  1002. key := alloc.generateKey(aggregateBy)
  1003. alloc.Name = key
  1004. aggSet.Insert(alloc)
  1005. }
  1006. }
  1007. // (10) Combine all idle allocations into a single "__idle__" allocation
  1008. if !options.SplitIdle {
  1009. for _, idleAlloc := range aggSet.IdleAllocations() {
  1010. aggSet.Delete(idleAlloc.Name)
  1011. idleAlloc.Name = IdleSuffix
  1012. aggSet.Insert(idleAlloc)
  1013. }
  1014. }
  1015. as.allocations = aggSet.allocations
  1016. return nil
  1017. }
  1018. func computeShareCoeffs(aggregateBy []string, options *AllocationAggregationOptions, as *AllocationSet) (map[string]float64, error) {
  1019. // Compute coeffs by totalling per-allocation, then dividing by the total.
  1020. coeffs := map[string]float64{}
  1021. // Compute totals for all allocations
  1022. total := 0.0
  1023. // ShareEven counts each aggregation with even weight, whereas ShareWeighted
  1024. // counts each aggregation proportionally to its respective costs
  1025. shareType := options.ShareSplit
  1026. // Record allocation values first, then normalize by totals to get percentages
  1027. for _, alloc := range as.allocations {
  1028. if alloc.IsIdle() {
  1029. // Skip idle allocations in coefficient calculation
  1030. continue
  1031. }
  1032. // Determine the post-aggregation key under which the allocation will
  1033. // be shared.
  1034. name := alloc.generateKey(aggregateBy)
  1035. // If the current allocation will be filtered out in step 3, contribute
  1036. // its share of the shared coefficient to a "__filtered__" bin, which
  1037. // will ultimately be dropped. This step ensures that the shared cost
  1038. // of a non-filtered allocation will be conserved even when the filter
  1039. // is removed. (Otherwise, all the shared cost will get redistributed
  1040. // over the unfiltered results, inflating their shared costs.)
  1041. filtered := false
  1042. for _, ff := range options.FilterFuncs {
  1043. if !ff(alloc) {
  1044. filtered = true
  1045. break
  1046. }
  1047. }
  1048. if filtered {
  1049. name = "__filtered__"
  1050. }
  1051. if shareType == ShareEven {
  1052. // Even distribution is not additive - set to 1.0 for everything
  1053. coeffs[name] = 1.0
  1054. // Total for even distribution is always the number of coefficients
  1055. total = float64(len(coeffs))
  1056. } else {
  1057. // Both are additive for weighted distribution, where each
  1058. // cumulative coefficient will be divided by the total.
  1059. coeffs[name] += alloc.TotalCost()
  1060. total += alloc.TotalCost()
  1061. }
  1062. }
  1063. // Normalize coefficients by totals
  1064. for a := range coeffs {
  1065. if coeffs[a] > 0 && total > 0 {
  1066. coeffs[a] /= total
  1067. } else {
  1068. log.Warningf("ETL: invalid values for shared coefficients: %d, %d", coeffs[a], total)
  1069. coeffs[a] = 0.0
  1070. }
  1071. }
  1072. return coeffs, nil
  1073. }
  1074. func computeIdleCoeffs(options *AllocationAggregationOptions, as *AllocationSet, shareSet *AllocationSet) (map[string]map[string]map[string]float64, error) {
  1075. types := []string{"cpu", "gpu", "ram"}
  1076. // Compute idle coefficients, then save them in AllocationAggregationOptions
  1077. coeffs := map[string]map[string]map[string]float64{}
  1078. // Compute totals per resource for CPU, GPU, RAM, and PV
  1079. totals := map[string]map[string]float64{}
  1080. // ShareEven counts each allocation with even weight, whereas ShareWeighted
  1081. // counts each allocation proportionally to its respective costs
  1082. shareType := options.ShareIdle
  1083. // Record allocation values first, then normalize by totals to get percentages
  1084. for _, alloc := range as.allocations {
  1085. if alloc.IsIdle() {
  1086. // Skip idle allocations in coefficient calculation
  1087. continue
  1088. }
  1089. idleKey, err := alloc.getIdleKey(options)
  1090. if err != nil {
  1091. // skip allocations that are missing idleKey
  1092. continue
  1093. }
  1094. // get the name key for the allocation
  1095. name := alloc.Name
  1096. // Create key based tables if they don't exist
  1097. if _, ok := coeffs[idleKey]; !ok {
  1098. coeffs[idleKey] = map[string]map[string]float64{}
  1099. }
  1100. if _, ok := totals[idleKey]; !ok {
  1101. totals[idleKey] = map[string]float64{}
  1102. }
  1103. if _, ok := coeffs[idleKey][name]; !ok {
  1104. coeffs[idleKey][name] = map[string]float64{}
  1105. }
  1106. if shareType == ShareEven {
  1107. for _, r := range types {
  1108. // Not additive - hard set to 1.0
  1109. coeffs[idleKey][name][r] = 1.0
  1110. // totals are additive
  1111. totals[idleKey][r] += 1.0
  1112. }
  1113. } else {
  1114. coeffs[idleKey][name]["cpu"] += alloc.CPUTotalCost()
  1115. coeffs[idleKey][name]["gpu"] += alloc.GPUTotalCost()
  1116. coeffs[idleKey][name]["ram"] += alloc.RAMTotalCost()
  1117. totals[idleKey]["cpu"] += alloc.CPUTotalCost()
  1118. totals[idleKey]["gpu"] += alloc.GPUTotalCost()
  1119. totals[idleKey]["ram"] += alloc.RAMTotalCost()
  1120. }
  1121. }
  1122. // Do the same for shared allocations
  1123. for _, alloc := range shareSet.allocations {
  1124. if alloc.IsIdle() {
  1125. // Skip idle allocations in coefficient calculation
  1126. continue
  1127. }
  1128. // idleKey will be providerId or cluster
  1129. idleKey, err := alloc.getIdleKey(options)
  1130. if err != nil {
  1131. return nil, err
  1132. }
  1133. // get the name key for the allocation
  1134. name := alloc.Name
  1135. // Create idleKey based tables if they don't exist
  1136. if _, ok := coeffs[idleKey]; !ok {
  1137. coeffs[idleKey] = map[string]map[string]float64{}
  1138. }
  1139. if _, ok := totals[idleKey]; !ok {
  1140. totals[idleKey] = map[string]float64{}
  1141. }
  1142. if _, ok := coeffs[idleKey][name]; !ok {
  1143. coeffs[idleKey][name] = map[string]float64{}
  1144. }
  1145. if shareType == ShareEven {
  1146. for _, r := range types {
  1147. // Not additive - hard set to 1.0
  1148. coeffs[idleKey][name][r] = 1.0
  1149. // totals are additive
  1150. totals[idleKey][r] += 1.0
  1151. }
  1152. } else {
  1153. coeffs[idleKey][name]["cpu"] += alloc.CPUTotalCost()
  1154. coeffs[idleKey][name]["gpu"] += alloc.GPUTotalCost()
  1155. coeffs[idleKey][name]["ram"] += alloc.RAMTotalCost()
  1156. totals[idleKey]["cpu"] += alloc.CPUTotalCost()
  1157. totals[idleKey]["gpu"] += alloc.GPUTotalCost()
  1158. totals[idleKey]["ram"] += alloc.RAMTotalCost()
  1159. }
  1160. }
  1161. // Normalize coefficients by totals
  1162. for c := range coeffs {
  1163. for a := range coeffs[c] {
  1164. for _, r := range types {
  1165. if coeffs[c][a][r] > 0 && totals[c][r] > 0 {
  1166. coeffs[c][a][r] /= totals[c][r]
  1167. }
  1168. }
  1169. }
  1170. }
  1171. return coeffs, nil
  1172. }
  1173. // getIdleKey returns the providerId or cluster of an Allocation depending on the IdleByNode
  1174. // option in the AllocationAggregationOptions and an error if the respective field is missing
  1175. func (a *Allocation) getIdleKey(options *AllocationAggregationOptions) (string, error) {
  1176. var idleKey string
  1177. if options.IdleByNode {
  1178. // Key allocations to ProviderId to match against node
  1179. idleKey = a.Properties.ProviderID
  1180. if idleKey == "" {
  1181. return idleKey, fmt.Errorf("ProviderId is not set")
  1182. }
  1183. } else {
  1184. // key the allocations by cluster id
  1185. idleKey = a.Properties.Cluster
  1186. if idleKey == "" {
  1187. return idleKey, fmt.Errorf("ClusterProp is not set")
  1188. }
  1189. }
  1190. return idleKey, nil
  1191. }
  1192. func (a *Allocation) generateKey(aggregateBy []string) string {
  1193. if a == nil {
  1194. return ""
  1195. }
  1196. // Names will ultimately be joined into a single name, which uniquely
  1197. // identifies allocations.
  1198. names := []string{}
  1199. for _, agg := range aggregateBy {
  1200. switch true {
  1201. case agg == AllocationClusterProp:
  1202. names = append(names, a.Properties.Cluster)
  1203. case agg == AllocationNodeProp:
  1204. names = append(names, a.Properties.Node)
  1205. case agg == AllocationNamespaceProp:
  1206. names = append(names, a.Properties.Namespace)
  1207. case agg == AllocationControllerKindProp:
  1208. controllerKind := a.Properties.ControllerKind
  1209. if controllerKind == "" {
  1210. // Indicate that allocation has no controller
  1211. controllerKind = UnallocatedSuffix
  1212. }
  1213. names = append(names, controllerKind)
  1214. case agg == AllocationDaemonSetProp || agg == AllocationStatefulSetProp || agg == AllocationDeploymentProp || agg == AllocationJobProp:
  1215. controller := a.Properties.Controller
  1216. if agg != a.Properties.ControllerKind || controller == "" {
  1217. // The allocation does not have the specified controller kind
  1218. controller = UnallocatedSuffix
  1219. }
  1220. names = append(names, controller)
  1221. case agg == AllocationControllerProp:
  1222. controller := a.Properties.Controller
  1223. if controller == "" {
  1224. // Indicate that allocation has no controller
  1225. controller = UnallocatedSuffix
  1226. } else if a.Properties.ControllerKind != "" {
  1227. controller = fmt.Sprintf("%s:%s", a.Properties.ControllerKind, controller)
  1228. }
  1229. names = append(names, controller)
  1230. case agg == AllocationPodProp:
  1231. names = append(names, a.Properties.Pod)
  1232. case agg == AllocationContainerProp:
  1233. names = append(names, a.Properties.Container)
  1234. case agg == AllocationServiceProp:
  1235. services := a.Properties.Services
  1236. if services == nil || len(services) == 0 {
  1237. // Indicate that allocation has no services
  1238. names = append(names, UnallocatedSuffix)
  1239. } else {
  1240. // This just uses the first service
  1241. for _, service := range services {
  1242. names = append(names, service)
  1243. break
  1244. }
  1245. }
  1246. case strings.HasPrefix(agg, "label:"):
  1247. labels := a.Properties.Labels
  1248. if labels == nil {
  1249. // Indicate that allocation has no labels
  1250. names = append(names, UnallocatedSuffix)
  1251. } else {
  1252. labelNames := []string{}
  1253. aggLabels := strings.Split(strings.TrimPrefix(agg, "label:"), ";")
  1254. for _, labelName := range aggLabels {
  1255. if val, ok := labels[labelName]; ok {
  1256. labelNames = append(labelNames, fmt.Sprintf("%s=%s", labelName, val))
  1257. } else if indexOf(UnallocatedSuffix, labelNames) == -1 { // if UnallocatedSuffix not already in names
  1258. labelNames = append(labelNames, UnallocatedSuffix)
  1259. }
  1260. }
  1261. // resolve arbitrary ordering. e.g., app=app0/env=env0 is the same agg as env=env0/app=app0
  1262. if len(labelNames) > 1 {
  1263. sort.Strings(labelNames)
  1264. }
  1265. unallocatedSuffixIndex := indexOf(UnallocatedSuffix, labelNames)
  1266. // suffix should be at index 0 if it exists b/c of underscores
  1267. if unallocatedSuffixIndex != -1 {
  1268. labelNames = append(labelNames[:unallocatedSuffixIndex], labelNames[unallocatedSuffixIndex+1:]...)
  1269. labelNames = append(labelNames, UnallocatedSuffix) // append to end
  1270. }
  1271. names = append(names, labelNames...)
  1272. }
  1273. case strings.HasPrefix(agg, "annotation:"):
  1274. annotations := a.Properties.Annotations
  1275. if annotations == nil {
  1276. // Indicate that allocation has no annotations
  1277. names = append(names, UnallocatedSuffix)
  1278. } else {
  1279. annotationNames := []string{}
  1280. aggAnnotations := strings.Split(strings.TrimPrefix(agg, "annotation:"), ";")
  1281. for _, annotationName := range aggAnnotations {
  1282. if val, ok := annotations[annotationName]; ok {
  1283. annotationNames = append(annotationNames, fmt.Sprintf("%s=%s", annotationName, val))
  1284. } else if indexOf(UnallocatedSuffix, annotationNames) == -1 { // if UnallocatedSuffix not already in names
  1285. annotationNames = append(annotationNames, UnallocatedSuffix)
  1286. }
  1287. }
  1288. // resolve arbitrary ordering. e.g., app=app0/env=env0 is the same agg as env=env0/app=app0
  1289. if len(annotationNames) > 1 {
  1290. sort.Strings(annotationNames)
  1291. }
  1292. unallocatedSuffixIndex := indexOf(UnallocatedSuffix, annotationNames)
  1293. // suffix should be at index 0 if it exists b/c of underscores
  1294. if unallocatedSuffixIndex != -1 {
  1295. annotationNames = append(annotationNames[:unallocatedSuffixIndex], annotationNames[unallocatedSuffixIndex+1:]...)
  1296. annotationNames = append(annotationNames, UnallocatedSuffix) // append to end
  1297. }
  1298. names = append(names, annotationNames...)
  1299. }
  1300. }
  1301. }
  1302. return strings.Join(names, "/")
  1303. }
  1304. // TODO:CLEANUP get rid of this
  1305. // Helper function to check for slice membership. Not sure if repeated elsewhere in our codebase.
  1306. func indexOf(v string, arr []string) int {
  1307. for i, s := range arr {
  1308. // This is caseless equivalence
  1309. if strings.EqualFold(v, s) {
  1310. return i
  1311. }
  1312. }
  1313. return -1
  1314. }
  1315. // Clone returns a new AllocationSet with a deep copy of the given
  1316. // AllocationSet's allocations.
  1317. func (as *AllocationSet) Clone() *AllocationSet {
  1318. if as == nil {
  1319. return nil
  1320. }
  1321. as.RLock()
  1322. defer as.RUnlock()
  1323. allocs := map[string]*Allocation{}
  1324. for k, v := range as.allocations {
  1325. allocs[k] = v.Clone()
  1326. }
  1327. externalKeys := map[string]bool{}
  1328. for k, v := range as.externalKeys {
  1329. externalKeys[k] = v
  1330. }
  1331. idleKeys := map[string]bool{}
  1332. for k, v := range as.idleKeys {
  1333. idleKeys[k] = v
  1334. }
  1335. return &AllocationSet{
  1336. allocations: allocs,
  1337. externalKeys: externalKeys,
  1338. idleKeys: idleKeys,
  1339. Window: as.Window.Clone(),
  1340. }
  1341. }
  1342. // ComputeIdleAllocations computes the idle allocations for the AllocationSet,
  1343. // given a set of Assets. Ideally, assetSet should contain only Nodes, but if
  1344. // it contains other Assets, they will be ignored; only CPU, GPU and RAM are
  1345. // considered for idle allocation. If the Nodes have adjustments, then apply
  1346. // the adjustments proportionally to each of the resources so that total
  1347. // allocation with idle reflects the adjusted node costs. One idle allocation
  1348. // per-cluster will be computed and returned, keyed by cluster_id.
  1349. func (as *AllocationSet) ComputeIdleAllocations(assetSet *AssetSet) (map[string]*Allocation, error) {
  1350. if as == nil {
  1351. return nil, fmt.Errorf("cannot compute idle allocation for nil AllocationSet")
  1352. }
  1353. if assetSet == nil {
  1354. return nil, fmt.Errorf("cannot compute idle allocation with nil AssetSet")
  1355. }
  1356. if !as.Window.Equal(assetSet.Window) {
  1357. return nil, fmt.Errorf("cannot compute idle allocation for sets with mismatched windows: %s != %s", as.Window, assetSet.Window)
  1358. }
  1359. window := as.Window
  1360. // Build a map of cumulative cluster asset costs, per resource; i.e.
  1361. // cluster-to-{cpu|gpu|ram}-to-cost.
  1362. assetClusterResourceCosts := map[string]map[string]float64{}
  1363. assetSet.Each(func(key string, a Asset) {
  1364. if node, ok := a.(*Node); ok {
  1365. if _, ok := assetClusterResourceCosts[node.Properties().Cluster]; !ok {
  1366. assetClusterResourceCosts[node.Properties().Cluster] = map[string]float64{}
  1367. }
  1368. // adjustmentRate is used to scale resource costs proportionally
  1369. // by the adjustment. This is necessary because we only get one
  1370. // adjustment per Node, not one per-resource-per-Node.
  1371. //
  1372. // e.g. total cost = $90, adjustment = -$10 => 0.9
  1373. // e.g. total cost = $150, adjustment = -$300 => 0.3333
  1374. // e.g. total cost = $150, adjustment = $50 => 1.5
  1375. adjustmentRate := 1.0
  1376. if node.TotalCost()-node.Adjustment() == 0 {
  1377. // If (totalCost - adjustment) is 0.0 then adjustment cancels
  1378. // the entire node cost and we should make everything 0
  1379. // without dividing by 0.
  1380. adjustmentRate = 0.0
  1381. log.DedupedWarningf(5, "Compute Idle Allocations: Node Cost Adjusted to $0.00 for %s", node.properties.Name)
  1382. } else if node.Adjustment() != 0.0 {
  1383. // adjustmentRate is the ratio of cost-with-adjustment (i.e. TotalCost)
  1384. // to cost-without-adjustment (i.e. TotalCost - Adjustment).
  1385. adjustmentRate = node.TotalCost() / (node.TotalCost() - node.Adjustment())
  1386. }
  1387. cpuCost := node.CPUCost * (1.0 - node.Discount) * adjustmentRate
  1388. gpuCost := node.GPUCost * (1.0 - node.Discount) * adjustmentRate
  1389. ramCost := node.RAMCost * (1.0 - node.Discount) * adjustmentRate
  1390. assetClusterResourceCosts[node.Properties().Cluster]["cpu"] += cpuCost
  1391. assetClusterResourceCosts[node.Properties().Cluster]["gpu"] += gpuCost
  1392. assetClusterResourceCosts[node.Properties().Cluster]["ram"] += ramCost
  1393. }
  1394. })
  1395. // Determine start, end on a per-cluster basis
  1396. clusterStarts := map[string]time.Time{}
  1397. clusterEnds := map[string]time.Time{}
  1398. // Subtract allocated costs from asset costs, leaving only the remaining
  1399. // idle costs.
  1400. as.Each(func(name string, a *Allocation) {
  1401. cluster := a.Properties.Cluster
  1402. if cluster == "" {
  1403. // Failed to find allocation's cluster
  1404. return
  1405. }
  1406. if _, ok := assetClusterResourceCosts[cluster]; !ok {
  1407. // Failed to find assets for allocation's cluster
  1408. return
  1409. }
  1410. // Set cluster (start, end) if they are either not currently set,
  1411. // or if the detected (start, end) of the current allocation falls
  1412. // before or after, respectively, the current values.
  1413. if s, ok := clusterStarts[cluster]; !ok || a.Start.Before(s) {
  1414. clusterStarts[cluster] = a.Start
  1415. }
  1416. if e, ok := clusterEnds[cluster]; !ok || a.End.After(e) {
  1417. clusterEnds[cluster] = a.End
  1418. }
  1419. assetClusterResourceCosts[cluster]["cpu"] -= a.CPUTotalCost()
  1420. assetClusterResourceCosts[cluster]["gpu"] -= a.GPUTotalCost()
  1421. assetClusterResourceCosts[cluster]["ram"] -= a.RAMTotalCost()
  1422. })
  1423. // Turn remaining un-allocated asset costs into idle allocations
  1424. idleAllocs := map[string]*Allocation{}
  1425. for cluster, resources := range assetClusterResourceCosts {
  1426. // Default start and end to the (start, end) of the given window, but
  1427. // use the actual, detected (start, end) pair if they are available.
  1428. start := *window.Start()
  1429. if s, ok := clusterStarts[cluster]; ok && window.Contains(s) {
  1430. start = s
  1431. }
  1432. end := *window.End()
  1433. if e, ok := clusterEnds[cluster]; ok && window.Contains(e) {
  1434. end = e
  1435. }
  1436. idleAlloc := &Allocation{
  1437. Name: fmt.Sprintf("%s/%s", cluster, IdleSuffix),
  1438. Window: window.Clone(),
  1439. Properties: &AllocationProperties{Cluster: cluster},
  1440. Start: start,
  1441. End: end,
  1442. CPUCost: resources["cpu"],
  1443. GPUCost: resources["gpu"],
  1444. RAMCost: resources["ram"],
  1445. }
  1446. // Do not continue if multiple idle allocations are computed for a
  1447. // single cluster.
  1448. if _, ok := idleAllocs[cluster]; ok {
  1449. return nil, fmt.Errorf("duplicate idle allocations for cluster %s", cluster)
  1450. }
  1451. idleAllocs[cluster] = idleAlloc
  1452. }
  1453. return idleAllocs, nil
  1454. }
  1455. // ComputeIdleAllocationsByNode computes the idle allocations for the AllocationSet,
  1456. // given a set of Assets. Ideally, assetSet should contain only Nodes, but if
  1457. // it contains other Assets, they will be ignored; only CPU, GPU and RAM are
  1458. // considered for idle allocation. If the Nodes have adjustments, then apply
  1459. // the adjustments proportionally to each of the resources so that total
  1460. // allocation with idle reflects the adjusted node costs. One idle allocation
  1461. // per-node will be computed and returned, keyed by cluster_id.
  1462. func (as *AllocationSet) ComputeIdleAllocationsByNode(assetSet *AssetSet) (map[string]*Allocation, error) {
  1463. if as == nil {
  1464. return nil, fmt.Errorf("cannot compute idle allocation for nil AllocationSet")
  1465. }
  1466. if assetSet == nil {
  1467. return nil, fmt.Errorf("cannot compute idle allocation with nil AssetSet")
  1468. }
  1469. if !as.Window.Equal(assetSet.Window) {
  1470. return nil, fmt.Errorf("cannot compute idle allocation for sets with mismatched windows: %s != %s", as.Window, assetSet.Window)
  1471. }
  1472. window := as.Window
  1473. // Build a map of cumulative cluster asset costs, per resource; i.e.
  1474. // cluster-to-{cpu|gpu|ram}-to-cost.
  1475. assetNodeResourceCosts := map[string]map[string]float64{}
  1476. nodesByProviderId := map[string]*Node{}
  1477. assetSet.Each(func(key string, a Asset) {
  1478. if node, ok := a.(*Node); ok {
  1479. if _, ok := assetNodeResourceCosts[node.Properties().ProviderID]; ok || node.Properties().ProviderID == "" {
  1480. return
  1481. }
  1482. nodesByProviderId[node.Properties().ProviderID] = node
  1483. assetNodeResourceCosts[node.Properties().ProviderID] = map[string]float64{}
  1484. // adjustmentRate is used to scale resource costs proportionally
  1485. // by the adjustment. This is necessary because we only get one
  1486. // adjustment per Node, not one per-resource-per-Node.
  1487. //
  1488. // e.g. total cost = $90, adjustment = -$10 => 0.9
  1489. // e.g. total cost = $150, adjustment = -$300 => 0.3333
  1490. // e.g. total cost = $150, adjustment = $50 => 1.5
  1491. adjustmentRate := 1.0
  1492. if node.TotalCost()-node.Adjustment() == 0 {
  1493. // If (totalCost - adjustment) is 0.0 then adjustment cancels
  1494. // the entire node cost and we should make everything 0
  1495. // without dividing by 0.
  1496. adjustmentRate = 0.0
  1497. log.DedupedWarningf(5, "Compute Idle Allocations: Node Cost Adjusted to $0.00 for %s", node.properties.Name)
  1498. } else if node.Adjustment() != 0.0 {
  1499. // adjustmentRate is the ratio of cost-with-adjustment (i.e. TotalCost)
  1500. // to cost-without-adjustment (i.e. TotalCost - Adjustment).
  1501. adjustmentRate = node.TotalCost() / (node.TotalCost() - node.Adjustment())
  1502. }
  1503. cpuCost := node.CPUCost * (1.0 - node.Discount) * adjustmentRate
  1504. gpuCost := node.GPUCost * (1.0 - node.Discount) * adjustmentRate
  1505. ramCost := node.RAMCost * (1.0 - node.Discount) * adjustmentRate
  1506. assetNodeResourceCosts[node.Properties().ProviderID]["cpu"] += cpuCost
  1507. assetNodeResourceCosts[node.Properties().ProviderID]["gpu"] += gpuCost
  1508. assetNodeResourceCosts[node.Properties().ProviderID]["ram"] += ramCost
  1509. }
  1510. })
  1511. // Determine start, end on a per-cluster basis
  1512. nodeStarts := map[string]time.Time{}
  1513. nodeEnds := map[string]time.Time{}
  1514. // Subtract allocated costs from asset costs, leaving only the remaining
  1515. // idle costs.
  1516. as.Each(func(name string, a *Allocation) {
  1517. providerId := a.Properties.ProviderID
  1518. if providerId == "" {
  1519. // Failed to find allocation's node
  1520. return
  1521. }
  1522. if _, ok := assetNodeResourceCosts[providerId]; !ok {
  1523. // Failed to find assets for allocation's node
  1524. return
  1525. }
  1526. // Set cluster (start, end) if they are either not currently set,
  1527. // or if the detected (start, end) of the current allocation falls
  1528. // before or after, respectively, the current values.
  1529. if s, ok := nodeStarts[providerId]; !ok || a.Start.Before(s) {
  1530. nodeStarts[providerId] = a.Start
  1531. }
  1532. if e, ok := nodeEnds[providerId]; !ok || a.End.After(e) {
  1533. nodeEnds[providerId] = a.End
  1534. }
  1535. assetNodeResourceCosts[providerId]["cpu"] -= a.CPUTotalCost()
  1536. assetNodeResourceCosts[providerId]["gpu"] -= a.GPUTotalCost()
  1537. assetNodeResourceCosts[providerId]["ram"] -= a.RAMTotalCost()
  1538. })
  1539. // Turn remaining un-allocated asset costs into idle allocations
  1540. idleAllocs := map[string]*Allocation{}
  1541. for providerId, resources := range assetNodeResourceCosts {
  1542. // Default start and end to the (start, end) of the given window, but
  1543. // use the actual, detected (start, end) pair if they are available.
  1544. start := *window.Start()
  1545. if s, ok := nodeStarts[providerId]; ok && window.Contains(s) {
  1546. start = s
  1547. }
  1548. end := *window.End()
  1549. if e, ok := nodeEnds[providerId]; ok && window.Contains(e) {
  1550. end = e
  1551. }
  1552. node := nodesByProviderId[providerId]
  1553. idleAlloc := &Allocation{
  1554. Name: fmt.Sprintf("%s/%s", node.properties.Name, IdleSuffix),
  1555. Window: window.Clone(),
  1556. Properties: &AllocationProperties{
  1557. Cluster: node.properties.Cluster,
  1558. Node: node.properties.Name,
  1559. ProviderID: providerId,
  1560. },
  1561. Start: start,
  1562. End: end,
  1563. CPUCost: resources["cpu"],
  1564. GPUCost: resources["gpu"],
  1565. RAMCost: resources["ram"],
  1566. }
  1567. // Do not continue if multiple idle allocations are computed for a
  1568. // single node.
  1569. if _, ok := idleAllocs[providerId]; ok {
  1570. return nil, fmt.Errorf("duplicate idle allocations for node Provider ID: %s", providerId)
  1571. }
  1572. idleAllocs[providerId] = idleAlloc
  1573. }
  1574. return idleAllocs, nil
  1575. }
  1576. // Reconcile calculate the exact cost of Allocation by resource(cpu, ram, gpu etc) based on Asset(s) on which
  1577. // the Allocation depends.
  1578. func (as *AllocationSet) Reconcile(assetSet *AssetSet) error {
  1579. if as == nil {
  1580. return fmt.Errorf("cannot reconcile allocation for nil AllocationSet")
  1581. }
  1582. if assetSet == nil {
  1583. return fmt.Errorf("cannot reconcile allocation with nil AssetSet")
  1584. }
  1585. if !as.Window.Equal(assetSet.Window) {
  1586. return fmt.Errorf("cannot reconcile allocation for sets with mismatched windows: %s != %s", as.Window, assetSet.Window)
  1587. }
  1588. // Build map of Assets with type Node by their ProviderId so that they can be matched to Allocations to determine
  1589. // proper CPU GPU and RAM prices
  1590. nodeByProviderID := map[string]*Node{}
  1591. diskByName := map[string]*Disk{}
  1592. assetSet.Each(func(key string, a Asset) {
  1593. if node, ok := a.(*Node); ok && node.properties.ProviderID != "" {
  1594. nodeByProviderID[node.properties.ProviderID] = node
  1595. }
  1596. if disk, ok := a.(*Disk); ok {
  1597. diskByName[disk.properties.Name] = disk
  1598. }
  1599. })
  1600. // Match Assets against allocations and adjust allocation cost based on the proportion of the asset that they used
  1601. as.Each(func(name string, a *Allocation) {
  1602. a.reconcileNodes(nodeByProviderID)
  1603. a.reconcileDisks(diskByName)
  1604. })
  1605. return nil
  1606. }
  1607. func (a *Allocation) reconcileNodes(nodeByProviderID map[string]*Node) {
  1608. providerId := a.Properties.ProviderID
  1609. // Reconcile with node Assets
  1610. node, ok := nodeByProviderID[providerId]
  1611. if !ok {
  1612. // Failed to find node for allocation
  1613. return
  1614. }
  1615. // adjustmentRate is used to scale resource costs proportionally
  1616. // by the adjustment. This is necessary because we only get one
  1617. // adjustment per Node, not one per-resource-per-Node.
  1618. //
  1619. // e.g. total cost = $90, adjustment = -$10 => 0.9
  1620. // e.g. total cost = $150, adjustment = -$300 => 0.3333
  1621. // e.g. total cost = $150, adjustment = $50 => 1.5
  1622. adjustmentRate := 1.0
  1623. if node.TotalCost()-node.Adjustment() == 0 {
  1624. // If (totalCost - adjustment) is 0.0 then adjustment cancels
  1625. // the entire node cost and we should make everything 0
  1626. // without dividing by 0.
  1627. adjustmentRate = 0.0
  1628. } else if node.Adjustment() != 0.0 {
  1629. // adjustmentRate is the ratio of cost-with-adjustment (i.e. TotalCost)
  1630. // to cost-without-adjustment (i.e. TotalCost - Adjustment).
  1631. adjustmentRate = node.TotalCost() / (node.TotalCost() - node.Adjustment())
  1632. }
  1633. // Find total cost of each node resource for the window
  1634. cpuCost := node.CPUCost * (1.0 - node.Discount) * adjustmentRate
  1635. ramCost := node.RAMCost * (1.0 - node.Discount) * adjustmentRate
  1636. gpuCost := node.GPUCost * adjustmentRate
  1637. // Find the proportion of resource hours used by the allocation, checking for 0 denominators
  1638. cpuUsageProportion := 0.0
  1639. if node.CPUCoreHours != 0 {
  1640. cpuUsageProportion = a.CPUCoreHours / node.CPUCoreHours
  1641. } else {
  1642. log.Warningf("Missing CPU Hours for node Provider ID: %s", providerId)
  1643. }
  1644. ramUsageProportion := 0.0
  1645. if node.RAMByteHours != 0 {
  1646. ramUsageProportion = a.RAMByteHours / node.RAMByteHours
  1647. } else {
  1648. log.Warningf("Missing Ram Byte Hours for node Provider ID: %s", providerId)
  1649. }
  1650. gpuUsageProportion := 0.0
  1651. if node.GPUHours != 0 {
  1652. gpuUsageProportion = a.GPUHours / node.GPUHours
  1653. }
  1654. // No log for GPU because not all nodes have GPU
  1655. // Calculate the allocation's resource costs by the proportion of resources used and total costs
  1656. allocCPUCost := cpuUsageProportion * cpuCost
  1657. allocRAMCost := ramUsageProportion * ramCost
  1658. allocGPUCost := gpuUsageProportion * gpuCost
  1659. a.CPUCostAdjustment = allocCPUCost - a.CPUCost
  1660. a.RAMCostAdjustment = allocRAMCost - a.RAMCost
  1661. a.GPUCostAdjustment = allocGPUCost - a.GPUCost
  1662. }
  1663. func (a *Allocation) reconcileDisks(diskByName map[string]*Disk) {
  1664. pvs := a.PVs
  1665. if pvs == nil {
  1666. // No PV usage to reconcile
  1667. return
  1668. }
  1669. // Set PV Adjustment for allocation to 0 for idempotency
  1670. a.PVCostAdjustment = 0.0
  1671. for pvKey, pvUsage := range pvs {
  1672. disk, ok := diskByName[pvKey.Name]
  1673. if !ok {
  1674. // Failed to find disk in assets
  1675. continue
  1676. }
  1677. // Check the proportion of disk that is being used by
  1678. pvUsageProportion := 0.0
  1679. if disk.ByteHours != 0 {
  1680. pvUsageProportion = pvUsage.ByteHours / disk.ByteHours
  1681. } else {
  1682. log.Warningf("Missing Byte Hours for disk: %s", pvKey)
  1683. }
  1684. // take proportion of disk adjusted cost
  1685. allocPVCost := pvUsageProportion * disk.TotalCost()
  1686. // PVCostAdjustment is cumulative as there can be many PVs for each Allocation
  1687. a.PVCostAdjustment += allocPVCost - pvUsage.Cost
  1688. }
  1689. }
  1690. // Delete removes the allocation with the given name from the set
  1691. func (as *AllocationSet) Delete(name string) {
  1692. if as == nil {
  1693. return
  1694. }
  1695. as.Lock()
  1696. defer as.Unlock()
  1697. delete(as.externalKeys, name)
  1698. delete(as.idleKeys, name)
  1699. delete(as.allocations, name)
  1700. }
  1701. // Each invokes the given function for each Allocation in the set
  1702. func (as *AllocationSet) Each(f func(string, *Allocation)) {
  1703. if as == nil {
  1704. return
  1705. }
  1706. for k, a := range as.allocations {
  1707. f(k, a)
  1708. }
  1709. }
  1710. // End returns the End time of the AllocationSet window
  1711. func (as *AllocationSet) End() time.Time {
  1712. if as == nil {
  1713. log.Warningf("Allocation ETL: calling End on nil AllocationSet")
  1714. return time.Unix(0, 0)
  1715. }
  1716. if as.Window.End() == nil {
  1717. log.Warningf("Allocation ETL: AllocationSet with illegal window: End is nil; len(as.allocations)=%d", len(as.allocations))
  1718. return time.Unix(0, 0)
  1719. }
  1720. return *as.Window.End()
  1721. }
  1722. // Get returns the Allocation at the given key in the AllocationSet
  1723. func (as *AllocationSet) Get(key string) *Allocation {
  1724. as.RLock()
  1725. defer as.RUnlock()
  1726. if alloc, ok := as.allocations[key]; ok {
  1727. return alloc
  1728. }
  1729. return nil
  1730. }
  1731. // ExternalAllocations returns a map of the external allocations in the set.
  1732. // Returns clones of the actual Allocations, so mutability is not a problem.
  1733. func (as *AllocationSet) ExternalAllocations() map[string]*Allocation {
  1734. externals := map[string]*Allocation{}
  1735. if as.IsEmpty() {
  1736. return externals
  1737. }
  1738. as.RLock()
  1739. defer as.RUnlock()
  1740. for key := range as.externalKeys {
  1741. if alloc, ok := as.allocations[key]; ok {
  1742. externals[key] = alloc.Clone()
  1743. }
  1744. }
  1745. return externals
  1746. }
  1747. // ExternalCost returns the total aggregated external costs of the set
  1748. func (as *AllocationSet) ExternalCost() float64 {
  1749. if as.IsEmpty() {
  1750. return 0.0
  1751. }
  1752. as.RLock()
  1753. defer as.RUnlock()
  1754. externalCost := 0.0
  1755. for _, alloc := range as.allocations {
  1756. externalCost += alloc.ExternalCost
  1757. }
  1758. return externalCost
  1759. }
  1760. // IdleAllocations returns a map of the idle allocations in the AllocationSet.
  1761. // Returns clones of the actual Allocations, so mutability is not a problem.
  1762. func (as *AllocationSet) IdleAllocations() map[string]*Allocation {
  1763. idles := map[string]*Allocation{}
  1764. if as.IsEmpty() {
  1765. return idles
  1766. }
  1767. as.RLock()
  1768. defer as.RUnlock()
  1769. for key := range as.idleKeys {
  1770. if alloc, ok := as.allocations[key]; ok {
  1771. idles[key] = alloc.Clone()
  1772. }
  1773. }
  1774. return idles
  1775. }
// Insert aggregates the current entry in the AllocationSet by the given
// Allocation. If there is no existing entry under the Allocation's name, one
// is created. Nil error response indicates success.
//
// This is the exported entry point; all work is delegated to insert.
//
// NOTE(review): earlier documentation said the Allocation is only inserted
// if it matches the AllocationSet's window, but insert performs no window
// check — confirm whether such validation is expected here.
func (as *AllocationSet) Insert(that *Allocation) error {
	return as.insert(that)
}
  1782. func (as *AllocationSet) insert(that *Allocation) error {
  1783. if as == nil {
  1784. return fmt.Errorf("cannot insert into nil AllocationSet")
  1785. }
  1786. as.Lock()
  1787. defer as.Unlock()
  1788. if as.allocations == nil {
  1789. as.allocations = map[string]*Allocation{}
  1790. }
  1791. if as.externalKeys == nil {
  1792. as.externalKeys = map[string]bool{}
  1793. }
  1794. if as.idleKeys == nil {
  1795. as.idleKeys = map[string]bool{}
  1796. }
  1797. // Add the given Allocation to the existing entry, if there is one;
  1798. // otherwise just set directly into allocations
  1799. if _, ok := as.allocations[that.Name]; !ok {
  1800. as.allocations[that.Name] = that
  1801. } else {
  1802. as.allocations[that.Name].add(that)
  1803. }
  1804. // If the given Allocation is an external one, record that
  1805. if that.IsExternal() {
  1806. as.externalKeys[that.Name] = true
  1807. }
  1808. // If the given Allocation is an idle one, record that
  1809. if that.IsIdle() {
  1810. as.idleKeys[that.Name] = true
  1811. }
  1812. return nil
  1813. }
  1814. // IsEmpty returns true if the AllocationSet is nil, or if it contains
  1815. // zero allocations.
  1816. func (as *AllocationSet) IsEmpty() bool {
  1817. if as == nil || len(as.allocations) == 0 {
  1818. return true
  1819. }
  1820. as.RLock()
  1821. defer as.RUnlock()
  1822. return as.allocations == nil || len(as.allocations) == 0
  1823. }
  1824. // Length returns the number of Allocations in the set
  1825. func (as *AllocationSet) Length() int {
  1826. if as == nil {
  1827. return 0
  1828. }
  1829. as.RLock()
  1830. defer as.RUnlock()
  1831. return len(as.allocations)
  1832. }
  1833. // Map clones and returns a map of the AllocationSet's Allocations
  1834. func (as *AllocationSet) Map() map[string]*Allocation {
  1835. if as.IsEmpty() {
  1836. return map[string]*Allocation{}
  1837. }
  1838. return as.Clone().allocations
  1839. }
  1840. // MarshalJSON JSON-encodes the AllocationSet
  1841. func (as *AllocationSet) MarshalJSON() ([]byte, error) {
  1842. as.RLock()
  1843. defer as.RUnlock()
  1844. return json.Marshal(as.allocations)
  1845. }
  1846. // Resolution returns the AllocationSet's window duration
  1847. func (as *AllocationSet) Resolution() time.Duration {
  1848. return as.Window.Duration()
  1849. }
  1850. // Set uses the given Allocation to overwrite the existing entry in the
  1851. // AllocationSet under the Allocation's name.
  1852. func (as *AllocationSet) Set(alloc *Allocation) error {
  1853. if as.IsEmpty() {
  1854. as.Lock()
  1855. as.allocations = map[string]*Allocation{}
  1856. as.externalKeys = map[string]bool{}
  1857. as.idleKeys = map[string]bool{}
  1858. as.Unlock()
  1859. }
  1860. as.Lock()
  1861. defer as.Unlock()
  1862. as.allocations[alloc.Name] = alloc
  1863. // If the given Allocation is an external one, record that
  1864. if alloc.IsExternal() {
  1865. as.externalKeys[alloc.Name] = true
  1866. }
  1867. // If the given Allocation is an idle one, record that
  1868. if alloc.IsIdle() {
  1869. as.idleKeys[alloc.Name] = true
  1870. }
  1871. return nil
  1872. }
  1873. // Start returns the Start time of the AllocationSet window
  1874. func (as *AllocationSet) Start() time.Time {
  1875. if as == nil {
  1876. log.Warningf("Allocation ETL: calling Start on nil AllocationSet")
  1877. return time.Unix(0, 0)
  1878. }
  1879. if as.Window.Start() == nil {
  1880. log.Warningf("Allocation ETL: AllocationSet with illegal window: Start is nil; len(as.allocations)=%d", len(as.allocations))
  1881. return time.Unix(0, 0)
  1882. }
  1883. return *as.Window.Start()
  1884. }
  1885. // String represents the given Allocation as a string
  1886. func (as *AllocationSet) String() string {
  1887. if as == nil {
  1888. return "<nil>"
  1889. }
  1890. return fmt.Sprintf("AllocationSet{length: %d; window: %s; totalCost: %.2f}",
  1891. as.Length(), as.Window, as.TotalCost())
  1892. }
  1893. // TotalCost returns the sum of all TotalCosts of the allocations contained
  1894. func (as *AllocationSet) TotalCost() float64 {
  1895. if as.IsEmpty() {
  1896. return 0.0
  1897. }
  1898. as.RLock()
  1899. defer as.RUnlock()
  1900. tc := 0.0
  1901. for _, a := range as.allocations {
  1902. tc += a.TotalCost()
  1903. }
  1904. return tc
  1905. }
  1906. // UTCOffset returns the AllocationSet's configured UTCOffset.
  1907. func (as *AllocationSet) UTCOffset() time.Duration {
  1908. _, zone := as.Start().Zone()
  1909. return time.Duration(zone) * time.Second
  1910. }
  1911. func (as *AllocationSet) accumulate(that *AllocationSet) (*AllocationSet, error) {
  1912. if as.IsEmpty() {
  1913. return that, nil
  1914. }
  1915. if that.IsEmpty() {
  1916. return as, nil
  1917. }
  1918. // Set start, end to min(start), max(end)
  1919. start := as.Start()
  1920. end := as.End()
  1921. if that.Start().Before(start) {
  1922. start = that.Start()
  1923. }
  1924. if that.End().After(end) {
  1925. end = that.End()
  1926. }
  1927. acc := NewAllocationSet(start, end)
  1928. as.RLock()
  1929. defer as.RUnlock()
  1930. that.RLock()
  1931. defer that.RUnlock()
  1932. for _, alloc := range as.allocations {
  1933. err := acc.insert(alloc)
  1934. if err != nil {
  1935. return nil, err
  1936. }
  1937. }
  1938. for _, alloc := range that.allocations {
  1939. err := acc.insert(alloc)
  1940. if err != nil {
  1941. return nil, err
  1942. }
  1943. }
  1944. return acc, nil
  1945. }
// AllocationSetRange is a thread-safe slice of AllocationSets. It is meant to
// be used such that the AllocationSets held are consecutive and coherent with
// respect to using the same aggregation properties, UTC offset, and
// resolution. However these rules are not necessarily enforced, so use wisely.
type AllocationSetRange struct {
	// Embedded lock used by the range's methods to guard allocations.
	sync.RWMutex
	// allocations holds the AllocationSets of the range, in order.
	allocations []*AllocationSet
}
  1954. // NewAllocationSetRange instantiates a new range composed of the given
  1955. // AllocationSets in the order provided.
  1956. func NewAllocationSetRange(allocs ...*AllocationSet) *AllocationSetRange {
  1957. return &AllocationSetRange{
  1958. allocations: allocs,
  1959. }
  1960. }
  1961. // Accumulate sums each AllocationSet in the given range, returning a single cumulative
  1962. // AllocationSet for the entire range.
  1963. func (asr *AllocationSetRange) Accumulate() (*AllocationSet, error) {
  1964. var allocSet *AllocationSet
  1965. var err error
  1966. asr.RLock()
  1967. defer asr.RUnlock()
  1968. for _, as := range asr.allocations {
  1969. allocSet, err = allocSet.accumulate(as)
  1970. if err != nil {
  1971. return nil, err
  1972. }
  1973. }
  1974. return allocSet, nil
  1975. }
  1976. // TODO niko/etl accumulate into lower-resolution chunks of the given resolution
  1977. // func (asr *AllocationSetRange) AccumulateBy(resolution time.Duration) *AllocationSetRange
  1978. // AggregateBy aggregates each AllocationSet in the range by the given
  1979. // properties and options.
  1980. func (asr *AllocationSetRange) AggregateBy(aggregateBy []string, options *AllocationAggregationOptions) error {
  1981. aggRange := &AllocationSetRange{allocations: []*AllocationSet{}}
  1982. asr.Lock()
  1983. defer asr.Unlock()
  1984. for _, as := range asr.allocations {
  1985. err := as.AggregateBy(aggregateBy, options)
  1986. if err != nil {
  1987. return err
  1988. }
  1989. aggRange.allocations = append(aggRange.allocations, as)
  1990. }
  1991. asr.allocations = aggRange.allocations
  1992. return nil
  1993. }
  1994. // Append appends the given AllocationSet to the end of the range. It does not
  1995. // validate whether or not that violates window continuity.
  1996. func (asr *AllocationSetRange) Append(that *AllocationSet) {
  1997. asr.Lock()
  1998. defer asr.Unlock()
  1999. asr.allocations = append(asr.allocations, that)
  2000. }
  2001. // Each invokes the given function for each AllocationSet in the range
  2002. func (asr *AllocationSetRange) Each(f func(int, *AllocationSet)) {
  2003. if asr == nil {
  2004. return
  2005. }
  2006. for i, as := range asr.allocations {
  2007. f(i, as)
  2008. }
  2009. }
  2010. // Get retrieves the AllocationSet at the given index of the range.
  2011. func (asr *AllocationSetRange) Get(i int) (*AllocationSet, error) {
  2012. if i < 0 || i >= len(asr.allocations) {
  2013. return nil, fmt.Errorf("AllocationSetRange: index out of range: %d", i)
  2014. }
  2015. asr.RLock()
  2016. defer asr.RUnlock()
  2017. return asr.allocations[i], nil
  2018. }
  2019. // InsertRange merges the given AllocationSetRange into the receiving one by
  2020. // lining up sets with matching windows, then inserting each allocation from
  2021. // the given ASR into the respective set in the receiving ASR. If the given
  2022. // ASR contains an AllocationSet from a window that does not exist in the
  2023. // receiving ASR, then an error is returned. However, the given ASR does not
  2024. // need to cover the full range of the receiver.
  2025. func (asr *AllocationSetRange) InsertRange(that *AllocationSetRange) error {
  2026. if asr == nil {
  2027. return fmt.Errorf("cannot insert range into nil AllocationSetRange")
  2028. }
  2029. // keys maps window to index in asr
  2030. keys := map[string]int{}
  2031. asr.Each(func(i int, as *AllocationSet) {
  2032. if as == nil {
  2033. return
  2034. }
  2035. keys[as.Window.String()] = i
  2036. })
  2037. // Nothing to merge, so simply return
  2038. if len(keys) == 0 {
  2039. return nil
  2040. }
  2041. var err error
  2042. that.Each(func(j int, thatAS *AllocationSet) {
  2043. if thatAS == nil || err != nil {
  2044. return
  2045. }
  2046. // Find matching AllocationSet in asr
  2047. i, ok := keys[thatAS.Window.String()]
  2048. if !ok {
  2049. err = fmt.Errorf("cannot merge AllocationSet into window that does not exist: %s", thatAS.Window.String())
  2050. return
  2051. }
  2052. as, err := asr.Get(i)
  2053. if err != nil {
  2054. err = fmt.Errorf("AllocationSetRange index does not exist: %d", i)
  2055. return
  2056. }
  2057. // Insert each Allocation from the given set
  2058. thatAS.Each(func(k string, alloc *Allocation) {
  2059. err = as.Insert(alloc)
  2060. if err != nil {
  2061. err = fmt.Errorf("error inserting allocation: %s", err)
  2062. return
  2063. }
  2064. })
  2065. })
  2066. // err might be nil
  2067. return err
  2068. }
  2069. // Length returns the length of the range, which is zero if nil
  2070. func (asr *AllocationSetRange) Length() int {
  2071. if asr == nil || asr.allocations == nil {
  2072. return 0
  2073. }
  2074. asr.RLock()
  2075. defer asr.RUnlock()
  2076. return len(asr.allocations)
  2077. }
  2078. // MarshalJSON JSON-encodes the range
  2079. func (asr *AllocationSetRange) MarshalJSON() ([]byte, error) {
  2080. asr.RLock()
  2081. asr.RUnlock()
  2082. return json.Marshal(asr.allocations)
  2083. }
  2084. // Slice copies the underlying slice of AllocationSets, maintaining order,
  2085. // and returns the copied slice.
  2086. func (asr *AllocationSetRange) Slice() []*AllocationSet {
  2087. if asr == nil || asr.allocations == nil {
  2088. return nil
  2089. }
  2090. asr.RLock()
  2091. defer asr.RUnlock()
  2092. copy := []*AllocationSet{}
  2093. for _, as := range asr.allocations {
  2094. copy = append(copy, as.Clone())
  2095. }
  2096. return copy
  2097. }
  2098. // String represents the given AllocationSetRange as a string
  2099. func (asr *AllocationSetRange) String() string {
  2100. if asr == nil {
  2101. return "<nil>"
  2102. }
  2103. return fmt.Sprintf("AllocationSetRange{length: %d}", asr.Length())
  2104. }
  2105. // UTCOffset returns the detected UTCOffset of the AllocationSets within the
  2106. // range. Defaults to 0 if the range is nil or empty. Does not warn if there
  2107. // are sets with conflicting UTCOffsets (just returns the first).
  2108. func (asr *AllocationSetRange) UTCOffset() time.Duration {
  2109. if asr.Length() == 0 {
  2110. return 0
  2111. }
  2112. as, err := asr.Get(0)
  2113. if err != nil {
  2114. return 0
  2115. }
  2116. return as.UTCOffset()
  2117. }
  2118. // Window returns the full window that the AllocationSetRange spans, from the
  2119. // start of the first AllocationSet to the end of the last one.
  2120. func (asr *AllocationSetRange) Window() Window {
  2121. if asr == nil || asr.Length() == 0 {
  2122. return NewWindow(nil, nil)
  2123. }
  2124. start := asr.allocations[0].Start()
  2125. end := asr.allocations[asr.Length()-1].End()
  2126. return NewWindow(&start, &end)
  2127. }