allocation.go 72 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177
71778177917801781178217831784178517861787178817891790179117921793179417951796179717981799180018011802180318041805180618071808180918101811181218131814181518161817181818191820182118221823182418251826182718281829183018311832183318341835183618371838183918401841184218431844184518461847184818491850185118521853185418551856185718581859186018611862186318641865186618671868186918701871187218731874187518761877187818791880188118821883188418851886188718881889189018911892189318941895189618971898189919001901190219031904190519061907190819091910191119121913191419151916191719181919192019211922192319241925192619271928192919301931193219331934193519361937193819391940194119421943194419451946194719481949195019511952195319541955195619571958195919601961196219631964196519661967196819691970197119721973197419751976197719781979198019811982198319841985198619871988198919901991199219931994199519961997199819992000200120022003200420052006200720082009201020112012201320142015201620172018201920202021202220232024202520262027202820292030203120322033203420352036203720382039204020412042204320442045204620472048204920502051205220532054205520562057205820592060206120622063206420652066206720682069207020712072207320742075207620772078207920802081208220832084208520862087208820892090209120922093209420952096209720982099210021012102210321042105210621072108210921102111211221132114211521162117211821192120212121222123212421252126212721282129213021312132213321342135213621372138213921402141214221432144214521462147214821492150215121522153215421552156215721582159216021612162216321642165216621672168216921702171217221732174217521762177217821792180218121822183218421852186218721882189219021912192219321942195219621972198219922002201220222032204220522062207220822092210221122122213221422152216221722182219222022212222222322242225222622272228222922302231223222332234223522362237223822392240224122422243224422452246224722482249225022512252225322542255225622572258225922602261226222632264226522662267226822692270227122722273227422752276227
722782279228022812282228322842285228622872288228922902291229222932294229522962297229822992300230123022303230423052306230723082309231023112312231323142315231623172318231923202321232223232324232523262327232823292330233123322333233423352336233723382339234023412342234323442345234623472348234923502351235223532354235523562357235823592360236123622363236423652366236723682369
  1. package kubecost
  2. import (
  3. "bytes"
  4. "fmt"
  5. "sort"
  6. "strings"
  7. "sync"
  8. "time"
  9. "github.com/opencost/opencost/pkg/log"
  10. "github.com/opencost/opencost/pkg/util"
  11. "github.com/opencost/opencost/pkg/util/json"
  12. )
  13. // TODO Clean-up use of IsEmpty; nil checks should be separated for safety.
  14. // TODO Consider making Allocation an interface, which is fulfilled by structs
  15. // like KubernetesAllocation, IdleAllocation, and ExternalAllocation.
  // Marker strings used to identify special kinds of allocations and
  // allocation properties.
  const (
  	// ExternalSuffix indicates an external allocation.
  	ExternalSuffix = "__external__"

  	// IdleSuffix indicates an idle allocation property.
  	IdleSuffix = "__idle__"

  	// SharedSuffix indicates a shared allocation property.
  	SharedSuffix = "__shared__"

  	// UnallocatedSuffix indicates an unallocated allocation property.
  	UnallocatedSuffix = "__unallocated__"

  	// UnmountedSuffix indicates allocation to an unmounted PV.
  	UnmountedSuffix = "__unmounted__"
  )

  // Identifiers for the ways in which a shareable resource may be shared.
  const (
  	// ShareWeighted indicates that a shared resource should be shared as a
  	// proportion of the cost of the remaining allocations.
  	ShareWeighted = "__weighted__"

  	// ShareEven indicates that a shared resource should be shared evenly
  	// across all remaining allocations.
  	ShareEven = "__even__"

  	// ShareNone indicates that a shareable resource should not be shared.
  	ShareNone = "__none__"
  )
  34. // Allocation is a unit of resource allocation and cost for a given window
  35. // of time and for a given kubernetes construct with its associated set of
  36. // properties.
  37. // TODO:CLEANUP consider dropping name in favor of just Allocation and an
  38. // Assets-style key() function for AllocationSet.
  39. type Allocation struct {
  40. Name string `json:"name"`
  41. Properties *AllocationProperties `json:"properties,omitempty"`
  42. Window Window `json:"window"`
  43. Start time.Time `json:"start"`
  44. End time.Time `json:"end"`
  45. CPUCoreHours float64 `json:"cpuCoreHours"`
  46. CPUCoreRequestAverage float64 `json:"cpuCoreRequestAverage"`
  47. CPUCoreUsageAverage float64 `json:"cpuCoreUsageAverage"`
  48. CPUCost float64 `json:"cpuCost"`
  49. CPUCostAdjustment float64 `json:"cpuCostAdjustment"`
  50. GPUHours float64 `json:"gpuHours"`
  51. GPUCost float64 `json:"gpuCost"`
  52. GPUCostAdjustment float64 `json:"gpuCostAdjustment"`
  53. NetworkTransferBytes float64 `json:"networkTransferBytes"`
  54. NetworkReceiveBytes float64 `json:"networkReceiveBytes"`
  55. NetworkCost float64 `json:"networkCost"`
  56. NetworkCostAdjustment float64 `json:"networkCostAdjustment"`
  57. LoadBalancerCost float64 `json:"loadBalancerCost"`
  58. LoadBalancerCostAdjustment float64 `json:"loadBalancerCostAdjustment"`
  59. PVs PVAllocations `json:"-"`
  60. PVCostAdjustment float64 `json:"pvCostAdjustment"`
  61. RAMByteHours float64 `json:"ramByteHours"`
  62. RAMBytesRequestAverage float64 `json:"ramByteRequestAverage"`
  63. RAMBytesUsageAverage float64 `json:"ramByteUsageAverage"`
  64. RAMCost float64 `json:"ramCost"`
  65. RAMCostAdjustment float64 `json:"ramCostAdjustment"`
  66. SharedCost float64 `json:"sharedCost"`
  67. ExternalCost float64 `json:"externalCost"`
  68. // RawAllocationOnly is a pointer so if it is not present it will be
  69. // marshalled as null rather than as an object with Go default values.
  70. RawAllocationOnly *RawAllocationOnlyData `json:"rawAllocationOnly"`
  71. }
  // RawAllocationOnlyData holds information that only belongs in "raw"
  // Allocations: those which have not undergone aggregation, accumulation, or
  // any other form of combination that produces a new Allocation from other
  // Allocations.
  //
  // Max usage data lives here because the overall maximum of two or more
  // Allocations cannot be computed without maintaining a large amount of
  // state. Consider the following example:
  //
  //	_______________________________________________
  //
  //	A1 Using 3 CPU    ----      -----     ------
  //	A2 Using 2 CPU      ----      -----      ----
  //	A3 Using 1 CPU         ---       --
  //	_______________________________________________
  //	               Time ---->
  //
  // The logical maximum CPU usage is 5, but this cannot be calculated
  // iteratively, which is how aggregations and accumulations of Allocations
  // are currently calculated. The problem amounts to finding the "maximum sum
  // of overlapping intervals", essentially a variant of an interval scheduling
  // algorithm.
  //
  // If there were distinct types for regular Allocations and
  // AggregatedAllocations, this type would be unnecessary: its fields would
  // belong to the regular Allocation and not to the AggregatedAllocation.
  type RawAllocationOnlyData struct {
  	CPUCoreUsageMax  float64 `json:"cpuCoreUsageMax"`
  	RAMBytesUsageMax float64 `json:"ramByteUsageMax"`
  }
  // PVAllocations is a map of Disk Asset Identifiers to the
  // usage of them by an Allocation as recorded in a PVAllocation
  type PVAllocations map[PVKey]*PVAllocation

  // Clone creates a deep copy of a PVAllocations. A nil receiver, or a receiver
  // pointing at a nil map, yields nil. Entries whose value is nil are kept as
  // nil entries in the copy rather than being dereferenced.
  func (pv *PVAllocations) Clone() PVAllocations {
  	if pv == nil || *pv == nil {
  		return nil
  	}
  	src := *pv
  	clonePV := make(PVAllocations, len(src))
  	for k, v := range src {
  		if v == nil {
  			// Preserve the key so the copy has the same length as the source.
  			clonePV[k] = nil
  			continue
  		}
  		clonePV[k] = &PVAllocation{
  			ByteHours: v.ByteHours,
  			Cost:      v.Cost,
  		}
  	}
  	return clonePV
  }

  // Add returns a new PVAllocations holding the contents of the receiver
  // summed, key by key, with the contents of that. Neither the receiver nor
  // that is mutated. If that is nil the result is simply a clone of the
  // receiver (possibly nil). Nil entries in that contribute nothing.
  func (pv *PVAllocations) Add(that PVAllocations) PVAllocations {
  	apv := pv.Clone()
  	if that == nil {
  		return apv
  	}
  	if apv == nil {
  		apv = make(PVAllocations, len(that))
  	}
  	for pvKey, thatPVAlloc := range that {
  		if thatPVAlloc == nil {
  			continue
  		}
  		apvAlloc := apv[pvKey]
  		if apvAlloc == nil {
  			// Covers both a missing key and a key cloned with a nil value.
  			apvAlloc = &PVAllocation{}
  			apv[pvKey] = apvAlloc
  		}
  		apvAlloc.Cost += thatPVAlloc.Cost
  		apvAlloc.ByteHours += thatPVAlloc.ByteHours
  	}
  	return apv
  }

  // PVKey for identifying Disk type assets
  type PVKey struct {
  	Cluster string `json:"cluster"`
  	Name    string `json:"name"`
  }

  // PVAllocation contains the byte hour usage
  // and cost of an Allocation for a single PV
  type PVAllocation struct {
  	ByteHours float64 `json:"byteHours"`
  	Cost      float64 `json:"cost"`
  }
  147. // AllocationMatchFunc is a function that can be used to match Allocations by
  148. // returning true for any given Allocation if a condition is met.
  149. type AllocationMatchFunc func(*Allocation) bool
  150. // Add returns the result of summing the two given Allocations, which sums the
  151. // summary fields (e.g. costs, resources) and recomputes efficiency. Neither of
  152. // the two original Allocations are mutated in the process.
  153. func (a *Allocation) Add(that *Allocation) (*Allocation, error) {
  154. if a == nil {
  155. return that.Clone(), nil
  156. }
  157. if that == nil {
  158. return a.Clone(), nil
  159. }
  160. // Note: no need to clone "that", as add only mutates the receiver
  161. agg := a.Clone()
  162. agg.add(that)
  163. return agg, nil
  164. }
  165. // Clone returns a deep copy of the given Allocation
  166. func (a *Allocation) Clone() *Allocation {
  167. if a == nil {
  168. return nil
  169. }
  170. return &Allocation{
  171. Name: a.Name,
  172. Properties: a.Properties.Clone(),
  173. Window: a.Window.Clone(),
  174. Start: a.Start,
  175. End: a.End,
  176. CPUCoreHours: a.CPUCoreHours,
  177. CPUCoreRequestAverage: a.CPUCoreRequestAverage,
  178. CPUCoreUsageAverage: a.CPUCoreUsageAverage,
  179. CPUCost: a.CPUCost,
  180. CPUCostAdjustment: a.CPUCostAdjustment,
  181. GPUHours: a.GPUHours,
  182. GPUCost: a.GPUCost,
  183. GPUCostAdjustment: a.GPUCostAdjustment,
  184. NetworkTransferBytes: a.NetworkTransferBytes,
  185. NetworkReceiveBytes: a.NetworkReceiveBytes,
  186. NetworkCost: a.NetworkCost,
  187. NetworkCostAdjustment: a.NetworkCostAdjustment,
  188. LoadBalancerCost: a.LoadBalancerCost,
  189. LoadBalancerCostAdjustment: a.LoadBalancerCostAdjustment,
  190. PVs: a.PVs.Clone(),
  191. PVCostAdjustment: a.PVCostAdjustment,
  192. RAMByteHours: a.RAMByteHours,
  193. RAMBytesRequestAverage: a.RAMBytesRequestAverage,
  194. RAMBytesUsageAverage: a.RAMBytesUsageAverage,
  195. RAMCost: a.RAMCost,
  196. RAMCostAdjustment: a.RAMCostAdjustment,
  197. SharedCost: a.SharedCost,
  198. ExternalCost: a.ExternalCost,
  199. RawAllocationOnly: a.RawAllocationOnly.Clone(),
  200. }
  201. }
  202. // Clone returns a deep copy of the given RawAllocationOnlyData
  203. func (r *RawAllocationOnlyData) Clone() *RawAllocationOnlyData {
  204. if r == nil {
  205. return nil
  206. }
  207. return &RawAllocationOnlyData{
  208. CPUCoreUsageMax: r.CPUCoreUsageMax,
  209. RAMBytesUsageMax: r.RAMBytesUsageMax,
  210. }
  211. }
  212. // Equal returns true if the values held in the given Allocation precisely
  213. // match those of the receiving Allocation. nil does not match nil. Floating
  214. // point values need to match according to util.IsApproximately, which accounts
  215. // for small, reasonable floating point error margins.
  216. func (a *Allocation) Equal(that *Allocation) bool {
  217. if a == nil || that == nil {
  218. return false
  219. }
  220. if a.Name != that.Name {
  221. return false
  222. }
  223. if !a.Properties.Equal(that.Properties) {
  224. return false
  225. }
  226. if !a.Window.Equal(that.Window) {
  227. return false
  228. }
  229. if !a.Start.Equal(that.Start) {
  230. return false
  231. }
  232. if !a.End.Equal(that.End) {
  233. return false
  234. }
  235. if !util.IsApproximately(a.CPUCoreHours, that.CPUCoreHours) {
  236. return false
  237. }
  238. if !util.IsApproximately(a.CPUCost, that.CPUCost) {
  239. return false
  240. }
  241. if !util.IsApproximately(a.CPUCostAdjustment, that.CPUCostAdjustment) {
  242. return false
  243. }
  244. if !util.IsApproximately(a.GPUHours, that.GPUHours) {
  245. return false
  246. }
  247. if !util.IsApproximately(a.GPUCost, that.GPUCost) {
  248. return false
  249. }
  250. if !util.IsApproximately(a.GPUCostAdjustment, that.GPUCostAdjustment) {
  251. return false
  252. }
  253. if !util.IsApproximately(a.NetworkTransferBytes, that.NetworkTransferBytes) {
  254. return false
  255. }
  256. if !util.IsApproximately(a.NetworkReceiveBytes, that.NetworkReceiveBytes) {
  257. return false
  258. }
  259. if !util.IsApproximately(a.NetworkCost, that.NetworkCost) {
  260. return false
  261. }
  262. if !util.IsApproximately(a.NetworkCostAdjustment, that.NetworkCostAdjustment) {
  263. return false
  264. }
  265. if !util.IsApproximately(a.LoadBalancerCost, that.LoadBalancerCost) {
  266. return false
  267. }
  268. if !util.IsApproximately(a.LoadBalancerCostAdjustment, that.LoadBalancerCostAdjustment) {
  269. return false
  270. }
  271. if !util.IsApproximately(a.PVCostAdjustment, that.PVCostAdjustment) {
  272. return false
  273. }
  274. if !util.IsApproximately(a.RAMByteHours, that.RAMByteHours) {
  275. return false
  276. }
  277. if !util.IsApproximately(a.RAMCost, that.RAMCost) {
  278. return false
  279. }
  280. if !util.IsApproximately(a.RAMCostAdjustment, that.RAMCostAdjustment) {
  281. return false
  282. }
  283. if !util.IsApproximately(a.SharedCost, that.SharedCost) {
  284. return false
  285. }
  286. if !util.IsApproximately(a.ExternalCost, that.ExternalCost) {
  287. return false
  288. }
  289. if a.RawAllocationOnly == nil && that.RawAllocationOnly != nil {
  290. return false
  291. }
  292. if a.RawAllocationOnly != nil && that.RawAllocationOnly == nil {
  293. return false
  294. }
  295. if a.RawAllocationOnly != nil && that.RawAllocationOnly != nil {
  296. if !util.IsApproximately(a.RawAllocationOnly.CPUCoreUsageMax, that.RawAllocationOnly.CPUCoreUsageMax) {
  297. return false
  298. }
  299. if !util.IsApproximately(a.RawAllocationOnly.RAMBytesUsageMax, that.RawAllocationOnly.RAMBytesUsageMax) {
  300. return false
  301. }
  302. }
  303. aPVs := a.PVs
  304. thatPVs := that.PVs
  305. if len(aPVs) == len(thatPVs) {
  306. for k, pv := range aPVs {
  307. tv, ok := thatPVs[k]
  308. if !ok || *tv != *pv {
  309. return false
  310. }
  311. }
  312. } else {
  313. return false
  314. }
  315. return true
  316. }
  317. // TotalCost is the total cost of the Allocation including adjustments
  318. func (a *Allocation) TotalCost() float64 {
  319. if a == nil {
  320. return 0.0
  321. }
  322. return a.CPUTotalCost() + a.GPUTotalCost() + a.RAMTotalCost() + a.PVTotalCost() + a.NetworkTotalCost() + a.LBTotalCost() + a.SharedTotalCost() + a.ExternalCost
  323. }
  324. // CPUTotalCost calculates total CPU cost of Allocation including adjustment
  325. func (a *Allocation) CPUTotalCost() float64 {
  326. if a == nil {
  327. return 0.0
  328. }
  329. return a.CPUCost + a.CPUCostAdjustment
  330. }
  331. // GPUTotalCost calculates total GPU cost of Allocation including adjustment
  332. func (a *Allocation) GPUTotalCost() float64 {
  333. if a == nil {
  334. return 0.0
  335. }
  336. return a.GPUCost + a.GPUCostAdjustment
  337. }
  338. // RAMTotalCost calculates total RAM cost of Allocation including adjustment
  339. func (a *Allocation) RAMTotalCost() float64 {
  340. if a == nil {
  341. return 0.0
  342. }
  343. return a.RAMCost + a.RAMCostAdjustment
  344. }
  345. // PVTotalCost calculates total PV cost of Allocation including adjustment
  346. func (a *Allocation) PVTotalCost() float64 {
  347. if a == nil {
  348. return 0.0
  349. }
  350. return a.PVCost() + a.PVCostAdjustment
  351. }
  352. // NetworkTotalCost calculates total Network cost of Allocation including adjustment
  353. func (a *Allocation) NetworkTotalCost() float64 {
  354. if a == nil {
  355. return 0.0
  356. }
  357. return a.NetworkCost + a.NetworkCostAdjustment
  358. }
  359. // LBTotalCost calculates total LB cost of Allocation including adjustment
  360. // TODO deprecate
  361. func (a *Allocation) LBTotalCost() float64 {
  362. return a.LoadBalancerTotalCost()
  363. }
  364. // LoadBalancerTotalCost calculates total LB cost of Allocation including adjustment
  365. func (a *Allocation) LoadBalancerTotalCost() float64 {
  366. if a == nil {
  367. return 0.0
  368. }
  369. return a.LoadBalancerCost + a.LoadBalancerCostAdjustment
  370. }
  371. // SharedTotalCost calculates total shared cost of Allocation including adjustment
  372. func (a *Allocation) SharedTotalCost() float64 {
  373. if a == nil {
  374. return 0.0
  375. }
  376. return a.SharedCost
  377. }
  378. // PVCost calculate cumulative cost of all PVs that Allocation is attached to
  379. func (a *Allocation) PVCost() float64 {
  380. if a == nil {
  381. return 0.0
  382. }
  383. cost := 0.0
  384. for _, pv := range a.PVs {
  385. cost += pv.Cost
  386. }
  387. return cost
  388. }
  389. // PVByteHours calculate cumulative ByteHours of all PVs that Allocation is attached to
  390. func (a *Allocation) PVByteHours() float64 {
  391. if a == nil {
  392. return 0.0
  393. }
  394. byteHours := 0.0
  395. for _, pv := range a.PVs {
  396. byteHours += pv.ByteHours
  397. }
  398. return byteHours
  399. }
  400. // CPUEfficiency is the ratio of usage to request. If there is no request and
  401. // no usage or cost, then efficiency is zero. If there is no request, but there
  402. // is usage or cost, then efficiency is 100%.
  403. func (a *Allocation) CPUEfficiency() float64 {
  404. if a == nil {
  405. return 0.0
  406. }
  407. if a.CPUCoreRequestAverage > 0 {
  408. return a.CPUCoreUsageAverage / a.CPUCoreRequestAverage
  409. }
  410. if a.CPUCoreUsageAverage == 0.0 || a.CPUCost == 0.0 {
  411. return 0.0
  412. }
  413. return 1.0
  414. }
  415. // RAMEfficiency is the ratio of usage to request. If there is no request and
  416. // no usage or cost, then efficiency is zero. If there is no request, but there
  417. // is usage or cost, then efficiency is 100%.
  418. func (a *Allocation) RAMEfficiency() float64 {
  419. if a == nil {
  420. return 0.0
  421. }
  422. if a.RAMBytesRequestAverage > 0 {
  423. return a.RAMBytesUsageAverage / a.RAMBytesRequestAverage
  424. }
  425. if a.RAMBytesUsageAverage == 0.0 || a.RAMCost == 0.0 {
  426. return 0.0
  427. }
  428. return 1.0
  429. }
  430. // TotalEfficiency is the cost-weighted average of CPU and RAM efficiency. If
  431. // there is no cost at all, then efficiency is zero.
  432. func (a *Allocation) TotalEfficiency() float64 {
  433. if a == nil {
  434. return 0.0
  435. }
  436. if a.RAMTotalCost()+a.CPUTotalCost() > 0 {
  437. ramCostEff := a.RAMEfficiency() * a.RAMTotalCost()
  438. cpuCostEff := a.CPUEfficiency() * a.CPUTotalCost()
  439. return (ramCostEff + cpuCostEff) / (a.CPUTotalCost() + a.RAMTotalCost())
  440. }
  441. return 0.0
  442. }
  443. // CPUCores converts the Allocation's CPUCoreHours into average CPUCores
  444. func (a *Allocation) CPUCores() float64 {
  445. if a.Minutes() <= 0.0 {
  446. return 0.0
  447. }
  448. return a.CPUCoreHours / (a.Minutes() / 60.0)
  449. }
  450. // RAMBytes converts the Allocation's RAMByteHours into average RAMBytes
  451. func (a *Allocation) RAMBytes() float64 {
  452. if a.Minutes() <= 0.0 {
  453. return 0.0
  454. }
  455. return a.RAMByteHours / (a.Minutes() / 60.0)
  456. }
  457. // GPUs converts the Allocation's GPUHours into average GPUs
  458. func (a *Allocation) GPUs() float64 {
  459. if a.Minutes() <= 0.0 {
  460. return 0.0
  461. }
  462. return a.GPUHours / (a.Minutes() / 60.0)
  463. }
  464. // PVBytes converts the Allocation's PVByteHours into average PVBytes
  465. func (a *Allocation) PVBytes() float64 {
  466. if a.Minutes() <= 0.0 {
  467. return 0.0
  468. }
  469. return a.PVByteHours() / (a.Minutes() / 60.0)
  470. }
  471. // ResetAdjustments sets all cost adjustment fields to zero
  472. func (a *Allocation) ResetAdjustments() {
  473. if a == nil {
  474. return
  475. }
  476. a.CPUCostAdjustment = 0.0
  477. a.GPUCostAdjustment = 0.0
  478. a.RAMCostAdjustment = 0.0
  479. a.PVCostAdjustment = 0.0
  480. a.NetworkCostAdjustment = 0.0
  481. a.LoadBalancerCostAdjustment = 0.0
  482. }
  483. // MarshalJSON implements json.Marshaler interface
  484. func (a *Allocation) MarshalJSON() ([]byte, error) {
  485. buffer := bytes.NewBufferString("{")
  486. jsonEncodeString(buffer, "name", a.Name, ",")
  487. jsonEncode(buffer, "properties", a.Properties, ",")
  488. jsonEncode(buffer, "window", a.Window, ",")
  489. jsonEncodeString(buffer, "start", a.Start.Format(time.RFC3339), ",")
  490. jsonEncodeString(buffer, "end", a.End.Format(time.RFC3339), ",")
  491. jsonEncodeFloat64(buffer, "minutes", a.Minutes(), ",")
  492. jsonEncodeFloat64(buffer, "cpuCores", a.CPUCores(), ",")
  493. jsonEncodeFloat64(buffer, "cpuCoreRequestAverage", a.CPUCoreRequestAverage, ",")
  494. jsonEncodeFloat64(buffer, "cpuCoreUsageAverage", a.CPUCoreUsageAverage, ",")
  495. jsonEncodeFloat64(buffer, "cpuCoreHours", a.CPUCoreHours, ",")
  496. jsonEncodeFloat64(buffer, "cpuCost", a.CPUCost, ",")
  497. jsonEncodeFloat64(buffer, "cpuCostAdjustment", a.CPUCostAdjustment, ",")
  498. jsonEncodeFloat64(buffer, "cpuEfficiency", a.CPUEfficiency(), ",")
  499. jsonEncodeFloat64(buffer, "gpuCount", a.GPUs(), ",")
  500. jsonEncodeFloat64(buffer, "gpuHours", a.GPUHours, ",")
  501. jsonEncodeFloat64(buffer, "gpuCost", a.GPUCost, ",")
  502. jsonEncodeFloat64(buffer, "gpuCostAdjustment", a.GPUCostAdjustment, ",")
  503. jsonEncodeFloat64(buffer, "networkTransferBytes", a.NetworkTransferBytes, ",")
  504. jsonEncodeFloat64(buffer, "networkReceiveBytes", a.NetworkReceiveBytes, ",")
  505. jsonEncodeFloat64(buffer, "networkCost", a.NetworkCost, ",")
  506. jsonEncodeFloat64(buffer, "networkCostAdjustment", a.NetworkCostAdjustment, ",")
  507. jsonEncodeFloat64(buffer, "loadBalancerCost", a.LoadBalancerCost, ",")
  508. jsonEncodeFloat64(buffer, "loadBalancerCostAdjustment", a.LoadBalancerCostAdjustment, ",")
  509. jsonEncodeFloat64(buffer, "pvBytes", a.PVBytes(), ",")
  510. jsonEncodeFloat64(buffer, "pvByteHours", a.PVByteHours(), ",")
  511. jsonEncodeFloat64(buffer, "pvCost", a.PVCost(), ",")
  512. jsonEncode(buffer, "pvs", a.PVs, ",") // Todo Sean: this does not work properly
  513. jsonEncodeFloat64(buffer, "pvCostAdjustment", a.PVCostAdjustment, ",")
  514. jsonEncodeFloat64(buffer, "ramBytes", a.RAMBytes(), ",")
  515. jsonEncodeFloat64(buffer, "ramByteRequestAverage", a.RAMBytesRequestAverage, ",")
  516. jsonEncodeFloat64(buffer, "ramByteUsageAverage", a.RAMBytesUsageAverage, ",")
  517. jsonEncodeFloat64(buffer, "ramByteHours", a.RAMByteHours, ",")
  518. jsonEncodeFloat64(buffer, "ramCost", a.RAMCost, ",")
  519. jsonEncodeFloat64(buffer, "ramCostAdjustment", a.RAMCostAdjustment, ",")
  520. jsonEncodeFloat64(buffer, "ramEfficiency", a.RAMEfficiency(), ",")
  521. jsonEncodeFloat64(buffer, "sharedCost", a.SharedCost, ",")
  522. jsonEncodeFloat64(buffer, "externalCost", a.ExternalCost, ",")
  523. jsonEncodeFloat64(buffer, "totalCost", a.TotalCost(), ",")
  524. jsonEncodeFloat64(buffer, "totalEfficiency", a.TotalEfficiency(), ",")
  525. jsonEncode(buffer, "rawAllocationOnly", a.RawAllocationOnly, "")
  526. buffer.WriteString("}")
  527. return buffer.Bytes(), nil
  528. }
  529. // Resolution returns the duration of time covered by the Allocation
  530. func (a *Allocation) Resolution() time.Duration {
  531. return a.End.Sub(a.Start)
  532. }
  533. // IsAggregated is true if the given Allocation has been aggregated, which we
  534. // define by a lack of AllocationProperties.
  535. func (a *Allocation) IsAggregated() bool {
  536. return a == nil || a.Properties == nil
  537. }
  538. // IsExternal is true if the given Allocation represents external costs.
  539. func (a *Allocation) IsExternal() bool {
  540. if a == nil {
  541. return false
  542. }
  543. return strings.Contains(a.Name, ExternalSuffix)
  544. }
  545. // IsIdle is true if the given Allocation represents idle costs.
  546. func (a *Allocation) IsIdle() bool {
  547. if a == nil {
  548. return false
  549. }
  550. return strings.Contains(a.Name, IdleSuffix)
  551. }
  552. // IsUnallocated is true if the given Allocation represents unallocated costs.
  553. func (a *Allocation) IsUnallocated() bool {
  554. if a == nil {
  555. return false
  556. }
  557. return strings.Contains(a.Name, UnallocatedSuffix)
  558. }
  559. // IsUnmounted is true if the given Allocation represents unmounted volume costs.
  560. func (a *Allocation) IsUnmounted() bool {
  561. if a == nil {
  562. return false
  563. }
  564. return strings.Contains(a.Name, UnmountedSuffix)
  565. }
  566. // Minutes returns the number of minutes the Allocation represents, as defined
  567. // by the difference between the end and start times.
  568. func (a *Allocation) Minutes() float64 {
  569. if a == nil {
  570. return 0.0
  571. }
  572. return a.End.Sub(a.Start).Minutes()
  573. }
  574. // Share adds the TotalCost of the given Allocation to the SharedCost of the
  575. // receiving Allocation. No Start, End, Window, or AllocationProperties are considered.
  576. // Neither Allocation is mutated; a new Allocation is always returned.
  577. func (a *Allocation) Share(that *Allocation) (*Allocation, error) {
  578. if that == nil {
  579. return a.Clone(), nil
  580. }
  581. if a == nil {
  582. return nil, fmt.Errorf("cannot share with nil Allocation")
  583. }
  584. agg := a.Clone()
  585. agg.SharedCost += that.TotalCost()
  586. return agg, nil
  587. }
  588. // String represents the given Allocation as a string
  589. func (a *Allocation) String() string {
  590. if a == nil {
  591. return "<nil>"
  592. }
  593. return fmt.Sprintf("%s%s=%.2f", a.Name, NewWindow(&a.Start, &a.End), a.TotalCost())
  594. }
  595. func (a *Allocation) add(that *Allocation) {
  596. if a == nil {
  597. log.Warnf("Allocation.AggregateBy: trying to add a nil receiver")
  598. return
  599. }
  600. // Generate keys for each allocation to allow for special logic to set the controller
  601. // in the case of keys matching but controllers not matching.
  602. aggByForKey := []string{"cluster", "node", "namespace", "pod", "container"}
  603. leftKey := a.generateKey(aggByForKey, nil)
  604. rightKey := that.generateKey(aggByForKey, nil)
  605. leftProperties := a.Properties
  606. rightProperties := that.Properties
  607. // Preserve string properties that are matching between the two allocations
  608. a.Properties = a.Properties.Intersection(that.Properties)
  609. // Overwrite regular intersection logic for the controller name property in the
  610. // case that the Allocation keys are the same but the controllers are not.
  611. if leftKey == rightKey &&
  612. leftProperties != nil &&
  613. rightProperties != nil &&
  614. leftProperties.Controller != rightProperties.Controller {
  615. if leftProperties.Controller == "" {
  616. a.Properties.Controller = rightProperties.Controller
  617. } else if rightProperties.Controller == "" {
  618. a.Properties.Controller = leftProperties.Controller
  619. } else {
  620. controllers := []string{
  621. leftProperties.Controller,
  622. rightProperties.Controller,
  623. }
  624. sort.Strings(controllers)
  625. a.Properties.Controller = controllers[0]
  626. }
  627. }
  628. // Expand the window to encompass both Allocations
  629. a.Window = a.Window.Expand(that.Window)
  630. // Sum non-cumulative fields by turning them into cumulative, adding them,
  631. // and then converting them back into averages after minutes have been
  632. // combined (just below).
  633. cpuReqCoreMins := a.CPUCoreRequestAverage * a.Minutes()
  634. cpuReqCoreMins += that.CPUCoreRequestAverage * that.Minutes()
  635. cpuUseCoreMins := a.CPUCoreUsageAverage * a.Minutes()
  636. cpuUseCoreMins += that.CPUCoreUsageAverage * that.Minutes()
  637. ramReqByteMins := a.RAMBytesRequestAverage * a.Minutes()
  638. ramReqByteMins += that.RAMBytesRequestAverage * that.Minutes()
  639. ramUseByteMins := a.RAMBytesUsageAverage * a.Minutes()
  640. ramUseByteMins += that.RAMBytesUsageAverage * that.Minutes()
  641. // Expand Start and End to be the "max" of among the given Allocations
  642. if that.Start.Before(a.Start) {
  643. a.Start = that.Start
  644. }
  645. if that.End.After(a.End) {
  646. a.End = that.End
  647. }
  648. // Convert cumulative request and usage back into rates
  649. // TODO:TEST write a unit test that fails if this is done incorrectly
  650. if a.Minutes() > 0 {
  651. a.CPUCoreRequestAverage = cpuReqCoreMins / a.Minutes()
  652. a.CPUCoreUsageAverage = cpuUseCoreMins / a.Minutes()
  653. a.RAMBytesRequestAverage = ramReqByteMins / a.Minutes()
  654. a.RAMBytesUsageAverage = ramUseByteMins / a.Minutes()
  655. } else {
  656. a.CPUCoreRequestAverage = 0.0
  657. a.CPUCoreUsageAverage = 0.0
  658. a.RAMBytesRequestAverage = 0.0
  659. a.RAMBytesUsageAverage = 0.0
  660. }
  661. // Sum all cumulative resource fields
  662. a.CPUCoreHours += that.CPUCoreHours
  663. a.GPUHours += that.GPUHours
  664. a.RAMByteHours += that.RAMByteHours
  665. a.NetworkTransferBytes += that.NetworkTransferBytes
  666. a.NetworkReceiveBytes += that.NetworkReceiveBytes
  667. // Sum all cumulative cost fields
  668. a.CPUCost += that.CPUCost
  669. a.GPUCost += that.GPUCost
  670. a.RAMCost += that.RAMCost
  671. a.NetworkCost += that.NetworkCost
  672. a.LoadBalancerCost += that.LoadBalancerCost
  673. a.SharedCost += that.SharedCost
  674. a.ExternalCost += that.ExternalCost
  675. // Sum PVAllocations
  676. a.PVs = a.PVs.Add(that.PVs)
  677. // Sum all cumulative adjustment fields
  678. a.CPUCostAdjustment += that.CPUCostAdjustment
  679. a.RAMCostAdjustment += that.RAMCostAdjustment
  680. a.GPUCostAdjustment += that.GPUCostAdjustment
  681. a.PVCostAdjustment += that.PVCostAdjustment
  682. a.NetworkCostAdjustment += that.NetworkCostAdjustment
  683. a.LoadBalancerCostAdjustment += that.LoadBalancerCostAdjustment
  684. // Any data that is in a "raw allocation only" is not valid in any
  685. // sort of cumulative Allocation (like one that is added).
  686. a.RawAllocationOnly = nil
  687. }
// AllocationSet stores a set of Allocations, each with a unique name, that share
// a window. An AllocationSet is mutable, so treat it like a threadsafe map.
type AllocationSet struct {
	// The embedded lock guards the set; AggregateBy holds the write lock
	// while it rewrites the set's contents.
	sync.RWMutex
	// allocations holds the Allocations of the set, keyed by Allocation name.
	allocations map[string]*Allocation
	// externalKeys records the names of the external Allocations in the set.
	externalKeys map[string]bool
	// idleKeys records the names of the idle Allocations in the set.
	idleKeys map[string]bool
	FromSource string // stores the name of the source used to compute the data
	// Window is the time window shared by the Allocations of the set.
	Window Window
	// NOTE(review): Warnings and Errors are presumably messages collected
	// while the set was computed — confirm against the code that fills them.
	Warnings []string
	Errors []string
}
  700. // NewAllocationSet instantiates a new AllocationSet and, optionally, inserts
  701. // the given list of Allocations
  702. func NewAllocationSet(start, end time.Time, allocs ...*Allocation) *AllocationSet {
  703. as := &AllocationSet{
  704. allocations: map[string]*Allocation{},
  705. externalKeys: map[string]bool{},
  706. idleKeys: map[string]bool{},
  707. Window: NewWindow(&start, &end),
  708. }
  709. for _, a := range allocs {
  710. as.Insert(a)
  711. }
  712. return as
  713. }
// AllocationAggregationOptions provide advanced functionality to AggregateBy, including
// filtering results and sharing allocations. FilterFuncs are a list of match
// functions such that, if any function fails, the allocation is ignored.
// ShareFuncs are a list of match functions such that, if any function
// succeeds, the allocation is marked as a shared resource. ShareIdle selects
// how idle costs are shared.
//
// Note that AggregateBy writes defaults back into the options it is given
// (LabelConfig and ShareIdle), so a shared options value may be modified.
type AllocationAggregationOptions struct {
	// NOTE(review): AllocationTotalsStore is not referenced by AggregateBy
	// directly; presumably it is consumed by the coefficient helpers that
	// receive these options — confirm.
	AllocationTotalsStore AllocationTotalsStore
	// FilterFuncs: an allocation is dropped from the results as soon as any
	// one of these functions returns false for it.
	FilterFuncs []AllocationMatchFunc
	// IdleByNode: per the comments in AggregateBy, selects whether idle costs
	// are tracked node-by-node rather than cluster-by-cluster.
	IdleByNode bool
	// LabelConfig is passed to key generation during aggregation. AggregateBy
	// replaces a nil value with NewLabelConfig().
	LabelConfig *LabelConfig
	// MergeUnallocated: when true, every aggregated allocation that
	// IsUnallocated is renamed to UnallocatedSuffix, merging them into one.
	MergeUnallocated bool
	// NOTE(review): Reconcile and ReconcileNetwork are not referenced by
	// AggregateBy; their consumers are elsewhere — confirm their meaning there.
	Reconcile bool
	ReconcileNetwork bool
	// ShareFuncs: an allocation is treated as a shared resource as soon as
	// any one of these functions returns true for it.
	ShareFuncs []AllocationMatchFunc
	// ShareIdle: AggregateBy treats every value other than ShareWeighted as
	// ShareNone (and overwrites the field accordingly).
	ShareIdle string
	// ShareSplit: ShareEven gives every aggregated result an equal portion of
	// the shared costs; any other value weights the split by cost.
	ShareSplit string
	// SharedHourlyCosts maps a name to an hourly cost. Each positive entry is
	// turned into a shared allocation costing that rate times the hours
	// covered by the set.
	SharedHourlyCosts map[string]float64
	// SplitIdle: when false, all idle allocations remaining after
	// aggregation are merged into a single allocation named IdleSuffix.
	SplitIdle bool
}
  734. // AggregateBy aggregates the Allocations in the given AllocationSet by the given
  735. // AllocationProperty. This will only be legal if the AllocationSet is divisible by the
  736. // given AllocationProperty; e.g. Containers can be divided by Namespace, but not vice-a-versa.
  737. func (as *AllocationSet) AggregateBy(aggregateBy []string, options *AllocationAggregationOptions) error {
  738. // The order of operations for aggregating allocations is as follows:
  739. //
  740. // 1. Partition external, idle, and shared allocations into separate sets.
  741. // Also, create the aggSet into which the results will be aggregated.
  742. //
  743. // 2. Compute sharing coefficients for idle and shared resources
  744. // a) if idle allocation is to be shared, compute idle coefficients
  745. // b) if idle allocation is NOT shared, but filters are present, compute
  746. // idle filtration coefficients for the purpose of only returning the
  747. // portion of idle allocation that would have been shared with the
  748. // unfiltered results. (See unit tests 5.a,b,c)
  749. // c) generate shared allocation for them given shared overhead, which
  750. // must happen after (2a) and (2b)
  751. // d) if there are shared resources, compute share coefficients
  752. //
  753. // 3. Drop any allocation that fails any of the filters
  754. //
  755. // 4. Distribute idle allocations according to the idle coefficients
  756. //
  757. // 5. Generate aggregation key and insert allocation into the output set
  758. //
  759. // 6. If idle is shared and resources are shared, some idle might be shared
  760. // with a shared resource. Distribute that to the shared resources
  761. // prior to sharing them with the aggregated results.
  762. //
  763. // 7. Apply idle filtration coefficients from step (2b)
  764. //
  765. // 8. Distribute shared allocations according to the share coefficients.
  766. //
  767. // 9. If there are external allocations that can be aggregated into
  768. // the output (i.e. they can be used to generate a valid key for
  769. // the given properties) then aggregate; otherwise... ignore them?
  770. //
  771. // 10. Distribute any undistributed idle, in the case that idle
  772. // coefficients end up being zero and some idle is not shared.
  773. //
  774. // 11. If the merge idle option is enabled, merge any remaining idle
  775. // allocations into a single idle allocation. If there was any idle
  776. // whose costs were not distributed because there was no usage of a
  777. // specific resource type, re-add the idle to the aggregation with
  778. // only that type.
  779. if as.IsEmpty() {
  780. return nil
  781. }
  782. if options == nil {
  783. options = &AllocationAggregationOptions{}
  784. }
  785. if options.LabelConfig == nil {
  786. options.LabelConfig = NewLabelConfig()
  787. }
  788. // idleFiltrationCoefficients relies on this being explicitly set
  789. if options.ShareIdle != ShareWeighted {
  790. options.ShareIdle = ShareNone
  791. }
  792. var allocatedTotalsMap map[string]map[string]float64
  793. // If aggregateBy is nil, we don't aggregate anything. On the other hand,
  794. // an empty slice implies that we should aggregate everything. See
  795. // generateKey for why that makes sense.
  796. shouldAggregate := aggregateBy != nil
  797. shouldFilter := len(options.FilterFuncs) > 0
  798. shouldShare := len(options.SharedHourlyCosts) > 0 || len(options.ShareFuncs) > 0
  799. if !shouldAggregate && !shouldFilter && !shouldShare && options.ShareIdle == ShareNone {
  800. // There is nothing for AggregateBy to do, so simply return nil
  801. return nil
  802. }
  803. // aggSet will collect the aggregated allocations
  804. aggSet := &AllocationSet{
  805. Window: as.Window.Clone(),
  806. }
  807. // externalSet will collect external allocations
  808. externalSet := &AllocationSet{
  809. Window: as.Window.Clone(),
  810. }
  811. // idleSet will be shared among aggSet after initial aggregation
  812. // is complete
  813. idleSet := &AllocationSet{
  814. Window: as.Window.Clone(),
  815. }
  816. // shareSet will be shared among aggSet after initial aggregation
  817. // is complete
  818. shareSet := &AllocationSet{
  819. Window: as.Window.Clone(),
  820. }
  821. as.Lock()
  822. defer as.Unlock()
  823. // (1) Loop and find all of the external, idle, and shared allocations. Add
  824. // them to their respective sets, removing them from the set of allocations
  825. // to aggregate.
  826. for _, alloc := range as.allocations {
  827. // External allocations get aggregated post-hoc (see step 6) and do
  828. // not necessarily contain complete sets of properties, so they are
  829. // moved to a separate AllocationSet.
  830. if alloc.IsExternal() {
  831. delete(as.externalKeys, alloc.Name)
  832. delete(as.allocations, alloc.Name)
  833. externalSet.Insert(alloc)
  834. continue
  835. }
  836. // Idle allocations should be separated into idleSet if they are to be
  837. // shared later on. If they are not to be shared, then add them to the
  838. // aggSet like any other allocation.
  839. if alloc.IsIdle() {
  840. delete(as.idleKeys, alloc.Name)
  841. delete(as.allocations, alloc.Name)
  842. if options.ShareIdle == ShareEven || options.ShareIdle == ShareWeighted {
  843. idleSet.Insert(alloc)
  844. } else {
  845. aggSet.Insert(alloc)
  846. }
  847. continue
  848. }
  849. // Shared allocations must be identified and separated prior to
  850. // aggregation and filtering. That is, if any of the ShareFuncs return
  851. // true for the allocation, then move it to shareSet.
  852. for _, sf := range options.ShareFuncs {
  853. if sf(alloc) {
  854. delete(as.idleKeys, alloc.Name)
  855. delete(as.allocations, alloc.Name)
  856. shareSet.Insert(alloc)
  857. break
  858. }
  859. }
  860. }
  861. // It's possible that no more un-shared, non-idle, non-external allocations
  862. // remain at this point. This always results in an emptySet, so return early.
  863. if len(as.allocations) == 0 {
  864. emptySet := &AllocationSet{
  865. Window: as.Window.Clone(),
  866. }
  867. as.allocations = emptySet.allocations
  868. return nil
  869. }
  870. // (2) In order to correctly share idle and shared costs, we first compute
  871. // sharing coefficients, which represent the proportion of each cost to
  872. // share with each allocation. Idle allocations are shared per-cluster or per-node,
  873. // per-allocation, and per-resource, while shared resources are shared per-
  874. // allocation only.
  875. //
  876. // For an idleCoefficient example, the entries:
  877. // [cluster1][cluster1/namespace1/pod1/container1][cpu] = 0.166667
  878. // [cluster1][cluster1/namespace1/pod1/container1][gpu] = 0.166667
  879. // [cluster1][cluster1/namespace1/pod1/container1][ram] = 0.687500
  880. // mean that the allocation "cluster1/namespace1/pod1/container1" will
  881. // receive 16.67% of cluster1's idle CPU and GPU costs and 68.75% of its
  882. // RAM costs.
  883. //
  884. // For a shareCoefficient example, the entries:
  885. // [namespace2] = 0.666667
  886. // [__filtered__] = 0.333333
  887. // mean that the post-aggregation allocation "namespace2" will receive
  888. // 66.67% of the shared resource costs, while the remaining 33.33% will
  889. // be filtered out, as they were shared with allocations that did not pass
  890. // one of the given filters.
  891. //
  892. // In order to maintain stable results when multiple operations are being
  893. // carried out (e.g. sharing idle, sharing resources, and filtering) these
  894. // coefficients are computed for the full set of allocations prior to
  895. // adding shared overhead and prior to applying filters.
  896. var err error
  897. // (2a) If there are idle costs to be shared, compute the coefficients for
  898. // sharing them among the non-idle, non-aggregated allocations (including
  899. // the shared allocations).
  900. var idleCoefficients map[string]map[string]map[string]float64
  901. if idleSet.Length() > 0 && options.ShareIdle != ShareNone {
  902. idleCoefficients, allocatedTotalsMap, err = computeIdleCoeffs(options, as, shareSet)
  903. if err != nil {
  904. log.Warnf("AllocationSet.AggregateBy: compute idle coeff: %s", err)
  905. return fmt.Errorf("error computing idle coefficients: %s", err)
  906. }
  907. }
  908. // (2b) If idle costs are not to be shared, but there are filters, then we
  909. // need to track the amount of each idle allocation to "filter" in order to
  910. // maintain parity with the results when idle is shared. That is, we want
  911. // to return only the idle costs that would have been shared with the given
  912. // results, even if the filter had not been applied.
  913. //
  914. // For example, consider these results from aggregating by namespace with
  915. // two clusters:
  916. //
  917. // namespace1: 25.00
  918. // namespace2: 30.00
  919. // namespace3: 15.00
  920. // idle: 30.00
  921. //
  922. // When we then filter by cluster==cluster1, namespaces 2 and 3 are
  923. // reduced by the amount that existed on cluster2. Then, idle must also be
  924. // reduced by the relevant amount:
  925. //
  926. // namespace1: 25.00
  927. // namespace2: 15.00
  928. // idle: 20.00
  929. //
  930. // Note that this can happen for any field, not just cluster, so we again
  931. // need to track this on a per-cluster or per-node, per-allocation, per-resource basis.
  932. var idleFiltrationCoefficients map[string]map[string]map[string]float64
  933. if len(options.FilterFuncs) > 0 && options.ShareIdle == ShareNone {
  934. idleFiltrationCoefficients, _, err = computeIdleCoeffs(options, as, shareSet)
  935. if err != nil {
  936. return fmt.Errorf("error computing idle filtration coefficients: %s", err)
  937. }
  938. }
  939. // (2c) Convert SharedHourlyCosts to Allocations in the shareSet. This must
  940. // come after idle coefficients are computed so that allocations generated
  941. // by shared overhead do not skew the idle coefficient computation.
  942. for name, cost := range options.SharedHourlyCosts {
  943. if cost > 0.0 {
  944. hours := as.Resolution().Hours()
  945. // If set ends in the future, adjust hours accordingly
  946. diff := time.Since(as.End())
  947. if diff < 0.0 {
  948. hours += diff.Hours()
  949. }
  950. totalSharedCost := cost * hours
  951. shareSet.Insert(&Allocation{
  952. Name: fmt.Sprintf("%s/%s", name, SharedSuffix),
  953. Start: as.Start(),
  954. End: as.End(),
  955. SharedCost: totalSharedCost,
  956. Properties: &AllocationProperties{Cluster: SharedSuffix}, // The allocation needs to belong to a cluster,but it really doesn't matter which one, so just make it clear.
  957. })
  958. }
  959. }
  960. // (2d) Compute share coefficients for shared resources. These are computed
  961. // after idle coefficients, and are computed for the aggregated allocations
  962. // of the main allocation set. See above for details and an example.
  963. var shareCoefficients map[string]float64
  964. if shareSet.Length() > 0 {
  965. shareCoefficients, err = computeShareCoeffs(aggregateBy, options, as)
  966. if err != nil {
  967. return fmt.Errorf("error computing share coefficients: %s", err)
  968. }
  969. }
  970. // (3-5) Filter, distribute idle cost, and aggregate (in that order)
  971. for _, alloc := range as.allocations {
  972. idleId, err := alloc.getIdleId(options)
  973. if err != nil {
  974. log.DedupedWarningf(3, "AllocationSet.AggregateBy: missing idleId for allocation: %s", alloc.Name)
  975. }
  976. skip := false
  977. // (3) If any of the filter funcs fail, immediately skip the allocation.
  978. for _, ff := range options.FilterFuncs {
  979. if !ff(alloc) {
  980. skip = true
  981. break
  982. }
  983. }
  984. if skip {
  985. // If we are tracking idle filtration coefficients, delete the
  986. // entry corresponding to the filtered allocation. (Deleting the
  987. // entry will result in that proportional amount being removed
  988. // from the idle allocation at the end of the process.)
  989. if idleFiltrationCoefficients != nil {
  990. if ifcc, ok := idleFiltrationCoefficients[idleId]; ok {
  991. delete(ifcc, alloc.Name)
  992. }
  993. }
  994. continue
  995. }
  996. // (4) Distribute idle allocations according to the idle coefficients
  997. // NOTE: if idle allocation is off (i.e. ShareIdle == ShareNone) then
  998. // all idle allocations will be in the aggSet at this point, so idleSet
  999. // will be empty and we won't enter this block.
  1000. if idleSet.Length() > 0 {
  1001. // Distribute idle allocations by coefficient per-idleId, per-allocation
  1002. for _, idleAlloc := range idleSet.allocations {
  1003. // Only share idle if the idleId matches; i.e. the allocation
  1004. // is from the same idleId as the idle costs
  1005. iaidleId, err := idleAlloc.getIdleId(options)
  1006. if err != nil {
  1007. log.Errorf("AllocationSet.AggregateBy: Idle allocation is missing idleId %s", idleAlloc.Name)
  1008. return err
  1009. }
  1010. if iaidleId != idleId {
  1011. continue
  1012. }
  1013. // Make sure idle coefficients exist
  1014. if _, ok := idleCoefficients[idleId]; !ok {
  1015. log.Warnf("AllocationSet.AggregateBy: error getting idle coefficient: no idleId '%s' for '%s'", idleId, alloc.Name)
  1016. continue
  1017. }
  1018. if _, ok := idleCoefficients[idleId][alloc.Name]; !ok {
  1019. log.Warnf("AllocationSet.AggregateBy: error getting idle coefficient for '%s'", alloc.Name)
  1020. continue
  1021. }
  1022. alloc.CPUCoreHours += idleAlloc.CPUCoreHours * idleCoefficients[idleId][alloc.Name]["cpu"]
  1023. alloc.GPUHours += idleAlloc.GPUHours * idleCoefficients[idleId][alloc.Name]["gpu"]
  1024. alloc.RAMByteHours += idleAlloc.RAMByteHours * idleCoefficients[idleId][alloc.Name]["ram"]
  1025. idleCPUCost := idleAlloc.CPUCost * idleCoefficients[idleId][alloc.Name]["cpu"]
  1026. idleGPUCost := idleAlloc.GPUCost * idleCoefficients[idleId][alloc.Name]["gpu"]
  1027. idleRAMCost := idleAlloc.RAMCost * idleCoefficients[idleId][alloc.Name]["ram"]
  1028. alloc.CPUCost += idleCPUCost
  1029. alloc.GPUCost += idleGPUCost
  1030. alloc.RAMCost += idleRAMCost
  1031. }
  1032. }
  1033. // (5) generate key to use for aggregation-by-key and allocation name
  1034. key := alloc.generateKey(aggregateBy, options.LabelConfig)
  1035. alloc.Name = key
  1036. if options.MergeUnallocated && alloc.IsUnallocated() {
  1037. alloc.Name = UnallocatedSuffix
  1038. }
  1039. // Inserting the allocation with the generated key for a name will
  1040. // perform the actual basic aggregation step.
  1041. aggSet.Insert(alloc)
  1042. }
  1043. // (6) If idle is shared and resources are shared, it's possible that some
  1044. // amount of idle cost will be shared with a shared resource. Distribute
  1045. // that idle allocation, if it exists, to the respective shared allocations
  1046. // before sharing with the aggregated allocations.
  1047. if idleSet.Length() > 0 && shareSet.Length() > 0 {
  1048. for _, alloc := range shareSet.allocations {
  1049. idleId, err := alloc.getIdleId(options)
  1050. if err != nil {
  1051. log.DedupedWarningf(3, "AllocationSet.AggregateBy: missing idleId for allocation: %s", alloc.Name)
  1052. }
  1053. // Distribute idle allocations by coefficient per-idleId, per-allocation
  1054. for _, idleAlloc := range idleSet.allocations {
  1055. // Only share idle if the idleId matches; i.e. the allocation
  1056. // is from the same idleId as the idle costs
  1057. iaidleId, _ := idleAlloc.getIdleId(options)
  1058. if iaidleId != idleId {
  1059. continue
  1060. }
  1061. // Make sure idle coefficients exist
  1062. if _, ok := idleCoefficients[idleId]; !ok {
  1063. log.Warnf("AllocationSet.AggregateBy: error getting idle coefficient: no idleId '%s' for '%s'", idleId, alloc.Name)
  1064. continue
  1065. }
  1066. if _, ok := idleCoefficients[idleId][alloc.Name]; !ok {
  1067. log.Warnf("AllocationSet.AggregateBy: error getting idle coefficient for '%s'", alloc.Name)
  1068. continue
  1069. }
  1070. alloc.CPUCoreHours += idleAlloc.CPUCoreHours * idleCoefficients[idleId][alloc.Name]["cpu"]
  1071. alloc.GPUHours += idleAlloc.GPUHours * idleCoefficients[idleId][alloc.Name]["gpu"]
  1072. alloc.RAMByteHours += idleAlloc.RAMByteHours * idleCoefficients[idleId][alloc.Name]["ram"]
  1073. idleCPUCost := idleAlloc.CPUCost * idleCoefficients[idleId][alloc.Name]["cpu"]
  1074. idleGPUCost := idleAlloc.GPUCost * idleCoefficients[idleId][alloc.Name]["gpu"]
  1075. idleRAMCost := idleAlloc.RAMCost * idleCoefficients[idleId][alloc.Name]["ram"]
  1076. alloc.CPUCost += idleCPUCost
  1077. alloc.GPUCost += idleGPUCost
  1078. alloc.RAMCost += idleRAMCost
  1079. }
  1080. }
  1081. }
  1082. // groupingIdleFiltrationCoeffs is used to track per-resource idle
  1083. // coefficients on a cluster-by-cluster or node-by-node basis depending
  1084. // on the IdleByNode option. It is, essentailly, an aggregation of
  1085. // idleFiltrationCoefficients after they have been
  1086. // filtered above (in step 3)
  1087. var groupingIdleFiltrationCoeffs map[string]map[string]float64
  1088. if idleFiltrationCoefficients != nil {
  1089. groupingIdleFiltrationCoeffs = map[string]map[string]float64{}
  1090. for idleId, m := range idleFiltrationCoefficients {
  1091. if _, ok := groupingIdleFiltrationCoeffs[idleId]; !ok {
  1092. groupingIdleFiltrationCoeffs[idleId] = map[string]float64{
  1093. "cpu": 0.0,
  1094. "gpu": 0.0,
  1095. "ram": 0.0,
  1096. }
  1097. }
  1098. for _, n := range m {
  1099. for resource, val := range n {
  1100. groupingIdleFiltrationCoeffs[idleId][resource] += val
  1101. }
  1102. }
  1103. }
  1104. }
  1105. // (7) If we have both un-shared idle allocations and idle filtration
  1106. // coefficients then apply those. See step (2b) for an example.
  1107. if len(aggSet.idleKeys) > 0 && groupingIdleFiltrationCoeffs != nil {
  1108. for idleKey := range aggSet.idleKeys {
  1109. idleAlloc := aggSet.Get(idleKey)
  1110. iaidleId, err := idleAlloc.getIdleId(options)
  1111. if err != nil {
  1112. log.Errorf("AllocationSet.AggregateBy: Idle allocation is missing idleId %s", idleAlloc.Name)
  1113. return err
  1114. }
  1115. if resourceCoeffs, ok := groupingIdleFiltrationCoeffs[iaidleId]; ok {
  1116. idleAlloc.CPUCost *= resourceCoeffs["cpu"]
  1117. idleAlloc.CPUCoreHours *= resourceCoeffs["cpu"]
  1118. idleAlloc.RAMCost *= resourceCoeffs["ram"]
  1119. idleAlloc.RAMByteHours *= resourceCoeffs["ram"]
  1120. idleAlloc.GPUCost *= resourceCoeffs["gpu"]
  1121. idleAlloc.GPUHours *= resourceCoeffs["gpu"]
  1122. }
  1123. }
  1124. }
  1125. // (8) Distribute shared allocations according to the share coefficients.
  1126. if shareSet.Length() > 0 {
  1127. for _, alloc := range aggSet.allocations {
  1128. for _, sharedAlloc := range shareSet.allocations {
  1129. if _, ok := shareCoefficients[alloc.Name]; !ok {
  1130. if !alloc.IsIdle() && !alloc.IsUnmounted() {
  1131. log.Warnf("AllocationSet.AggregateBy: error getting share coefficienct for '%s'", alloc.Name)
  1132. }
  1133. continue
  1134. }
  1135. alloc.SharedCost += sharedAlloc.TotalCost() * shareCoefficients[alloc.Name]
  1136. }
  1137. }
  1138. }
  1139. // (9) Aggregate external allocations into aggregated allocations. This may
  1140. // not be possible for every external allocation, but attempt to find an
  1141. // exact key match, given each external allocation's proerties, and
  1142. // aggregate if an exact match is found.
  1143. for _, alloc := range externalSet.allocations {
  1144. skip := false
  1145. for _, ff := range options.FilterFuncs {
  1146. if !ff(alloc) {
  1147. skip = true
  1148. break
  1149. }
  1150. }
  1151. if !skip {
  1152. key := alloc.generateKey(aggregateBy, options.LabelConfig)
  1153. alloc.Name = key
  1154. aggSet.Insert(alloc)
  1155. }
  1156. }
  1157. // (10) In the edge case that some idle has not been distributed because
  1158. // there is no usage of that resource type, add idle back to
  1159. // aggregations with only that cost applied.
  1160. // E.g. in the case where we have a result that looks like this on the
  1161. // frontend:
  1162. // Name CPU GPU RAM
  1163. // __idle__ $10 $12 $6
  1164. // kubecost $2 $0 $1
  1165. // Sharing idle weighted would result in no idle GPU cost being
  1166. // distributed, because the coefficient for the kubecost GPU cost would
  1167. // be zero. Thus, instead we re-add idle to the aggSet with distributed
  1168. // costs zeroed out but the undistributed costs left in.
  1169. // Name CPU GPU RAM
  1170. // __idle__ $0 $12 $0
  1171. // kubecost $12 $0 $7
  1172. if idleSet.Length() > 0 {
  1173. for _, idleAlloc := range idleSet.allocations {
  1174. // if the idle does not apply to the non-filtered values, skip it
  1175. skip := false
  1176. for _, ff := range options.FilterFuncs {
  1177. if !ff(idleAlloc) {
  1178. skip = true
  1179. break
  1180. }
  1181. }
  1182. if skip {
  1183. continue
  1184. }
  1185. idleId, err := idleAlloc.getIdleId(options)
  1186. if err != nil {
  1187. log.Errorf("AllocationSet.AggregateBy: idle allocation is missing idleId %s", idleAlloc.Name)
  1188. continue
  1189. }
  1190. hasUndistributableCost := false
  1191. if idleAlloc.CPUCost > 0 && allocatedTotalsMap[idleId]["cpu"] == 0 {
  1192. hasUndistributableCost = true
  1193. } else {
  1194. idleAlloc.CPUCost = 0
  1195. }
  1196. if idleAlloc.GPUCost > 0 && allocatedTotalsMap[idleId]["gpu"] == 0 {
  1197. hasUndistributableCost = true
  1198. } else {
  1199. idleAlloc.GPUCost = 0
  1200. }
  1201. if idleAlloc.RAMCost > 0 && allocatedTotalsMap[idleId]["ram"] == 0 {
  1202. hasUndistributableCost = true
  1203. } else {
  1204. idleAlloc.RAMCost = 0
  1205. }
  1206. if hasUndistributableCost {
  1207. idleAlloc.Name = fmt.Sprintf("%s/%s", idleId, IdleSuffix)
  1208. aggSet.Insert(idleAlloc)
  1209. }
  1210. }
  1211. }
  1212. // (11) Combine all idle allocations into a single "__idle__" allocation
  1213. if !options.SplitIdle {
  1214. for _, idleAlloc := range aggSet.IdleAllocations() {
  1215. aggSet.Delete(idleAlloc.Name)
  1216. idleAlloc.Name = IdleSuffix
  1217. aggSet.Insert(idleAlloc)
  1218. }
  1219. }
  1220. // TODO revisit this (ideally we just remove sharing from this function!)
  1221. // If filters and shared resources and shared idle are all enabled then
  1222. // we will over-count idle by exactly the portion that gets shared with the
  1223. // filtered allocations -- and idle filtration will miss this because it
  1224. // only filters the non-idle filtered costs.
  1225. //
  1226. // Consider the following example, from unit tests:
  1227. // - namespace1 28.000
  1228. // - namespace2 36.000
  1229. // - namespace3 18.000
  1230. // - cluster1/idle 20.000
  1231. // - cluster2/idle 10.000
  1232. //
  1233. // Now, we want to share namespace1, filter namespace2, and share idle:
  1234. //
  1235. // 1. Distribute idle
  1236. // ns1 ns2 ns3
  1237. // non-idle 28.000 36.000 18.000
  1238. // idle 14.688 10.312 5.000
  1239. //
  1240. // 2. Share namespace1
  1241. //
  1242. // ns2 ns3
  1243. // non-idle 36.000 18.000
  1244. // idle 10.312 5.000
  1245. // shared non-idle 18.667 9.333
  1246. // shared idle 9.792 4.896 (***)
  1247. //
  1248. // 3. Filter out all but namespace2
  1249. //
  1250. // ns2 = 36.000 + 10.312 + 18.667 + 9.792 = 74.771
  1251. //
  1252. // So, if we had NOT shared idle, we would expect something like this:
  1253. //
  1254. // ns2 = 36.000 + 18.667 = 54.667
  1255. // idle = 10.312 + 9.792 = 20.104
  1256. //
  1257. // But we will instead get this:
  1258. //
  1259. // ns2 = 36.000 + 18.667 = 54.667
  1260. // idle = 10.312 + 14.688 = 25.000
  1261. //
  1262. // Which over-shoots idle by 4.896 (***), i.e. precisely the amount of idle
  1263. // cost corresponding to namespace1 AND shared with namespace3. Phew.
  1264. //
  1265. // I originally wanted to fix this, but after 2 days, I'm punting with the
  1266. // recommendation that we rewrite this function soon. Too difficult.
  1267. // - Niko
  1268. as.allocations = aggSet.allocations
  1269. return nil
  1270. }
  1271. func computeShareCoeffs(aggregateBy []string, options *AllocationAggregationOptions, as *AllocationSet) (map[string]float64, error) {
  1272. // Compute coeffs by totalling per-allocation, then dividing by the total.
  1273. coeffs := map[string]float64{}
  1274. // Compute totals for all allocations
  1275. total := 0.0
  1276. // ShareEven counts each aggregation with even weight, whereas ShareWeighted
  1277. // counts each aggregation proportionally to its respective costs
  1278. shareType := options.ShareSplit
  1279. // Record allocation values first, then normalize by totals to get percentages
  1280. for _, alloc := range as.allocations {
  1281. if alloc.IsIdle() {
  1282. // Skip idle allocations in coefficient calculation
  1283. continue
  1284. }
  1285. if alloc.IsUnmounted() {
  1286. // Skip unmounted allocations in coefficient calculation
  1287. continue
  1288. }
  1289. // Determine the post-aggregation key under which the allocation will
  1290. // be shared.
  1291. name := alloc.generateKey(aggregateBy, options.LabelConfig)
  1292. // If the current allocation will be filtered out in step 3, contribute
  1293. // its share of the shared coefficient to a "__filtered__" bin, which
  1294. // will ultimately be dropped. This step ensures that the shared cost
  1295. // of a non-filtered allocation will be conserved even when the filter
  1296. // is removed. (Otherwise, all the shared cost will get redistributed
  1297. // over the unfiltered results, inflating their shared costs.)
  1298. filtered := false
  1299. for _, ff := range options.FilterFuncs {
  1300. if !ff(alloc) {
  1301. filtered = true
  1302. break
  1303. }
  1304. }
  1305. if filtered {
  1306. name = "__filtered__"
  1307. }
  1308. if shareType == ShareEven {
  1309. // Even distribution is not additive - set to 1.0 for everything
  1310. coeffs[name] = 1.0
  1311. // Total for even distribution is always the number of coefficients
  1312. total = float64(len(coeffs))
  1313. } else {
  1314. // Both are additive for weighted distribution, where each
  1315. // cumulative coefficient will be divided by the total.
  1316. coeffs[name] += alloc.TotalCost() - alloc.SharedCost
  1317. total += alloc.TotalCost() - alloc.SharedCost
  1318. }
  1319. }
  1320. // Normalize coefficients by totals
  1321. for a := range coeffs {
  1322. if coeffs[a] > 0 && total > 0 {
  1323. coeffs[a] /= total
  1324. } else {
  1325. log.Warnf("ETL: invalid values for shared coefficients: %v, %v", coeffs[a], total)
  1326. coeffs[a] = 0.0
  1327. }
  1328. }
  1329. return coeffs, nil
  1330. }
  1331. func computeIdleCoeffs(options *AllocationAggregationOptions, as *AllocationSet, shareSet *AllocationSet) (map[string]map[string]map[string]float64, map[string]map[string]float64, error) {
  1332. types := []string{"cpu", "gpu", "ram"}
  1333. // Compute idle coefficients, then save them in AllocationAggregationOptions
  1334. // [idle_id][allocation name][resource] = [coeff]
  1335. coeffs := map[string]map[string]map[string]float64{}
  1336. // Compute totals per resource for CPU, GPU, RAM, and PV
  1337. // [idle_id][resource] = [total]
  1338. totals := map[string]map[string]float64{}
  1339. // Record allocation values first, then normalize by totals to get percentages
  1340. for _, alloc := range as.allocations {
  1341. if alloc.IsIdle() {
  1342. // Skip idle allocations in coefficient calculation
  1343. continue
  1344. }
  1345. idleId, err := alloc.getIdleId(options)
  1346. if err != nil {
  1347. log.DedupedWarningf(3, "Missing Idle Key for %s", alloc.Name)
  1348. }
  1349. // get the name key for the allocation
  1350. name := alloc.Name
  1351. // Create key based tables if they don't exist
  1352. if _, ok := coeffs[idleId]; !ok {
  1353. coeffs[idleId] = map[string]map[string]float64{}
  1354. }
  1355. if _, ok := totals[idleId]; !ok {
  1356. totals[idleId] = map[string]float64{}
  1357. }
  1358. if _, ok := coeffs[idleId][name]; !ok {
  1359. coeffs[idleId][name] = map[string]float64{}
  1360. }
  1361. coeffs[idleId][name]["cpu"] += alloc.CPUTotalCost()
  1362. coeffs[idleId][name]["gpu"] += alloc.GPUTotalCost()
  1363. coeffs[idleId][name]["ram"] += alloc.RAMTotalCost()
  1364. totals[idleId]["cpu"] += alloc.CPUTotalCost()
  1365. totals[idleId]["gpu"] += alloc.GPUTotalCost()
  1366. totals[idleId]["ram"] += alloc.RAMTotalCost()
  1367. }
  1368. // Do the same for shared allocations
  1369. for _, alloc := range shareSet.allocations {
  1370. if alloc.IsIdle() {
  1371. // Skip idle allocations in coefficient calculation
  1372. continue
  1373. }
  1374. // idleId will be providerId or cluster
  1375. idleId, err := alloc.getIdleId(options)
  1376. if err != nil {
  1377. log.DedupedWarningf(3, "Missing Idle Key in share set for %s", alloc.Name)
  1378. }
  1379. // get the name key for the allocation
  1380. name := alloc.Name
  1381. // Create idleId based tables if they don't exist
  1382. if _, ok := coeffs[idleId]; !ok {
  1383. coeffs[idleId] = map[string]map[string]float64{}
  1384. }
  1385. if _, ok := totals[idleId]; !ok {
  1386. totals[idleId] = map[string]float64{}
  1387. }
  1388. if _, ok := coeffs[idleId][name]; !ok {
  1389. coeffs[idleId][name] = map[string]float64{}
  1390. }
  1391. coeffs[idleId][name]["cpu"] += alloc.CPUTotalCost()
  1392. coeffs[idleId][name]["gpu"] += alloc.GPUTotalCost()
  1393. coeffs[idleId][name]["ram"] += alloc.RAMTotalCost()
  1394. totals[idleId]["cpu"] += alloc.CPUTotalCost()
  1395. totals[idleId]["gpu"] += alloc.GPUTotalCost()
  1396. totals[idleId]["ram"] += alloc.RAMTotalCost()
  1397. }
  1398. // Normalize coefficients by totals
  1399. for id := range coeffs {
  1400. for a := range coeffs[id] {
  1401. for _, r := range types {
  1402. if coeffs[id][a][r] > 0 && totals[id][r] > 0 {
  1403. coeffs[id][a][r] /= totals[id][r]
  1404. }
  1405. }
  1406. }
  1407. }
  1408. return coeffs, totals, nil
  1409. }
  1410. // getIdleId returns the providerId or cluster of an Allocation depending on the IdleByNode
  1411. // option in the AllocationAggregationOptions and an error if the respective field is missing
  1412. func (a *Allocation) getIdleId(options *AllocationAggregationOptions) (string, error) {
  1413. var idleId string
  1414. if options.IdleByNode {
  1415. // Key allocations to ProviderId to match against node
  1416. idleId = fmt.Sprintf("%s/%s", a.Properties.Cluster, a.Properties.Node)
  1417. if idleId == "" {
  1418. return idleId, fmt.Errorf("ProviderId is not set")
  1419. }
  1420. } else {
  1421. // key the allocations by cluster id
  1422. idleId = a.Properties.Cluster
  1423. if idleId == "" {
  1424. return idleId, fmt.Errorf("ClusterProp is not set")
  1425. }
  1426. }
  1427. return idleId, nil
  1428. }
  1429. func (a *Allocation) generateKey(aggregateBy []string, labelConfig *LabelConfig) string {
  1430. if a == nil {
  1431. return ""
  1432. }
  1433. return a.Properties.GenerateKey(aggregateBy, labelConfig)
  1434. }
  1435. // Clone returns a new AllocationSet with a deep copy of the given
  1436. // AllocationSet's allocations.
  1437. func (as *AllocationSet) Clone() *AllocationSet {
  1438. if as == nil {
  1439. return nil
  1440. }
  1441. as.RLock()
  1442. defer as.RUnlock()
  1443. allocs := make(map[string]*Allocation, len(as.allocations))
  1444. for k, v := range as.allocations {
  1445. allocs[k] = v.Clone()
  1446. }
  1447. externalKeys := make(map[string]bool, len(as.externalKeys))
  1448. for k, v := range as.externalKeys {
  1449. externalKeys[k] = v
  1450. }
  1451. idleKeys := make(map[string]bool, len(as.idleKeys))
  1452. for k, v := range as.idleKeys {
  1453. idleKeys[k] = v
  1454. }
  1455. var errors []string
  1456. var warnings []string
  1457. if as.Errors != nil {
  1458. errors = make([]string, len(as.Errors))
  1459. copy(errors, as.Errors)
  1460. } else {
  1461. errors = nil
  1462. }
  1463. if as.Warnings != nil {
  1464. warnings := make([]string, len(as.Warnings))
  1465. copy(warnings, as.Warnings)
  1466. } else {
  1467. warnings = nil
  1468. }
  1469. return &AllocationSet{
  1470. allocations: allocs,
  1471. externalKeys: externalKeys,
  1472. idleKeys: idleKeys,
  1473. Window: as.Window.Clone(),
  1474. Errors: errors,
  1475. Warnings: warnings,
  1476. }
  1477. }
  1478. // Delete removes the allocation with the given name from the set
  1479. func (as *AllocationSet) Delete(name string) {
  1480. if as == nil {
  1481. return
  1482. }
  1483. as.Lock()
  1484. defer as.Unlock()
  1485. delete(as.externalKeys, name)
  1486. delete(as.idleKeys, name)
  1487. delete(as.allocations, name)
  1488. }
  1489. // Each invokes the given function for each Allocation in the set
  1490. func (as *AllocationSet) Each(f func(string, *Allocation)) {
  1491. if as == nil {
  1492. return
  1493. }
  1494. for k, a := range as.allocations {
  1495. f(k, a)
  1496. }
  1497. }
  1498. // End returns the End time of the AllocationSet window
  1499. func (as *AllocationSet) End() time.Time {
  1500. if as == nil {
  1501. log.Warnf("AllocationSet: calling End on nil AllocationSet")
  1502. return time.Unix(0, 0)
  1503. }
  1504. if as.Window.End() == nil {
  1505. log.Warnf("AllocationSet: AllocationSet with illegal window: End is nil; len(as.allocations)=%d", len(as.allocations))
  1506. return time.Unix(0, 0)
  1507. }
  1508. return *as.Window.End()
  1509. }
  1510. // Get returns the Allocation at the given key in the AllocationSet
  1511. func (as *AllocationSet) Get(key string) *Allocation {
  1512. as.RLock()
  1513. defer as.RUnlock()
  1514. if alloc, ok := as.allocations[key]; ok {
  1515. return alloc
  1516. }
  1517. return nil
  1518. }
  1519. // ExternalAllocations returns a map of the external allocations in the set.
  1520. // Returns clones of the actual Allocations, so mutability is not a problem.
  1521. func (as *AllocationSet) ExternalAllocations() map[string]*Allocation {
  1522. externals := map[string]*Allocation{}
  1523. if as.IsEmpty() {
  1524. return externals
  1525. }
  1526. as.RLock()
  1527. defer as.RUnlock()
  1528. for key := range as.externalKeys {
  1529. if alloc, ok := as.allocations[key]; ok {
  1530. externals[key] = alloc.Clone()
  1531. }
  1532. }
  1533. return externals
  1534. }
  1535. // ExternalCost returns the total aggregated external costs of the set
  1536. func (as *AllocationSet) ExternalCost() float64 {
  1537. if as.IsEmpty() {
  1538. return 0.0
  1539. }
  1540. as.RLock()
  1541. defer as.RUnlock()
  1542. externalCost := 0.0
  1543. for _, alloc := range as.allocations {
  1544. externalCost += alloc.ExternalCost
  1545. }
  1546. return externalCost
  1547. }
  1548. // IdleAllocations returns a map of the idle allocations in the AllocationSet.
  1549. // Returns clones of the actual Allocations, so mutability is not a problem.
  1550. func (as *AllocationSet) IdleAllocations() map[string]*Allocation {
  1551. idles := map[string]*Allocation{}
  1552. if as.IsEmpty() {
  1553. return idles
  1554. }
  1555. as.RLock()
  1556. defer as.RUnlock()
  1557. for key := range as.idleKeys {
  1558. if alloc, ok := as.allocations[key]; ok {
  1559. idles[key] = alloc.Clone()
  1560. }
  1561. }
  1562. return idles
  1563. }
  1564. // Insert aggregates the current entry in the AllocationSet by the given Allocation,
  1565. // but only if the Allocation is valid, i.e. matches the AllocationSet's window. If
  1566. // there is no existing entry, one is created. Nil error response indicates success.
  1567. func (as *AllocationSet) Insert(that *Allocation) error {
  1568. return as.insert(that)
  1569. }
  1570. func (as *AllocationSet) insert(that *Allocation) error {
  1571. if as == nil {
  1572. return fmt.Errorf("cannot insert into nil AllocationSet")
  1573. }
  1574. as.Lock()
  1575. defer as.Unlock()
  1576. if as.allocations == nil {
  1577. as.allocations = map[string]*Allocation{}
  1578. }
  1579. if as.externalKeys == nil {
  1580. as.externalKeys = map[string]bool{}
  1581. }
  1582. if as.idleKeys == nil {
  1583. as.idleKeys = map[string]bool{}
  1584. }
  1585. // Add the given Allocation to the existing entry, if there is one;
  1586. // otherwise just set directly into allocations
  1587. if _, ok := as.allocations[that.Name]; !ok {
  1588. as.allocations[that.Name] = that
  1589. } else {
  1590. as.allocations[that.Name].add(that)
  1591. }
  1592. // If the given Allocation is an external one, record that
  1593. if that.IsExternal() {
  1594. as.externalKeys[that.Name] = true
  1595. }
  1596. // If the given Allocation is an idle one, record that
  1597. if that.IsIdle() {
  1598. as.idleKeys[that.Name] = true
  1599. }
  1600. // Expand the window, just to be safe. It's possible that the Allocation will
  1601. // be set into the map without expanding it to the AllocationSet's window.
  1602. as.allocations[that.Name].Window = as.allocations[that.Name].Window.Expand(as.Window)
  1603. return nil
  1604. }
  1605. // IsEmpty returns true if the AllocationSet is nil, or if it contains
  1606. // zero allocations.
  1607. func (as *AllocationSet) IsEmpty() bool {
  1608. if as == nil || len(as.allocations) == 0 {
  1609. return true
  1610. }
  1611. as.RLock()
  1612. defer as.RUnlock()
  1613. return as.allocations == nil || len(as.allocations) == 0
  1614. }
  1615. // Length returns the number of Allocations in the set
  1616. func (as *AllocationSet) Length() int {
  1617. if as == nil {
  1618. return 0
  1619. }
  1620. as.RLock()
  1621. defer as.RUnlock()
  1622. return len(as.allocations)
  1623. }
  1624. // Map clones and returns a map of the AllocationSet's Allocations
  1625. func (as *AllocationSet) Map() map[string]*Allocation {
  1626. if as.IsEmpty() {
  1627. return map[string]*Allocation{}
  1628. }
  1629. return as.Clone().allocations
  1630. }
  1631. // MarshalJSON JSON-encodes the AllocationSet
  1632. func (as *AllocationSet) MarshalJSON() ([]byte, error) {
  1633. if as == nil {
  1634. return json.Marshal(map[string]*Allocation{})
  1635. }
  1636. as.RLock()
  1637. defer as.RUnlock()
  1638. return json.Marshal(as.allocations)
  1639. }
  1640. // ResetAdjustments sets all cost adjustment fields to zero
  1641. func (as *AllocationSet) ResetAdjustments() {
  1642. if as == nil {
  1643. return
  1644. }
  1645. as.Lock()
  1646. defer as.Unlock()
  1647. as.resetAdjustments()
  1648. }
  1649. func (as *AllocationSet) resetAdjustments() {
  1650. for _, a := range as.allocations {
  1651. a.ResetAdjustments()
  1652. }
  1653. }
  1654. // Resolution returns the AllocationSet's window duration
  1655. func (as *AllocationSet) Resolution() time.Duration {
  1656. return as.Window.Duration()
  1657. }
  1658. // Set uses the given Allocation to overwrite the existing entry in the
  1659. // AllocationSet under the Allocation's name.
  1660. func (as *AllocationSet) Set(alloc *Allocation) error {
  1661. if as.IsEmpty() {
  1662. as.Lock()
  1663. as.allocations = map[string]*Allocation{}
  1664. as.externalKeys = map[string]bool{}
  1665. as.idleKeys = map[string]bool{}
  1666. as.Unlock()
  1667. }
  1668. as.Lock()
  1669. defer as.Unlock()
  1670. as.allocations[alloc.Name] = alloc
  1671. // If the given Allocation is an external one, record that
  1672. if alloc.IsExternal() {
  1673. as.externalKeys[alloc.Name] = true
  1674. }
  1675. // If the given Allocation is an idle one, record that
  1676. if alloc.IsIdle() {
  1677. as.idleKeys[alloc.Name] = true
  1678. }
  1679. return nil
  1680. }
  1681. // Start returns the Start time of the AllocationSet window
  1682. func (as *AllocationSet) Start() time.Time {
  1683. if as == nil {
  1684. log.Warnf("AllocationSet: calling Start on nil AllocationSet")
  1685. return time.Unix(0, 0)
  1686. }
  1687. if as.Window.Start() == nil {
  1688. log.Warnf("AllocationSet: AllocationSet with illegal window: Start is nil; len(as.allocations)=%d", len(as.allocations))
  1689. return time.Unix(0, 0)
  1690. }
  1691. return *as.Window.Start()
  1692. }
  1693. // String represents the given Allocation as a string
  1694. func (as *AllocationSet) String() string {
  1695. if as == nil {
  1696. return "<nil>"
  1697. }
  1698. return fmt.Sprintf("AllocationSet{length: %d; window: %s; totalCost: %.2f}",
  1699. as.Length(), as.Window, as.TotalCost())
  1700. }
  1701. // TotalCost returns the sum of all TotalCosts of the allocations contained
  1702. func (as *AllocationSet) TotalCost() float64 {
  1703. if as.IsEmpty() {
  1704. return 0.0
  1705. }
  1706. as.RLock()
  1707. defer as.RUnlock()
  1708. tc := 0.0
  1709. for _, a := range as.allocations {
  1710. tc += a.TotalCost()
  1711. }
  1712. return tc
  1713. }
  1714. // UTCOffset returns the AllocationSet's configured UTCOffset.
  1715. func (as *AllocationSet) UTCOffset() time.Duration {
  1716. _, zone := as.Start().Zone()
  1717. return time.Duration(zone) * time.Second
  1718. }
  1719. func (as *AllocationSet) accumulate(that *AllocationSet) (*AllocationSet, error) {
  1720. if as.IsEmpty() {
  1721. return that.Clone(), nil
  1722. }
  1723. if that.IsEmpty() {
  1724. return as.Clone(), nil
  1725. }
  1726. // Set start, end to min(start), max(end)
  1727. start := as.Start()
  1728. end := as.End()
  1729. if that.Start().Before(start) {
  1730. start = that.Start()
  1731. }
  1732. if that.End().After(end) {
  1733. end = that.End()
  1734. }
  1735. acc := NewAllocationSet(start, end)
  1736. as.RLock()
  1737. defer as.RUnlock()
  1738. that.RLock()
  1739. defer that.RUnlock()
  1740. for _, alloc := range as.allocations {
  1741. err := acc.insert(alloc)
  1742. if err != nil {
  1743. return nil, err
  1744. }
  1745. }
  1746. for _, alloc := range that.allocations {
  1747. err := acc.insert(alloc)
  1748. if err != nil {
  1749. return nil, err
  1750. }
  1751. }
  1752. return acc, nil
  1753. }
  1754. // AllocationSetRange is a thread-safe slice of AllocationSets. It is meant to
  1755. // be used such that the AllocationSets held are consecutive and coherent with
  1756. // respect to using the same aggregation properties, UTC offset, and
  1757. // resolution. However these rules are not necessarily enforced, so use wisely.
  1758. type AllocationSetRange struct {
  1759. sync.RWMutex
  1760. allocations []*AllocationSet
  1761. FromStore string // stores the name of the store used to retrieve the data
  1762. }
  1763. // NewAllocationSetRange instantiates a new range composed of the given
  1764. // AllocationSets in the order provided.
  1765. func NewAllocationSetRange(allocs ...*AllocationSet) *AllocationSetRange {
  1766. return &AllocationSetRange{
  1767. allocations: allocs,
  1768. }
  1769. }
  1770. // Accumulate sums each AllocationSet in the given range, returning a single cumulative
  1771. // AllocationSet for the entire range.
  1772. func (asr *AllocationSetRange) Accumulate() (*AllocationSet, error) {
  1773. var allocSet *AllocationSet
  1774. var err error
  1775. asr.RLock()
  1776. defer asr.RUnlock()
  1777. for _, as := range asr.allocations {
  1778. allocSet, err = allocSet.accumulate(as)
  1779. if err != nil {
  1780. return nil, err
  1781. }
  1782. }
  1783. return allocSet, nil
  1784. }
  1785. // AccumulateBy sums AllocationSets based on the resolution given. The resolution given is subject to the scale used for the AllocationSets.
  1786. // Resolutions not evenly divisible by the AllocationSetRange window durations accumulate sets until a sum greater than or equal to the resolution is met,
  1787. // at which point AccumulateBy will start summing from 0 until the requested resolution is met again.
  1788. // If the requested resolution is smaller than the window of an AllocationSet then the resolution will default to the duration of a set.
  1789. // Resolutions larger than the duration of the entire AllocationSetRange will default to the duration of the range.
  1790. func (asr *AllocationSetRange) AccumulateBy(resolution time.Duration) (*AllocationSetRange, error) {
  1791. allocSetRange := NewAllocationSetRange()
  1792. var allocSet *AllocationSet
  1793. var err error
  1794. asr.Lock()
  1795. defer asr.Unlock()
  1796. for i, as := range asr.allocations {
  1797. allocSet, err = allocSet.accumulate(as)
  1798. if err != nil {
  1799. return nil, err
  1800. }
  1801. if allocSet != nil {
  1802. // check if end of asr to sum the final set
  1803. // If total asr accumulated sum <= resolution return 1 accumulated set
  1804. if allocSet.Window.Duration() >= resolution || i == len(asr.allocations)-1 {
  1805. allocSetRange.allocations = append(allocSetRange.allocations, allocSet)
  1806. allocSet = NewAllocationSet(time.Time{}, time.Time{})
  1807. }
  1808. }
  1809. }
  1810. return allocSetRange, nil
  1811. }
  1812. // AggregateBy aggregates each AllocationSet in the range by the given
  1813. // properties and options.
  1814. func (asr *AllocationSetRange) AggregateBy(aggregateBy []string, options *AllocationAggregationOptions) error {
  1815. aggRange := &AllocationSetRange{allocations: []*AllocationSet{}}
  1816. asr.Lock()
  1817. defer asr.Unlock()
  1818. for _, as := range asr.allocations {
  1819. err := as.AggregateBy(aggregateBy, options)
  1820. if err != nil {
  1821. return err
  1822. }
  1823. aggRange.allocations = append(aggRange.allocations, as)
  1824. }
  1825. asr.allocations = aggRange.allocations
  1826. return nil
  1827. }
  1828. // Append appends the given AllocationSet to the end of the range. It does not
  1829. // validate whether or not that violates window continuity.
  1830. func (asr *AllocationSetRange) Append(that *AllocationSet) {
  1831. asr.Lock()
  1832. defer asr.Unlock()
  1833. asr.allocations = append(asr.allocations, that)
  1834. }
  1835. // Each invokes the given function for each AllocationSet in the range
  1836. func (asr *AllocationSetRange) Each(f func(int, *AllocationSet)) {
  1837. if asr == nil {
  1838. return
  1839. }
  1840. for i, as := range asr.allocations {
  1841. f(i, as)
  1842. }
  1843. }
  1844. // Get retrieves the AllocationSet at the given index of the range.
  1845. func (asr *AllocationSetRange) Get(i int) (*AllocationSet, error) {
  1846. if i < 0 || i >= len(asr.allocations) {
  1847. return nil, fmt.Errorf("AllocationSetRange: index out of range: %d", i)
  1848. }
  1849. asr.RLock()
  1850. defer asr.RUnlock()
  1851. return asr.allocations[i], nil
  1852. }
  1853. // InsertRange merges the given AllocationSetRange into the receiving one by
  1854. // lining up sets with matching windows, then inserting each allocation from
  1855. // the given ASR into the respective set in the receiving ASR. If the given
  1856. // ASR contains an AllocationSet from a window that does not exist in the
  1857. // receiving ASR, then an error is returned. However, the given ASR does not
  1858. // need to cover the full range of the receiver.
  1859. func (asr *AllocationSetRange) InsertRange(that *AllocationSetRange) error {
  1860. if asr == nil {
  1861. return fmt.Errorf("cannot insert range into nil AllocationSetRange")
  1862. }
  1863. // keys maps window to index in asr
  1864. keys := map[string]int{}
  1865. asr.Each(func(i int, as *AllocationSet) {
  1866. if as == nil {
  1867. return
  1868. }
  1869. keys[as.Window.String()] = i
  1870. })
  1871. // Nothing to merge, so simply return
  1872. if len(keys) == 0 {
  1873. return nil
  1874. }
  1875. var err error
  1876. that.Each(func(j int, thatAS *AllocationSet) {
  1877. if thatAS == nil || err != nil {
  1878. return
  1879. }
  1880. // Find matching AllocationSet in asr
  1881. i, ok := keys[thatAS.Window.String()]
  1882. if !ok {
  1883. err = fmt.Errorf("cannot merge AllocationSet into window that does not exist: %s", thatAS.Window.String())
  1884. return
  1885. }
  1886. as, err := asr.Get(i)
  1887. if err != nil {
  1888. err = fmt.Errorf("AllocationSetRange index does not exist: %d", i)
  1889. return
  1890. }
  1891. // Insert each Allocation from the given set
  1892. thatAS.Each(func(k string, alloc *Allocation) {
  1893. err = as.Insert(alloc)
  1894. if err != nil {
  1895. err = fmt.Errorf("error inserting allocation: %s", err)
  1896. return
  1897. }
  1898. })
  1899. })
  1900. // err might be nil
  1901. return err
  1902. }
  1903. // Length returns the length of the range, which is zero if nil
  1904. func (asr *AllocationSetRange) Length() int {
  1905. if asr == nil || asr.allocations == nil {
  1906. return 0
  1907. }
  1908. asr.RLock()
  1909. defer asr.RUnlock()
  1910. return len(asr.allocations)
  1911. }
  1912. // MarshalJSON JSON-encodes the range
  1913. func (asr *AllocationSetRange) MarshalJSON() ([]byte, error) {
  1914. if asr == nil {
  1915. return json.Marshal([]*AllocationSet{})
  1916. }
  1917. asr.RLock()
  1918. defer asr.RUnlock()
  1919. return json.Marshal(asr.allocations)
  1920. }
  1921. // Slice copies the underlying slice of AllocationSets, maintaining order,
  1922. // and returns the copied slice.
  1923. func (asr *AllocationSetRange) Slice() []*AllocationSet {
  1924. if asr == nil || asr.allocations == nil {
  1925. return nil
  1926. }
  1927. asr.RLock()
  1928. defer asr.RUnlock()
  1929. copy := []*AllocationSet{}
  1930. for _, as := range asr.allocations {
  1931. copy = append(copy, as.Clone())
  1932. }
  1933. return copy
  1934. }
  1935. // String represents the given AllocationSetRange as a string
  1936. func (asr *AllocationSetRange) String() string {
  1937. if asr == nil {
  1938. return "<nil>"
  1939. }
  1940. return fmt.Sprintf("AllocationSetRange{length: %d}", asr.Length())
  1941. }
  1942. // UTCOffset returns the detected UTCOffset of the AllocationSets within the
  1943. // range. Defaults to 0 if the range is nil or empty. Does not warn if there
  1944. // are sets with conflicting UTCOffsets (just returns the first).
  1945. func (asr *AllocationSetRange) UTCOffset() time.Duration {
  1946. if asr.Length() == 0 {
  1947. return 0
  1948. }
  1949. as, err := asr.Get(0)
  1950. if err != nil {
  1951. return 0
  1952. }
  1953. return as.UTCOffset()
  1954. }
  1955. // Window returns the full window that the AllocationSetRange spans, from the
  1956. // start of the first AllocationSet to the end of the last one.
  1957. func (asr *AllocationSetRange) Window() Window {
  1958. if asr == nil || asr.Length() == 0 {
  1959. return NewWindow(nil, nil)
  1960. }
  1961. start := asr.allocations[0].Start()
  1962. end := asr.allocations[asr.Length()-1].End()
  1963. return NewWindow(&start, &end)
  1964. }
  1965. // Start returns the earliest start of all Allocations in the AllocationSetRange.
  1966. // It returns an error if there are no allocations.
  1967. func (asr *AllocationSetRange) Start() (time.Time, error) {
  1968. start := time.Time{}
  1969. firstStartNotSet := true
  1970. asr.Each(func(i int, as *AllocationSet) {
  1971. as.Each(func(s string, a *Allocation) {
  1972. if firstStartNotSet {
  1973. start = a.Start
  1974. firstStartNotSet = false
  1975. }
  1976. if a.Start.Before(start) {
  1977. start = a.Start
  1978. }
  1979. })
  1980. })
  1981. if firstStartNotSet {
  1982. return start, fmt.Errorf("had no data to compute a start from")
  1983. }
  1984. return start, nil
  1985. }
  1986. // End returns the latest end of all Allocations in the AllocationSetRange.
  1987. // It returns an error if there are no allocations.
  1988. func (asr *AllocationSetRange) End() (time.Time, error) {
  1989. end := time.Time{}
  1990. firstEndNotSet := true
  1991. asr.Each(func(i int, as *AllocationSet) {
  1992. as.Each(func(s string, a *Allocation) {
  1993. if firstEndNotSet {
  1994. end = a.End
  1995. firstEndNotSet = false
  1996. }
  1997. if a.End.After(end) {
  1998. end = a.End
  1999. }
  2000. })
  2001. })
  2002. if firstEndNotSet {
  2003. return end, fmt.Errorf("had no data to compute an end from")
  2004. }
  2005. return end, nil
  2006. }
  2007. // Minutes returns the duration, in minutes, between the earliest start
  2008. // and the latest end of all allocations in the AllocationSetRange.
  2009. func (asr *AllocationSetRange) Minutes() float64 {
  2010. start, err := asr.Start()
  2011. if err != nil {
  2012. return 0
  2013. }
  2014. end, err := asr.End()
  2015. if err != nil {
  2016. return 0
  2017. }
  2018. duration := end.Sub(start)
  2019. return duration.Minutes()
  2020. }
  2021. // TotalCost returns the sum of all TotalCosts of the allocations contained
  2022. func (asr *AllocationSetRange) TotalCost() float64 {
  2023. if asr == nil || len(asr.allocations) == 0 {
  2024. return 0.0
  2025. }
  2026. asr.RLock()
  2027. defer asr.RUnlock()
  2028. tc := 0.0
  2029. for _, as := range asr.allocations {
  2030. tc += as.TotalCost()
  2031. }
  2032. return tc
  2033. }