allocation.go 83 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321132213231324132513261327132813291330133113321333133413351336133713381339134013411342134313441345134613471348134913501351135213531354135513561357135813591360136113621363136413651366136713681369137013711372137313741375137613771378137913801381138213831384138513861387138813891390139113921393139413951396139713981399140014011402140314041405140614071408140914101411141214131414141514161417141814191420142114221423142414251426142714281429143014311432143314341435143614371438143914401441144214431444144514461447144814491450145114521453145414551456145714581459146014611462146314641465146614671468146914701471147214731474147514761477147814791480148114821483148414851486148714881489149014911492149314941495149614971498149915001501150215031504150515061507150815091510151115121513151415151516151715181519152015211522152315241525152615271528152915301531153215331534153515361537153815391540154115421543154415451546154715481549155015511552155315541555155615571558155915601561156215631564156515661567156815691570157115721573157415751576157715781579158015811582158315841585158615871588158915901591159215931594159515961597159815991600160116021603160416051606160716081609161016111612161316141615161616171618161916201621162216231624162516261627162816291630163116321633163416351636163716381639164016411642164316441645164616471648164916501651165216531654165516561657165816591660166116621663166416651666166716681669167016711672167316741675167616771678167916801681168216831684168516861687168816891690169116921693169416951696169716981699170017011702170317041705170617071708170917101711171217131714171517161717171817191720172117221723172417251726172717281729173017311732173317341735173617371738173917401741174217431744174517461747174817491750175117521753175417551756175717581759176017611762176317641765176617671768176917701771177217731774177517761777177817791780178117821783178417851786178717881789179017911792179317941795179617971798179918001801180218031804180518061807180818091810181118121813181418151816181718181819182018211822182318241825182618271828182918301831183218331834183518361837183818391840184118421843184418451846184718481849185018511852185318541855185618571858185918601861186218631864186518661867186818691870187118721873187418751876187718781879188018811882188318841885188618871888188918901891189218931894189518961897189818991900190119021903190419051906190719081909191019111912191319141915191619171918191919201921192219231924192519261927192819291930193119321933193419351936193719381939194019411942194319441945194619471948194919501951195219531954195519561957195819591960196119621963196419651966196719681969197019711972197319741975197619771978197919801981198219831984198519861987198819891990199119921993199419951996199719981999200020012002200320042005200620072008200920102011201220132014201520162017201820192020202120222023202420252026202720282029203020312032203320342035203620372038203920402041204220432044204520462047204820492050205120522053205420552056205720582059206020612062206320642065206620672068206920702071207220732074207520762077207820792080208120822083208420852086208720882089209020912092209320942095209620972098209921002101210221032104210521062107210821092110211121122113211421152116211721182119212021212122212321242125212621272128212921302131213221332134213521362137213821392140214121422143214421452146214721482149215021512152215321542155215621572158215921602161216221632164216521662167216821692170217121722173217421752176217721782179218021812182218321842185218621872188218921902191219221932194219521962197219821992200220122022203220422052206220722082209221022112212221322142215221622172218221922202221222222232224222522262227222822292230223122322233223422352236223722382239224022412242224322442245224622472248224922502251225222532254225522562257225822592260226122622263226422652266226722682269227022712272227322742275227622772278227922802281228222832284228522862287228822892290229122922293229422952296229722982299230023012302230323042305230623072308230923102311231223132314231523162317231823192320232123222323232423252326232723282329233023312332233323342335233623372338233923402341234223432344234523462347234823492350235123522353235423552356235723582359236023612362236323642365236623672368236923702371237223732374237523762377237823792380238123822383238423852386238723882389239023912392239323942395239623972398239924002401240224032404240524062407240824092410241124122413241424152416241724182419242024212422242324242425242624272428242924302431243224332434243524362437243824392440244124422443244424452446244724482449245024512452245324542455245624572458245924602461246224632464246524662467246824692470247124722473247424752476247724782479248024812482248324842485248624872488248924902491249224932494249524962497249824992500250125022503250425052506250725082509251025112512251325142515251625172518251925202521252225232524252525262527252825292530253125322533253425352536253725382539254025412542254325442545254625472548254925502551255225532554255525562557255825592560256125622563256425652566256725682569257025712572257325742575257625772578257925802581258225832584258525862587
  1. package kubecost
  2. import (
  3. "bytes"
  4. "fmt"
  5. "sort"
  6. "strings"
  7. "sync"
  8. "time"
  9. "github.com/kubecost/cost-model/pkg/log"
  10. "github.com/kubecost/cost-model/pkg/util"
  11. "github.com/kubecost/cost-model/pkg/util/json"
  12. )
  13. // TODO Clean-up use of IsEmpty; nil checks should be separated for safety.
  14. // TODO Consider making Allocation an interface, which is fulfilled by structs
  15. // like KubernetesAllocation, IdleAllocation, and ExternalAllocation.
  16. // ExternalSuffix indicates an external allocation
  17. const ExternalSuffix = "__external__"
  18. // IdleSuffix indicates an idle allocation property
  19. const IdleSuffix = "__idle__"
  20. // SharedSuffix indicates an shared allocation property
  21. const SharedSuffix = "__shared__"
  22. // UnallocatedSuffix indicates an unallocated allocation property
  23. const UnallocatedSuffix = "__unallocated__"
  24. // UnmountedSuffix indicated allocation to an unmounted PV
  25. const UnmountedSuffix = "__unmounted__"
  26. // ShareWeighted indicates that a shared resource should be shared as a
  27. // proportion of the cost of the remaining allocations.
  28. const ShareWeighted = "__weighted__"
  29. // ShareEven indicates that a shared resource should be shared evenly across
  30. // all remaining allocations.
  31. const ShareEven = "__even__"
  32. // ShareNone indicates that a shareable resource should not be shared
  33. const ShareNone = "__none__"
  34. // Allocation is a unit of resource allocation and cost for a given window
  35. // of time and for a given kubernetes construct with its associated set of
  36. // properties.
  37. // TODO:CLEANUP consider dropping name in favor of just Allocation and an
  38. // Assets-style key() function for AllocationSet.
  39. type Allocation struct {
  40. Name string `json:"name"`
  41. Properties *AllocationProperties `json:"properties,omitempty"`
  42. Window Window `json:"window"`
  43. Start time.Time `json:"start"`
  44. End time.Time `json:"end"`
  45. CPUCoreHours float64 `json:"cpuCoreHours"`
  46. CPUCoreRequestAverage float64 `json:"cpuCoreRequestAverage"`
  47. CPUCoreUsageAverage float64 `json:"cpuCoreUsageAverage"`
  48. CPUCost float64 `json:"cpuCost"`
  49. CPUCostAdjustment float64 `json:"cpuCostAdjustment"`
  50. GPUHours float64 `json:"gpuHours"`
  51. GPURequestAverage float64 `json:"gpuRequestAverage"`
  52. GPUUsageAverage float64 `json:"gpuUsageAverage"`
  53. GPUCost float64 `json:"gpuCost"`
  54. GPUCostAdjustment float64 `json:"gpuCostAdjustment"`
  55. NetworkTransferBytes float64 `json:"networkTransferBytes"`
  56. NetworkReceiveBytes float64 `json:"networkReceiveBytes"`
  57. NetworkCost float64 `json:"networkCost"`
  58. NetworkCostAdjustment float64 `json:"networkCostAdjustment"`
  59. LoadBalancerCost float64 `json:"loadBalancerCost"`
  60. LoadBalancerCostAdjustment float64 `json:"loadBalancerCostAdjustment"`
  61. PVs PVAllocations `json:"-"`
  62. PVCostAdjustment float64 `json:"pvCostAdjustment"`
  63. RAMByteHours float64 `json:"ramByteHours"`
  64. RAMBytesRequestAverage float64 `json:"ramByteRequestAverage"`
  65. RAMBytesUsageAverage float64 `json:"ramByteUsageAverage"`
  66. RAMCost float64 `json:"ramCost"`
  67. RAMCostAdjustment float64 `json:"ramCostAdjustment"`
  68. SharedCost float64 `json:"sharedCost"`
  69. ExternalCost float64 `json:"externalCost"`
  70. // RawAllocationOnly is a pointer so if it is not present it will be
  71. // marshalled as null rather than as an object with Go default values.
  72. RawAllocationOnly *RawAllocationOnlyData `json:"rawAllocationOnly"`
  73. }
  74. // RawAllocationOnlyData is information that only belong in "raw" Allocations,
  75. // those which have not undergone aggregation, accumulation, or any other form
  76. // of combination to produce a new Allocation from other Allocations.
  77. //
  78. // Max usage data belongs here because computing the overall maximum from two
  79. // or more Allocations is a non-trivial operation that cannot be defined without
  80. // maintaining a large amount of state. Consider the following example:
  81. // _______________________________________________
  82. //
  83. // A1 Using 3 CPU ---- ----- ------
  84. // A2 Using 2 CPU ---- ----- ----
  85. // A3 Using 1 CPU --- --
  86. // _______________________________________________
  87. // Time ---->
  88. //
  89. // The logical maximum CPU usage is 5, but this cannot be calculated iteratively,
  90. // which is how we calculate aggregations and accumulations of Allocations currently.
  91. // This becomes a problem I could call "maximum sum of overlapping intervals" and is
  92. // essentially a variant of an interval scheduling algorithm.
  93. //
  94. // If we had types to differentiate between regular Allocations and AggregatedAllocations
  95. // then this type would be unnecessary and its fields would go into the regular Allocation
  96. // and not in the AggregatedAllocation.
  97. type RawAllocationOnlyData struct {
  98. CPUCoreUsageMax float64 `json:"cpuCoreUsageMax"`
  99. RAMBytesUsageMax float64 `json:"ramByteUsageMax"`
  100. }
  101. // PVAllocations is a map of Disk Asset Identifiers to the
  102. // usage of them by an Allocation as recorded in a PVAllocation
  103. type PVAllocations map[PVKey]*PVAllocation
  104. // Clone creates a deep copy of a PVAllocations
  105. func (pv *PVAllocations) Clone() PVAllocations {
  106. if pv == nil || *pv == nil {
  107. return nil
  108. }
  109. apv := *pv
  110. clonePV := PVAllocations{}
  111. for k, v := range apv {
  112. clonePV[k] = &PVAllocation{
  113. ByteHours: v.ByteHours,
  114. Cost: v.Cost,
  115. }
  116. }
  117. return clonePV
  118. }
  119. // Add adds contents of that to the calling PVAllocations
  120. func (pv *PVAllocations) Add(that PVAllocations) PVAllocations {
  121. apv := pv.Clone()
  122. if that != nil {
  123. if apv == nil {
  124. apv = PVAllocations{}
  125. }
  126. for pvKey, thatPVAlloc := range that {
  127. apvAlloc, ok := apv[pvKey]
  128. if !ok {
  129. apvAlloc = &PVAllocation{}
  130. }
  131. apvAlloc.Cost += thatPVAlloc.Cost
  132. apvAlloc.ByteHours += thatPVAlloc.ByteHours
  133. apv[pvKey] = apvAlloc
  134. }
  135. }
  136. return apv
  137. }
  138. // PVKey for identifying Disk type assets
  139. type PVKey struct {
  140. Cluster string `json:"cluster"`
  141. Name string `json:"name"`
  142. }
  143. // PVAllocation contains the byte hour usage
  144. // and cost of an Allocation for a single PV
  145. type PVAllocation struct {
  146. ByteHours float64 `json:"byteHours"`
  147. Cost float64 `json:"cost"`
  148. }
  149. // AllocationMatchFunc is a function that can be used to match Allocations by
  150. // returning true for any given Allocation if a condition is met.
  151. type AllocationMatchFunc func(*Allocation) bool
  152. // Add returns the result of summing the two given Allocations, which sums the
  153. // summary fields (e.g. costs, resources) and recomputes efficiency. Neither of
  154. // the two original Allocations are mutated in the process.
  155. func (a *Allocation) Add(that *Allocation) (*Allocation, error) {
  156. if a == nil {
  157. return that.Clone(), nil
  158. }
  159. if that == nil {
  160. return a.Clone(), nil
  161. }
  162. // Note: no need to clone "that", as add only mutates the receiver
  163. agg := a.Clone()
  164. agg.add(that)
  165. return agg, nil
  166. }
  167. // Clone returns a deep copy of the given Allocation
  168. func (a *Allocation) Clone() *Allocation {
  169. if a == nil {
  170. return nil
  171. }
  172. return &Allocation{
  173. Name: a.Name,
  174. Properties: a.Properties.Clone(),
  175. Window: a.Window.Clone(),
  176. Start: a.Start,
  177. End: a.End,
  178. CPUCoreHours: a.CPUCoreHours,
  179. CPUCoreRequestAverage: a.CPUCoreRequestAverage,
  180. CPUCoreUsageAverage: a.CPUCoreUsageAverage,
  181. CPUCost: a.CPUCost,
  182. CPUCostAdjustment: a.CPUCostAdjustment,
  183. GPUHours: a.GPUHours,
  184. GPURequestAverage: a.GPURequestAverage,
  185. GPUUsageAverage: a.GPUUsageAverage,
  186. GPUCost: a.GPUCost,
  187. GPUCostAdjustment: a.GPUCostAdjustment,
  188. NetworkTransferBytes: a.NetworkTransferBytes,
  189. NetworkReceiveBytes: a.NetworkReceiveBytes,
  190. NetworkCost: a.NetworkCost,
  191. NetworkCostAdjustment: a.NetworkCostAdjustment,
  192. LoadBalancerCost: a.LoadBalancerCost,
  193. LoadBalancerCostAdjustment: a.LoadBalancerCostAdjustment,
  194. PVs: a.PVs.Clone(),
  195. PVCostAdjustment: a.PVCostAdjustment,
  196. RAMByteHours: a.RAMByteHours,
  197. RAMBytesRequestAverage: a.RAMBytesRequestAverage,
  198. RAMBytesUsageAverage: a.RAMBytesUsageAverage,
  199. RAMCost: a.RAMCost,
  200. RAMCostAdjustment: a.RAMCostAdjustment,
  201. SharedCost: a.SharedCost,
  202. ExternalCost: a.ExternalCost,
  203. RawAllocationOnly: a.RawAllocationOnly.Clone(),
  204. }
  205. }
  206. // Clone returns a deep copy of the given RawAllocationOnlyData
  207. func (r *RawAllocationOnlyData) Clone() *RawAllocationOnlyData {
  208. if r == nil {
  209. return nil
  210. }
  211. return &RawAllocationOnlyData{
  212. CPUCoreUsageMax: r.CPUCoreUsageMax,
  213. RAMBytesUsageMax: r.RAMBytesUsageMax,
  214. }
  215. }
  216. // Equal returns true if the values held in the given Allocation precisely
  217. // match those of the receiving Allocation. nil does not match nil. Floating
  218. // point values need to match according to util.IsApproximately, which accounts
  219. // for small, reasonable floating point error margins.
  220. func (a *Allocation) Equal(that *Allocation) bool {
  221. if a == nil || that == nil {
  222. return false
  223. }
  224. if a.Name != that.Name {
  225. return false
  226. }
  227. if !a.Properties.Equal(that.Properties) {
  228. return false
  229. }
  230. if !a.Window.Equal(that.Window) {
  231. return false
  232. }
  233. if !a.Start.Equal(that.Start) {
  234. return false
  235. }
  236. if !a.End.Equal(that.End) {
  237. return false
  238. }
  239. if !util.IsApproximately(a.CPUCoreHours, that.CPUCoreHours) {
  240. return false
  241. }
  242. if !util.IsApproximately(a.CPUCost, that.CPUCost) {
  243. return false
  244. }
  245. if !util.IsApproximately(a.CPUCostAdjustment, that.CPUCostAdjustment) {
  246. return false
  247. }
  248. if !util.IsApproximately(a.GPUHours, that.GPUHours) {
  249. return false
  250. }
  251. if !util.IsApproximately(a.GPURequestAverage, that.GPURequestAverage) {
  252. return false
  253. }
  254. if !util.IsApproximately(a.GPUUsageAverage, that.GPUUsageAverage) {
  255. return false
  256. }
  257. if !util.IsApproximately(a.GPUCost, that.GPUCost) {
  258. return false
  259. }
  260. if !util.IsApproximately(a.GPUCostAdjustment, that.GPUCostAdjustment) {
  261. return false
  262. }
  263. if !util.IsApproximately(a.NetworkTransferBytes, that.NetworkTransferBytes) {
  264. return false
  265. }
  266. if !util.IsApproximately(a.NetworkReceiveBytes, that.NetworkReceiveBytes) {
  267. return false
  268. }
  269. if !util.IsApproximately(a.NetworkCost, that.NetworkCost) {
  270. return false
  271. }
  272. if !util.IsApproximately(a.NetworkCostAdjustment, that.NetworkCostAdjustment) {
  273. return false
  274. }
  275. if !util.IsApproximately(a.LoadBalancerCost, that.LoadBalancerCost) {
  276. return false
  277. }
  278. if !util.IsApproximately(a.LoadBalancerCostAdjustment, that.LoadBalancerCostAdjustment) {
  279. return false
  280. }
  281. if !util.IsApproximately(a.PVCostAdjustment, that.PVCostAdjustment) {
  282. return false
  283. }
  284. if !util.IsApproximately(a.RAMByteHours, that.RAMByteHours) {
  285. return false
  286. }
  287. if !util.IsApproximately(a.RAMCost, that.RAMCost) {
  288. return false
  289. }
  290. if !util.IsApproximately(a.RAMCostAdjustment, that.RAMCostAdjustment) {
  291. return false
  292. }
  293. if !util.IsApproximately(a.SharedCost, that.SharedCost) {
  294. return false
  295. }
  296. if !util.IsApproximately(a.ExternalCost, that.ExternalCost) {
  297. return false
  298. }
  299. if a.RawAllocationOnly == nil && that.RawAllocationOnly != nil {
  300. return false
  301. }
  302. if a.RawAllocationOnly != nil && that.RawAllocationOnly == nil {
  303. return false
  304. }
  305. if a.RawAllocationOnly != nil && that.RawAllocationOnly != nil {
  306. if !util.IsApproximately(a.RawAllocationOnly.CPUCoreUsageMax, that.RawAllocationOnly.CPUCoreUsageMax) {
  307. return false
  308. }
  309. if !util.IsApproximately(a.RawAllocationOnly.RAMBytesUsageMax, that.RawAllocationOnly.RAMBytesUsageMax) {
  310. return false
  311. }
  312. }
  313. aPVs := a.PVs
  314. thatPVs := that.PVs
  315. if len(aPVs) == len(thatPVs) {
  316. for k, pv := range aPVs {
  317. tv, ok := thatPVs[k]
  318. if !ok || *tv != *pv {
  319. return false
  320. }
  321. }
  322. } else {
  323. return false
  324. }
  325. return true
  326. }
  327. // TotalCost is the total cost of the Allocation including adjustments
  328. func (a *Allocation) TotalCost() float64 {
  329. return a.CPUTotalCost() + a.GPUTotalCost() + a.RAMTotalCost() + a.PVTotalCost() + a.NetworkTotalCost() + a.LBTotalCost() + a.SharedTotalCost() + a.ExternalCost
  330. }
  331. // CPUTotalCost calculates total CPU cost of Allocation including adjustment
  332. func (a *Allocation) CPUTotalCost() float64 {
  333. return a.CPUCost + a.CPUCostAdjustment
  334. }
  335. // GPUTotalCost calculates total GPU cost of Allocation including adjustment
  336. func (a *Allocation) GPUTotalCost() float64 {
  337. return a.GPUCost + a.GPUCostAdjustment
  338. }
  339. // RAMTotalCost calculates total RAM cost of Allocation including adjustment
  340. func (a *Allocation) RAMTotalCost() float64 {
  341. return a.RAMCost + a.RAMCostAdjustment
  342. }
  343. // PVTotalCost calculates total PV cost of Allocation including adjustment
  344. func (a *Allocation) PVTotalCost() float64 {
  345. return a.PVCost() + a.PVCostAdjustment
  346. }
  347. // NetworkTotalCost calculates total Network cost of Allocation including adjustment
  348. func (a *Allocation) NetworkTotalCost() float64 {
  349. return a.NetworkCost + a.NetworkCostAdjustment
  350. }
  351. // LBTotalCost calculates total LB cost of Allocation including adjustment
  352. func (a *Allocation) LBTotalCost() float64 {
  353. return a.LoadBalancerCost + a.LoadBalancerCostAdjustment
  354. }
  355. // SharedTotalCost calculates total shared cost of Allocation including adjustment
  356. func (a *Allocation) SharedTotalCost() float64 {
  357. return a.SharedCost
  358. }
  359. // PVCost calculate cumulative cost of all PVs that Allocation is attached to
  360. func (a *Allocation) PVCost() float64 {
  361. cost := 0.0
  362. for _, pv := range a.PVs {
  363. cost += pv.Cost
  364. }
  365. return cost
  366. }
  367. // PVByteHours calculate cumulative ByteHours of all PVs that Allocation is attached to
  368. func (a *Allocation) PVByteHours() float64 {
  369. byteHours := 0.0
  370. for _, pv := range a.PVs {
  371. byteHours += pv.ByteHours
  372. }
  373. return byteHours
  374. }
  375. // CPUEfficiency is the ratio of usage to request. If there is no request and
  376. // no usage or cost, then efficiency is zero. If there is no request, but there
  377. // is usage or cost, then efficiency is 100%.
  378. func (a *Allocation) CPUEfficiency() float64 {
  379. if a.CPUCoreRequestAverage > 0 {
  380. return a.CPUCoreUsageAverage / a.CPUCoreRequestAverage
  381. }
  382. if a.CPUCoreUsageAverage == 0.0 || a.CPUCost == 0.0 {
  383. return 0.0
  384. }
  385. return 1.0
  386. }
  387. // RAMEfficiency is the ratio of usage to request. If there is no request and
  388. // no usage or cost, then efficiency is zero. If there is no request, but there
  389. // is usage or cost, then efficiency is 100%.
  390. func (a *Allocation) RAMEfficiency() float64 {
  391. if a.RAMBytesRequestAverage > 0 {
  392. return a.RAMBytesUsageAverage / a.RAMBytesRequestAverage
  393. }
  394. if a.RAMBytesUsageAverage == 0.0 || a.RAMCost == 0.0 {
  395. return 0.0
  396. }
  397. return 1.0
  398. }
  399. // TotalEfficiency is the cost-weighted average of CPU and RAM efficiency. If
  400. // there is no cost at all, then efficiency is zero.
  401. func (a *Allocation) TotalEfficiency() float64 {
  402. if a.RAMTotalCost()+a.CPUTotalCost() > 0 {
  403. ramCostEff := a.RAMEfficiency() * a.RAMTotalCost()
  404. cpuCostEff := a.CPUEfficiency() * a.CPUTotalCost()
  405. return (ramCostEff + cpuCostEff) / (a.CPUTotalCost() + a.RAMTotalCost())
  406. }
  407. return 0.0
  408. }
  409. // CPUCores converts the Allocation's CPUCoreHours into average CPUCores
  410. func (a *Allocation) CPUCores() float64 {
  411. if a.Minutes() <= 0.0 {
  412. return 0.0
  413. }
  414. return a.CPUCoreHours / (a.Minutes() / 60.0)
  415. }
  416. // RAMBytes converts the Allocation's RAMByteHours into average RAMBytes
  417. func (a *Allocation) RAMBytes() float64 {
  418. if a.Minutes() <= 0.0 {
  419. return 0.0
  420. }
  421. return a.RAMByteHours / (a.Minutes() / 60.0)
  422. }
  423. // GPUs converts the Allocation's GPUHours into average GPUs
  424. func (a *Allocation) GPUs() float64 {
  425. if a.Minutes() <= 0.0 {
  426. return 0.0
  427. }
  428. return a.GPUHours / (a.Minutes() / 60.0)
  429. }
  430. // PVBytes converts the Allocation's PVByteHours into average PVBytes
  431. func (a *Allocation) PVBytes() float64 {
  432. if a.Minutes() <= 0.0 {
  433. return 0.0
  434. }
  435. return a.PVByteHours() / (a.Minutes() / 60.0)
  436. }
  437. // ResetAdjustments sets all cost adjustment fields to zero
  438. func (a *Allocation) ResetAdjustments() {
  439. a.CPUCostAdjustment = 0.0
  440. a.GPUCostAdjustment = 0.0
  441. a.RAMCostAdjustment = 0.0
  442. a.PVCostAdjustment = 0.0
  443. a.NetworkCostAdjustment = 0.0
  444. a.LoadBalancerCostAdjustment = 0.0
  445. }
  446. // MarshalJSON implements json.Marshaler interface
  447. func (a *Allocation) MarshalJSON() ([]byte, error) {
  448. buffer := bytes.NewBufferString("{")
  449. jsonEncodeString(buffer, "name", a.Name, ",")
  450. jsonEncode(buffer, "properties", a.Properties, ",")
  451. jsonEncode(buffer, "window", a.Window, ",")
  452. jsonEncodeString(buffer, "start", a.Start.Format(time.RFC3339), ",")
  453. jsonEncodeString(buffer, "end", a.End.Format(time.RFC3339), ",")
  454. jsonEncodeFloat64(buffer, "minutes", a.Minutes(), ",")
  455. jsonEncodeFloat64(buffer, "cpuCores", a.CPUCores(), ",")
  456. jsonEncodeFloat64(buffer, "cpuCoreRequestAverage", a.CPUCoreRequestAverage, ",")
  457. jsonEncodeFloat64(buffer, "cpuCoreUsageAverage", a.CPUCoreUsageAverage, ",")
  458. jsonEncodeFloat64(buffer, "cpuCoreHours", a.CPUCoreHours, ",")
  459. jsonEncodeFloat64(buffer, "cpuCost", a.CPUCost, ",")
  460. jsonEncodeFloat64(buffer, "cpuCostAdjustment", a.CPUCostAdjustment, ",")
  461. jsonEncodeFloat64(buffer, "cpuEfficiency", a.CPUEfficiency(), ",")
  462. jsonEncodeFloat64(buffer, "gpuCount", a.GPUs(), ",")
  463. jsonEncodeFloat64(buffer, "gpuHours", a.GPUHours, ",")
  464. jsonEncodeFloat64(buffer, "gpuRequestAverage", a.GPURequestAverage, ",")
  465. jsonEncodeFloat64(buffer, "gpuUsageAverage", a.GPUUsageAverage, ",")
  466. jsonEncodeFloat64(buffer, "gpuCost", a.GPUCost, ",")
  467. jsonEncodeFloat64(buffer, "gpuCostAdjustment", a.GPUCostAdjustment, ",")
  468. jsonEncodeFloat64(buffer, "networkTransferBytes", a.NetworkTransferBytes, ",")
  469. jsonEncodeFloat64(buffer, "networkReceiveBytes", a.NetworkReceiveBytes, ",")
  470. jsonEncodeFloat64(buffer, "networkCost", a.NetworkCost, ",")
  471. jsonEncodeFloat64(buffer, "networkCostAdjustment", a.NetworkCostAdjustment, ",")
  472. jsonEncodeFloat64(buffer, "loadBalancerCost", a.LoadBalancerCost, ",")
  473. jsonEncodeFloat64(buffer, "loadBalancerCostAdjustment", a.LoadBalancerCostAdjustment, ",")
  474. jsonEncodeFloat64(buffer, "pvBytes", a.PVBytes(), ",")
  475. jsonEncodeFloat64(buffer, "pvByteHours", a.PVByteHours(), ",")
  476. jsonEncodeFloat64(buffer, "pvCost", a.PVCost(), ",")
  477. jsonEncode(buffer, "pvs", a.PVs, ",") // Todo Sean: this does not work properly
  478. jsonEncodeFloat64(buffer, "pvCostAdjustment", a.PVCostAdjustment, ",")
  479. jsonEncodeFloat64(buffer, "ramBytes", a.RAMBytes(), ",")
  480. jsonEncodeFloat64(buffer, "ramByteRequestAverage", a.RAMBytesRequestAverage, ",")
  481. jsonEncodeFloat64(buffer, "ramByteUsageAverage", a.RAMBytesUsageAverage, ",")
  482. jsonEncodeFloat64(buffer, "ramByteHours", a.RAMByteHours, ",")
  483. jsonEncodeFloat64(buffer, "ramCost", a.RAMCost, ",")
  484. jsonEncodeFloat64(buffer, "ramCostAdjustment", a.RAMCostAdjustment, ",")
  485. jsonEncodeFloat64(buffer, "ramEfficiency", a.RAMEfficiency(), ",")
  486. jsonEncodeFloat64(buffer, "sharedCost", a.SharedCost, ",")
  487. jsonEncodeFloat64(buffer, "externalCost", a.ExternalCost, ",")
  488. jsonEncodeFloat64(buffer, "totalCost", a.TotalCost(), ",")
  489. jsonEncodeFloat64(buffer, "totalEfficiency", a.TotalEfficiency(), ",")
  490. jsonEncode(buffer, "rawAllocationOnly", a.RawAllocationOnly, "")
  491. buffer.WriteString("}")
  492. return buffer.Bytes(), nil
  493. }
  494. // Resolution returns the duration of time covered by the Allocation
  495. func (a *Allocation) Resolution() time.Duration {
  496. return a.End.Sub(a.Start)
  497. }
  498. // IsAggregated is true if the given Allocation has been aggregated, which we
  499. // define by a lack of AllocationProperties.
  500. func (a *Allocation) IsAggregated() bool {
  501. return a == nil || a.Properties == nil
  502. }
  503. // IsExternal is true if the given Allocation represents external costs.
  504. func (a *Allocation) IsExternal() bool {
  505. return strings.Contains(a.Name, ExternalSuffix)
  506. }
  507. // IsIdle is true if the given Allocation represents idle costs.
  508. func (a *Allocation) IsIdle() bool {
  509. return strings.Contains(a.Name, IdleSuffix)
  510. }
  511. // IsUnallocated is true if the given Allocation represents unallocated costs.
  512. func (a *Allocation) IsUnallocated() bool {
  513. return strings.Contains(a.Name, UnallocatedSuffix)
  514. }
  515. // IsUnmounted is true if the given Allocation represents unmounted volume costs.
  516. func (a *Allocation) IsUnmounted() bool {
  517. return strings.Contains(a.Name, UnmountedSuffix)
  518. }
  519. // Minutes returns the number of minutes the Allocation represents, as defined
  520. // by the difference between the end and start times.
  521. func (a *Allocation) Minutes() float64 {
  522. return a.End.Sub(a.Start).Minutes()
  523. }
  524. // Share adds the TotalCost of the given Allocation to the SharedCost of the
  525. // receiving Allocation. No Start, End, Window, or AllocationProperties are considered.
  526. // Neither Allocation is mutated; a new Allocation is always returned.
  527. func (a *Allocation) Share(that *Allocation) (*Allocation, error) {
  528. if that == nil {
  529. return a.Clone(), nil
  530. }
  531. if a == nil {
  532. return nil, fmt.Errorf("cannot share with nil Allocation")
  533. }
  534. agg := a.Clone()
  535. agg.SharedCost += that.TotalCost()
  536. return agg, nil
  537. }
  538. // String represents the given Allocation as a string
  539. func (a *Allocation) String() string {
  540. return fmt.Sprintf("%s%s=%.2f", a.Name, NewWindow(&a.Start, &a.End), a.TotalCost())
  541. }
  542. func (a *Allocation) add(that *Allocation) {
  543. if a == nil {
  544. log.Warningf("Allocation.AggregateBy: trying to add a nil receiver")
  545. return
  546. }
  547. // Generate keys for each allocation to allow for special logic to set the controller
  548. // in the case of keys matching but controllers not matching.
  549. aggByForKey := []string{"cluster", "node", "namespace", "pod", "container"}
  550. leftKey := a.generateKey(aggByForKey, nil)
  551. rightKey := a.generateKey(aggByForKey, nil)
  552. leftProperties := a.Properties
  553. rightProperties := that.Properties
  554. // Preserve string properties that are matching between the two allocations
  555. a.Properties = a.Properties.Intersection(that.Properties)
  556. // Overwrite regular intersection logic for the controller name property in the
  557. // case that the Allocation keys are the same but the controllers are not.
  558. if leftKey == rightKey &&
  559. leftProperties != nil &&
  560. rightProperties != nil &&
  561. leftProperties.Controller != rightProperties.Controller {
  562. if leftProperties.Controller == "" {
  563. a.Properties.Controller = rightProperties.Controller
  564. } else if rightProperties.Controller == "" {
  565. a.Properties.Controller = leftProperties.Controller
  566. } else {
  567. controllers := []string{
  568. leftProperties.Controller,
  569. rightProperties.Controller,
  570. }
  571. sort.Strings(controllers)
  572. a.Properties.Controller = controllers[0]
  573. }
  574. }
  575. // Expand the window to encompass both Allocations
  576. a.Window = a.Window.Expand(that.Window)
  577. // Sum non-cumulative fields by turning them into cumulative, adding them,
  578. // and then converting them back into averages after minutes have been
  579. // combined (just below).
  580. cpuReqCoreMins := a.CPUCoreRequestAverage * a.Minutes()
  581. cpuReqCoreMins += that.CPUCoreRequestAverage * that.Minutes()
  582. cpuUseCoreMins := a.CPUCoreUsageAverage * a.Minutes()
  583. cpuUseCoreMins += that.CPUCoreUsageAverage * that.Minutes()
  584. ramReqByteMins := a.RAMBytesRequestAverage * a.Minutes()
  585. ramReqByteMins += that.RAMBytesRequestAverage * that.Minutes()
  586. ramUseByteMins := a.RAMBytesUsageAverage * a.Minutes()
  587. ramUseByteMins += that.RAMBytesUsageAverage * that.Minutes()
  588. // Expand Start and End to be the "max" of among the given Allocations
  589. if that.Start.Before(a.Start) {
  590. a.Start = that.Start
  591. }
  592. if that.End.After(a.End) {
  593. a.End = that.End
  594. }
  595. // Convert cumulative request and usage back into rates
  596. // TODO:TEST write a unit test that fails if this is done incorrectly
  597. if a.Minutes() > 0 {
  598. a.CPUCoreRequestAverage = cpuReqCoreMins / a.Minutes()
  599. a.CPUCoreUsageAverage = cpuUseCoreMins / a.Minutes()
  600. a.RAMBytesRequestAverage = ramReqByteMins / a.Minutes()
  601. a.RAMBytesUsageAverage = ramUseByteMins / a.Minutes()
  602. } else {
  603. a.CPUCoreRequestAverage = 0.0
  604. a.CPUCoreUsageAverage = 0.0
  605. a.RAMBytesRequestAverage = 0.0
  606. a.RAMBytesUsageAverage = 0.0
  607. }
  608. // Sum all cumulative resource fields
  609. a.CPUCoreHours += that.CPUCoreHours
  610. a.GPUHours += that.GPUHours
  611. a.RAMByteHours += that.RAMByteHours
  612. a.NetworkTransferBytes += that.NetworkTransferBytes
  613. a.NetworkReceiveBytes += that.NetworkReceiveBytes
  614. // Sum all cumulative cost fields
  615. a.CPUCost += that.CPUCost
  616. a.GPUCost += that.GPUCost
  617. a.RAMCost += that.RAMCost
  618. a.NetworkCost += that.NetworkCost
  619. a.LoadBalancerCost += that.LoadBalancerCost
  620. a.SharedCost += that.SharedCost
  621. a.ExternalCost += that.ExternalCost
  622. // Sum PVAllocations
  623. a.PVs = a.PVs.Add(that.PVs)
  624. // Sum all cumulative adjustment fields
  625. a.CPUCostAdjustment += that.CPUCostAdjustment
  626. a.RAMCostAdjustment += that.RAMCostAdjustment
  627. a.GPUCostAdjustment += that.GPUCostAdjustment
  628. a.PVCostAdjustment += that.PVCostAdjustment
  629. a.NetworkCostAdjustment += that.NetworkCostAdjustment
  630. a.LoadBalancerCostAdjustment += that.LoadBalancerCostAdjustment
  631. // Any data that is in a "raw allocation only" is not valid in any
  632. // sort of cumulative Allocation (like one that is added).
  633. a.RawAllocationOnly = nil
  634. }
  635. // AllocationSet stores a set of Allocations, each with a unique name, that share
  636. // a window. An AllocationSet is mutable, so treat it like a threadsafe map.
  637. type AllocationSet struct {
  638. sync.RWMutex
  639. allocations map[string]*Allocation
  640. externalKeys map[string]bool
  641. idleKeys map[string]bool
  642. FromSource string // stores the name of the source used to compute the data
  643. Window Window
  644. Warnings []string
  645. Errors []string
  646. }
  647. // NewAllocationSet instantiates a new AllocationSet and, optionally, inserts
  648. // the given list of Allocations
  649. func NewAllocationSet(start, end time.Time, allocs ...*Allocation) *AllocationSet {
  650. as := &AllocationSet{
  651. allocations: map[string]*Allocation{},
  652. externalKeys: map[string]bool{},
  653. idleKeys: map[string]bool{},
  654. Window: NewWindow(&start, &end),
  655. }
  656. for _, a := range allocs {
  657. as.Insert(a)
  658. }
  659. return as
  660. }
  661. // AllocationAggregationOptions provide advanced functionality to AggregateBy, including
  662. // filtering results and sharing allocations. FilterFuncs are a list of match
  663. // functions such that, if any function fails, the allocation is ignored.
  664. // ShareFuncs are a list of match functions such that, if any function
  665. // succeeds, the allocation is marked as a shared resource. ShareIdle is a
  666. // simple flag for sharing idle resources.
  667. type AllocationAggregationOptions struct {
  668. FilterFuncs []AllocationMatchFunc
  669. IdleByNode bool
  670. LabelConfig *LabelConfig
  671. MergeUnallocated bool
  672. SharedHourlyCosts map[string]float64
  673. ShareFuncs []AllocationMatchFunc
  674. ShareIdle string
  675. ShareSplit string
  676. SplitIdle bool
  677. }
  678. // AggregateBy aggregates the Allocations in the given AllocationSet by the given
  679. // AllocationProperty. This will only be legal if the AllocationSet is divisible by the
  680. // given AllocationProperty; e.g. Containers can be divided by Namespace, but not vice-a-versa.
  681. func (as *AllocationSet) AggregateBy(aggregateBy []string, options *AllocationAggregationOptions) error {
  682. // The order of operations for aggregating allocations is as follows:
  683. // 1. Partition external, idle, and shared allocations into separate sets.
  684. // Also, create the aggSet into which the results will be aggregated.
  685. // 2. Compute sharing coefficients for idle and shared resources
  686. // a) if idle allocation is to be shared, compute idle coefficients
  687. // b) if idle allocation is NOT shared, but filters are present, compute
  688. // idle filtration coefficients for the purpose of only returning the
  689. // portion of idle allocation that would have been shared with the
  690. // unfiltered results. (See unit tests 5.a,b,c)
  691. // c) generate shared allocation for then given shared overhead, which
  692. // must happen after (2a) and (2b)
  693. // d) if there are shared resources, compute share coefficients
  694. // 3. Drop any allocation that fails any of the filters
  695. // 4. Distribute idle allocations according to the idle coefficients
  696. // 5. Generate aggregation key and insert allocation into the output set
  697. // 6. If idle is shared and resources are shared, some idle might be shared
  698. // with a shared resource. Distribute that to the shared resources
  699. // prior to sharing them with the aggregated results.
  700. // 7. Apply idle filtration coefficients from step (2b)
  701. // 8. Distribute shared allocations according to the share coefficients.
  702. // 9. If there are external allocations that can be aggregated into
  703. // the output (i.e. they can be used to generate a valid key for
  704. // the given properties) then aggregate; otherwise... ignore them?
  705. // 10. If the merge idle option is enabled, merge any remaining idle
  706. // allocations into a single idle allocation. If there was any idle
  707. // whose costs were not distributed because there was no usage of a
  708. // specific resource type, re-add the idle to the aggregation with
  709. // only that type.
  710. if as.IsEmpty() {
  711. return nil
  712. }
  713. if options == nil {
  714. options = &AllocationAggregationOptions{}
  715. }
  716. if options.LabelConfig == nil {
  717. options.LabelConfig = NewLabelConfig()
  718. }
  719. var undistributedIdleMap map[string]bool
  720. // If aggregateBy is nil, we don't aggregate anything. On the other hand,
  721. // an empty slice implies that we should aggregate everything. See
  722. // generateKey for why that makes sense.
  723. shouldAggregate := aggregateBy != nil
  724. shouldFilter := len(options.FilterFuncs) > 0
  725. shouldShare := len(options.SharedHourlyCosts) > 0 || len(options.ShareFuncs) > 0
  726. if !shouldAggregate && !shouldFilter && !shouldShare {
  727. // There is nothing for AggregateBy to do, so simply return nil
  728. return nil
  729. }
  730. // aggSet will collect the aggregated allocations
  731. aggSet := &AllocationSet{
  732. Window: as.Window.Clone(),
  733. }
  734. // externalSet will collect external allocations
  735. externalSet := &AllocationSet{
  736. Window: as.Window.Clone(),
  737. }
  738. // idleSet will be shared among aggSet after initial aggregation
  739. // is complete
  740. idleSet := &AllocationSet{
  741. Window: as.Window.Clone(),
  742. }
  743. // shareSet will be shared among aggSet after initial aggregation
  744. // is complete
  745. shareSet := &AllocationSet{
  746. Window: as.Window.Clone(),
  747. }
  748. as.Lock()
  749. defer as.Unlock()
  750. // (1) Loop and find all of the external, idle, and shared allocations. Add
  751. // them to their respective sets, removing them from the set of allocations
  752. // to aggregate.
  753. for _, alloc := range as.allocations {
  754. // External allocations get aggregated post-hoc (see step 6) and do
  755. // not necessarily contain complete sets of properties, so they are
  756. // moved to a separate AllocationSet.
  757. if alloc.IsExternal() {
  758. delete(as.externalKeys, alloc.Name)
  759. delete(as.allocations, alloc.Name)
  760. externalSet.Insert(alloc)
  761. continue
  762. }
  763. // Idle allocations should be separated into idleSet if they are to be
  764. // shared later on. If they are not to be shared, then add them to the
  765. // aggSet like any other allocation.
  766. if alloc.IsIdle() {
  767. delete(as.idleKeys, alloc.Name)
  768. delete(as.allocations, alloc.Name)
  769. if options.ShareIdle == ShareEven || options.ShareIdle == ShareWeighted {
  770. idleSet.Insert(alloc)
  771. } else {
  772. aggSet.Insert(alloc)
  773. }
  774. continue
  775. }
  776. // Shared allocations must be identified and separated prior to
  777. // aggregation and filtering. That is, if any of the ShareFuncs return
  778. // true for the allocation, then move it to shareSet.
  779. for _, sf := range options.ShareFuncs {
  780. if sf(alloc) {
  781. delete(as.idleKeys, alloc.Name)
  782. delete(as.allocations, alloc.Name)
  783. shareSet.Insert(alloc)
  784. break
  785. }
  786. }
  787. }
  788. // It's possible that no more un-shared, non-idle, non-external allocations
  789. // remain at this point. This always results in an emptySet, so return early.
  790. if len(as.allocations) == 0 {
  791. emptySet := &AllocationSet{
  792. Window: as.Window.Clone(),
  793. }
  794. as.allocations = emptySet.allocations
  795. return nil
  796. }
  797. // (2) In order to correctly share idle and shared costs, we first compute
  798. // sharing coefficients, which represent the proportion of each cost to
  799. // share with each allocation. Idle allocations are shared per-cluster or per-node,
  800. // per-allocation, and per-resource, while shared resources are shared per-
  801. // allocation only.
  802. //
  803. // For an idleCoefficient example, the entries:
  804. // [cluster1][cluster1/namespace1/pod1/container1][cpu] = 0.166667
  805. // [cluster1][cluster1/namespace1/pod1/container1][gpu] = 0.166667
  806. // [cluster1][cluster1/namespace1/pod1/container1][ram] = 0.687500
  807. // mean that the allocation "cluster1/namespace1/pod1/container1" will
  808. // receive 16.67% of cluster1's idle CPU and GPU costs and 68.75% of its
  809. // RAM costs.
  810. //
  811. // For a shareCoefficient example, the entries:
  812. // [namespace2] = 0.666667
  813. // [__filtered__] = 0.333333
  814. // mean that the post-aggregation allocation "namespace2" will receive
  815. // 66.67% of the shared resource costs, while the remaining 33.33% will
  816. // be filtered out, as they were shared with allocations that did not pass
  817. // one of the given filters.
  818. //
  819. // In order to maintain stable results when multiple operations are being
  820. // carried out (e.g. sharing idle, sharing resources, and filtering) these
  821. // coefficients are computed for the full set of allocations prior to
  822. // adding shared overhead and prior to applying filters.
  823. var err error
  824. // (2a) If there are idle costs to be shared, compute the coefficients for
  825. // sharing them among the non-idle, non-aggregated allocations (including
  826. // the shared allocations).
  827. var idleCoefficients map[string]map[string]map[string]float64
  828. if idleSet.Length() > 0 && options.ShareIdle != ShareNone {
  829. idleCoefficients, undistributedIdleMap, err = computeIdleCoeffs(options, as, shareSet)
  830. if err != nil {
  831. log.Warningf("AllocationSet.AggregateBy: compute idle coeff: %s", err)
  832. return fmt.Errorf("error computing idle coefficients: %s", err)
  833. }
  834. }
  835. // (2b) If idle costs are not to be shared, but there are filters, then we
  836. // need to track the amount of each idle allocation to "filter" in order to
  837. // maintain parity with the results when idle is shared. That is, we want
  838. // to return only the idle costs that would have been shared with the given
  839. // results, even if the filter had not been applied.
  840. //
  841. // For example, consider these results from aggregating by namespace with
  842. // two clusters:
  843. //
  844. // namespace1: 25.00
  845. // namespace2: 30.00
  846. // namespace3: 15.00
  847. // idle: 30.00
  848. //
  849. // When we then filter by cluster==cluster1, namespaces 2 and 3 are
  850. // reduced by the amount that existed on cluster2. Then, idle must also be
  851. // reduced by the relevant amount:
  852. //
  853. // namespace1: 25.00
  854. // namespace2: 15.00
  855. // idle: 20.00
  856. //
  857. // Note that this can happen for any field, not just cluster, so we again
  858. // need to track this on a per-cluster or per-node, per-allocation, per-resource basis.
  859. var idleFiltrationCoefficients map[string]map[string]map[string]float64
  860. if len(options.FilterFuncs) > 0 && options.ShareIdle == ShareNone {
  861. idleFiltrationCoefficients, _, err = computeIdleCoeffs(options, as, shareSet)
  862. if err != nil {
  863. return fmt.Errorf("error computing idle filtration coefficients: %s", err)
  864. }
  865. }
  866. // (2c) Convert SharedHourlyCosts to Allocations in the shareSet. This must
  867. // come after idle coefficients are computes so that allocations generated
  868. // by shared overhead do not skew the idle coefficient computation.
  869. for name, cost := range options.SharedHourlyCosts {
  870. if cost > 0.0 {
  871. hours := as.Resolution().Hours()
  872. // If set ends in the future, adjust hours accordingly
  873. diff := time.Since(as.End())
  874. if diff < 0.0 {
  875. hours += diff.Hours()
  876. }
  877. totalSharedCost := cost * hours
  878. shareSet.Insert(&Allocation{
  879. Name: fmt.Sprintf("%s/%s", name, SharedSuffix),
  880. Start: as.Start(),
  881. End: as.End(),
  882. SharedCost: totalSharedCost,
  883. Properties: &AllocationProperties{Cluster: SharedSuffix}, // The allocation needs to belong to a cluster,but it really doesn't matter which one, so just make it clear.
  884. })
  885. }
  886. }
  887. // (2d) Compute share coefficients for shared resources. These are computed
  888. // after idle coefficients, and are computed for the aggregated allocations
  889. // of the main allocation set. See above for details and an example.
  890. var shareCoefficients map[string]float64
  891. if shareSet.Length() > 0 {
  892. shareCoefficients, err = computeShareCoeffs(aggregateBy, options, as)
  893. if err != nil {
  894. return fmt.Errorf("error computing share coefficients: %s", err)
  895. }
  896. }
  897. // (3-5) Filter, distribute idle cost, and aggregate (in that order)
  898. for _, alloc := range as.allocations {
  899. idleId, err := alloc.getIdleId(options)
  900. if err != nil {
  901. log.DedupedWarningf(3, "AllocationSet.AggregateBy: missing idleId for allocation: %s", alloc.Name)
  902. }
  903. skip := false
  904. // (3) If any of the filter funcs fail, immediately skip the allocation.
  905. for _, ff := range options.FilterFuncs {
  906. if !ff(alloc) {
  907. skip = true
  908. break
  909. }
  910. }
  911. if skip {
  912. // If we are tracking idle filtration coefficients, delete the
  913. // entry corresponding to the filtered allocation. (Deleting the
  914. // entry will result in that proportional amount being removed
  915. // from the idle allocation at the end of the process.)
  916. if idleFiltrationCoefficients != nil {
  917. if ifcc, ok := idleFiltrationCoefficients[idleId]; ok {
  918. delete(ifcc, alloc.Name)
  919. }
  920. }
  921. continue
  922. }
  923. // (4) Distribute idle allocations according to the idle coefficients
  924. // NOTE: if idle allocation is off (i.e. ShareIdle == ShareNone) then
  925. // all idle allocations will be in the aggSet at this point, so idleSet
  926. // will be empty and we won't enter this block.
  927. if idleSet.Length() > 0 {
  928. // Distribute idle allocations by coefficient per-idleId, per-allocation
  929. for _, idleAlloc := range idleSet.allocations {
  930. // Only share idle if the idleId matches; i.e. the allocation
  931. // is from the same idleId as the idle costs
  932. iaidleId, err := idleAlloc.getIdleId(options)
  933. if err != nil {
  934. log.Errorf("AllocationSet.AggregateBy: Idle allocation is missing idleId %s", idleAlloc.Name)
  935. return err
  936. }
  937. if iaidleId != idleId {
  938. continue
  939. }
  940. // Make sure idle coefficients exist
  941. if _, ok := idleCoefficients[idleId]; !ok {
  942. log.Warningf("AllocationSet.AggregateBy: error getting idle coefficient: no idleId '%s' for '%s'", idleId, alloc.Name)
  943. continue
  944. }
  945. if _, ok := idleCoefficients[idleId][alloc.Name]; !ok {
  946. log.Warningf("AllocationSet.AggregateBy: error getting idle coefficient for '%s'", alloc.Name)
  947. continue
  948. }
  949. alloc.CPUCoreHours += idleAlloc.CPUCoreHours * idleCoefficients[idleId][alloc.Name]["cpu"]
  950. alloc.GPUHours += idleAlloc.GPUHours * idleCoefficients[idleId][alloc.Name]["gpu"]
  951. alloc.RAMByteHours += idleAlloc.RAMByteHours * idleCoefficients[idleId][alloc.Name]["ram"]
  952. idleCPUCost := idleAlloc.CPUCost * idleCoefficients[idleId][alloc.Name]["cpu"]
  953. idleGPUCost := idleAlloc.GPUCost * idleCoefficients[idleId][alloc.Name]["gpu"]
  954. idleRAMCost := idleAlloc.RAMCost * idleCoefficients[idleId][alloc.Name]["ram"]
  955. alloc.CPUCost += idleCPUCost
  956. alloc.GPUCost += idleGPUCost
  957. alloc.RAMCost += idleRAMCost
  958. }
  959. }
  960. // (5) generate key to use for aggregation-by-key and allocation name
  961. key := alloc.generateKey(aggregateBy, options.LabelConfig)
  962. alloc.Name = key
  963. if options.MergeUnallocated && alloc.IsUnallocated() {
  964. alloc.Name = UnallocatedSuffix
  965. }
  966. // Inserting the allocation with the generated key for a name will
  967. // perform the actual basic aggregation step.
  968. aggSet.Insert(alloc)
  969. }
  970. // (6) If idle is shared and resources are shared, it's possible that some
  971. // amount of idle cost will be shared with a shared resource. Distribute
  972. // that idle allocation, if it exists, to the respective shared allocations
  973. // before sharing with the aggregated allocations.
  974. if idleSet.Length() > 0 && shareSet.Length() > 0 {
  975. for _, alloc := range shareSet.allocations {
  976. idleId, err := alloc.getIdleId(options)
  977. if err != nil {
  978. log.DedupedWarningf(3, "AllocationSet.AggregateBy: missing idleId for allocation: %s", alloc.Name)
  979. }
  980. // Distribute idle allocations by coefficient per-idleId, per-allocation
  981. for _, idleAlloc := range idleSet.allocations {
  982. // Only share idle if the idleId matches; i.e. the allocation
  983. // is from the same idleId as the idle costs
  984. iaidleId, _ := idleAlloc.getIdleId(options)
  985. if iaidleId != idleId {
  986. continue
  987. }
  988. // Make sure idle coefficients exist
  989. if _, ok := idleCoefficients[idleId]; !ok {
  990. log.Warningf("AllocationSet.AggregateBy: error getting idle coefficient: no idleId '%s' for '%s'", idleId, alloc.Name)
  991. continue
  992. }
  993. if _, ok := idleCoefficients[idleId][alloc.Name]; !ok {
  994. log.Warningf("AllocationSet.AggregateBy: error getting idle coefficient for '%s'", alloc.Name)
  995. continue
  996. }
  997. alloc.CPUCoreHours += idleAlloc.CPUCoreHours * idleCoefficients[idleId][alloc.Name]["cpu"]
  998. alloc.GPUHours += idleAlloc.GPUHours * idleCoefficients[idleId][alloc.Name]["gpu"]
  999. alloc.RAMByteHours += idleAlloc.RAMByteHours * idleCoefficients[idleId][alloc.Name]["ram"]
  1000. idleCPUCost := idleAlloc.CPUCost * idleCoefficients[idleId][alloc.Name]["cpu"]
  1001. idleGPUCost := idleAlloc.GPUCost * idleCoefficients[idleId][alloc.Name]["gpu"]
  1002. idleRAMCost := idleAlloc.RAMCost * idleCoefficients[idleId][alloc.Name]["ram"]
  1003. alloc.CPUCost += idleCPUCost
  1004. alloc.GPUCost += idleGPUCost
  1005. alloc.RAMCost += idleRAMCost
  1006. }
  1007. }
  1008. }
  1009. // groupingIdleFiltrationCoeffs is used to track per-resource idle
  1010. // coefficients on a cluster-by-cluster or node-by-node basis depending
  1011. // on the IdleByNode option. It is, essentailly, an aggregation of
  1012. // idleFiltrationCoefficients after they have been
  1013. // filtered above (in step 3)
  1014. var groupingIdleFiltrationCoeffs map[string]map[string]float64
  1015. if idleFiltrationCoefficients != nil {
  1016. groupingIdleFiltrationCoeffs = map[string]map[string]float64{}
  1017. for idleId, m := range idleFiltrationCoefficients {
  1018. if _, ok := groupingIdleFiltrationCoeffs[idleId]; !ok {
  1019. groupingIdleFiltrationCoeffs[idleId] = map[string]float64{
  1020. "cpu": 0.0,
  1021. "gpu": 0.0,
  1022. "ram": 0.0,
  1023. }
  1024. }
  1025. for _, n := range m {
  1026. for resource, val := range n {
  1027. groupingIdleFiltrationCoeffs[idleId][resource] += val
  1028. }
  1029. }
  1030. }
  1031. }
  1032. // (7) If we have both un-shared idle allocations and idle filtration
  1033. // coefficients then apply those. See step (2b) for an example.
  1034. if len(aggSet.idleKeys) > 0 && groupingIdleFiltrationCoeffs != nil {
  1035. for idleKey := range aggSet.idleKeys {
  1036. idleAlloc := aggSet.Get(idleKey)
  1037. iaidleId, err := idleAlloc.getIdleId(options)
  1038. if err != nil {
  1039. log.Errorf("AllocationSet.AggregateBy: Idle allocation is missing idleId %s", idleAlloc.Name)
  1040. return err
  1041. }
  1042. if resourceCoeffs, ok := groupingIdleFiltrationCoeffs[iaidleId]; ok {
  1043. idleAlloc.CPUCost *= resourceCoeffs["cpu"]
  1044. idleAlloc.CPUCoreHours *= resourceCoeffs["cpu"]
  1045. idleAlloc.RAMCost *= resourceCoeffs["ram"]
  1046. idleAlloc.RAMByteHours *= resourceCoeffs["ram"]
  1047. }
  1048. }
  1049. }
  1050. // (8) Distribute shared allocations according to the share coefficients.
  1051. if shareSet.Length() > 0 {
  1052. for _, alloc := range aggSet.allocations {
  1053. for _, sharedAlloc := range shareSet.allocations {
  1054. if _, ok := shareCoefficients[alloc.Name]; !ok {
  1055. if !alloc.IsIdle() {
  1056. log.Warningf("AllocationSet.AggregateBy: error getting share coefficienct for '%s'", alloc.Name)
  1057. }
  1058. continue
  1059. }
  1060. alloc.SharedCost += sharedAlloc.TotalCost() * shareCoefficients[alloc.Name]
  1061. }
  1062. }
  1063. }
  1064. // (9) Aggregate external allocations into aggregated allocations. This may
  1065. // not be possible for every external allocation, but attempt to find an
  1066. // exact key match, given each external allocation's proerties, and
  1067. // aggregate if an exact match is found.
  1068. for _, alloc := range externalSet.allocations {
  1069. skip := false
  1070. for _, ff := range options.FilterFuncs {
  1071. if !ff(alloc) {
  1072. skip = true
  1073. break
  1074. }
  1075. }
  1076. if !skip {
  1077. key := alloc.generateKey(aggregateBy, options.LabelConfig)
  1078. alloc.Name = key
  1079. aggSet.Insert(alloc)
  1080. }
  1081. }
  1082. // (10) Combine all idle allocations into a single "__idle__" allocation
  1083. if !options.SplitIdle {
  1084. for _, idleAlloc := range aggSet.IdleAllocations() {
  1085. aggSet.Delete(idleAlloc.Name)
  1086. idleAlloc.Name = IdleSuffix
  1087. aggSet.Insert(idleAlloc)
  1088. }
  1089. }
  1090. // In the edge case that some idle has not been distributed because
  1091. // there is no usage of that resource type, add idle back to
  1092. // aggregations with only that cost applied.
  1093. // E.g. in the case where we have a result that looks like this on the
  1094. // frontend:
  1095. // Name CPU GPU RAM
  1096. // __idle__ $10 $12 $6
  1097. // kubecost $2 $0 $1
  1098. // Sharing idle weighted would result in no idle GPU cost being
  1099. // distributed, because the coefficient for the kubecost GPU cost would
  1100. // be zero. Thus, instead we re-add idle to the aggSet with distributed
  1101. // costs zeroed out but the undistributed costs left in.
  1102. // Name CPU GPU RAM
  1103. // __idle__ $0 $12 $0
  1104. // kubecost $12 $0 $7
  1105. if idleSet.Length() > 0 && !options.SplitIdle {
  1106. if undistributedIdleMap["cpu"] || undistributedIdleMap["gpu"] || undistributedIdleMap["ram"] {
  1107. for _, idleAlloc := range idleSet.allocations {
  1108. skip := false
  1109. // if the idle does not apply to the non-filtered values, skip it
  1110. for _, ff := range options.FilterFuncs {
  1111. if !ff(idleAlloc) {
  1112. skip = true
  1113. break
  1114. }
  1115. }
  1116. if skip {
  1117. continue
  1118. }
  1119. // if the idle doesn't have a cost to be shared, also skip it
  1120. if idleAlloc.CPUCost != 0 && idleAlloc.GPUCost != 0 && idleAlloc.RAMCost != 0 {
  1121. // artificially set the already shared costs to zero
  1122. if !undistributedIdleMap["cpu"] {
  1123. idleAlloc.CPUCost = 0
  1124. }
  1125. if !undistributedIdleMap["gpu"] {
  1126. idleAlloc.GPUCost = 0
  1127. }
  1128. if !undistributedIdleMap["ram"] {
  1129. idleAlloc.RAMCost = 0
  1130. }
  1131. idleAlloc.Name = IdleSuffix
  1132. aggSet.Insert(idleAlloc)
  1133. }
  1134. }
  1135. }
  1136. }
  1137. as.allocations = aggSet.allocations
  1138. return nil
  1139. }
  1140. func computeShareCoeffs(aggregateBy []string, options *AllocationAggregationOptions, as *AllocationSet) (map[string]float64, error) {
  1141. // Compute coeffs by totalling per-allocation, then dividing by the total.
  1142. coeffs := map[string]float64{}
  1143. // Compute totals for all allocations
  1144. total := 0.0
  1145. // ShareEven counts each aggregation with even weight, whereas ShareWeighted
  1146. // counts each aggregation proportionally to its respective costs
  1147. shareType := options.ShareSplit
  1148. // Record allocation values first, then normalize by totals to get percentages
  1149. for _, alloc := range as.allocations {
  1150. if alloc.IsIdle() {
  1151. // Skip idle allocations in coefficient calculation
  1152. continue
  1153. }
  1154. if alloc.IsUnmounted() {
  1155. // Skip unmounted allocations in coefficient calculation
  1156. continue
  1157. }
  1158. // Determine the post-aggregation key under which the allocation will
  1159. // be shared.
  1160. name := alloc.generateKey(aggregateBy, options.LabelConfig)
  1161. // If the current allocation will be filtered out in step 3, contribute
  1162. // its share of the shared coefficient to a "__filtered__" bin, which
  1163. // will ultimately be dropped. This step ensures that the shared cost
  1164. // of a non-filtered allocation will be conserved even when the filter
  1165. // is removed. (Otherwise, all the shared cost will get redistributed
  1166. // over the unfiltered results, inflating their shared costs.)
  1167. filtered := false
  1168. for _, ff := range options.FilterFuncs {
  1169. if !ff(alloc) {
  1170. filtered = true
  1171. break
  1172. }
  1173. }
  1174. if filtered {
  1175. name = "__filtered__"
  1176. }
  1177. if shareType == ShareEven {
  1178. // Even distribution is not additive - set to 1.0 for everything
  1179. coeffs[name] = 1.0
  1180. // Total for even distribution is always the number of coefficients
  1181. total = float64(len(coeffs))
  1182. } else {
  1183. // Both are additive for weighted distribution, where each
  1184. // cumulative coefficient will be divided by the total.
  1185. coeffs[name] += alloc.TotalCost()
  1186. total += alloc.TotalCost()
  1187. }
  1188. }
  1189. // Normalize coefficients by totals
  1190. for a := range coeffs {
  1191. if coeffs[a] > 0 && total > 0 {
  1192. coeffs[a] /= total
  1193. } else {
  1194. log.Warningf("ETL: invalid values for shared coefficients: %d, %d", coeffs[a], total)
  1195. coeffs[a] = 0.0
  1196. }
  1197. }
  1198. return coeffs, nil
  1199. }
  1200. func computeIdleCoeffs(options *AllocationAggregationOptions, as *AllocationSet, shareSet *AllocationSet) (map[string]map[string]map[string]float64, map[string]bool, error) {
  1201. types := []string{"cpu", "gpu", "ram"}
  1202. undistributedIdleMap := map[string]bool{
  1203. "cpu": true,
  1204. "gpu": true,
  1205. "ram": true,
  1206. }
  1207. // Compute idle coefficients, then save them in AllocationAggregationOptions
  1208. coeffs := map[string]map[string]map[string]float64{}
  1209. // Compute totals per resource for CPU, GPU, RAM, and PV
  1210. totals := map[string]map[string]float64{}
  1211. // ShareEven counts each allocation with even weight, whereas ShareWeighted
  1212. // counts each allocation proportionally to its respective costs
  1213. shareType := options.ShareIdle
  1214. // Record allocation values first, then normalize by totals to get percentages
  1215. for _, alloc := range as.allocations {
  1216. if alloc.IsIdle() {
  1217. // Skip idle allocations in coefficient calculation
  1218. continue
  1219. }
  1220. idleId, err := alloc.getIdleId(options)
  1221. if err != nil {
  1222. log.DedupedWarningf(3, "Missing Idle Key for %s", alloc.Name)
  1223. }
  1224. // get the name key for the allocation
  1225. name := alloc.Name
  1226. // Create key based tables if they don't exist
  1227. if _, ok := coeffs[idleId]; !ok {
  1228. coeffs[idleId] = map[string]map[string]float64{}
  1229. }
  1230. if _, ok := totals[idleId]; !ok {
  1231. totals[idleId] = map[string]float64{}
  1232. }
  1233. if _, ok := coeffs[idleId][name]; !ok {
  1234. coeffs[idleId][name] = map[string]float64{}
  1235. }
  1236. if shareType == ShareEven {
  1237. for _, r := range types {
  1238. // Not additive - hard set to 1.0
  1239. coeffs[idleId][name][r] = 1.0
  1240. // totals are additive
  1241. totals[idleId][r] += 1.0
  1242. }
  1243. } else {
  1244. coeffs[idleId][name]["cpu"] += alloc.CPUTotalCost()
  1245. coeffs[idleId][name]["gpu"] += alloc.GPUTotalCost()
  1246. coeffs[idleId][name]["ram"] += alloc.RAMTotalCost()
  1247. totals[idleId]["cpu"] += alloc.CPUTotalCost()
  1248. totals[idleId]["gpu"] += alloc.GPUTotalCost()
  1249. totals[idleId]["ram"] += alloc.RAMTotalCost()
  1250. }
  1251. }
  1252. // Do the same for shared allocations
  1253. for _, alloc := range shareSet.allocations {
  1254. if alloc.IsIdle() {
  1255. // Skip idle allocations in coefficient calculation
  1256. continue
  1257. }
  1258. // idleId will be providerId or cluster
  1259. idleId, err := alloc.getIdleId(options)
  1260. if err != nil {
  1261. log.DedupedWarningf(3, "Missing Idle Key in share set for %s", alloc.Name)
  1262. }
  1263. // get the name key for the allocation
  1264. name := alloc.Name
  1265. // Create idleId based tables if they don't exist
  1266. if _, ok := coeffs[idleId]; !ok {
  1267. coeffs[idleId] = map[string]map[string]float64{}
  1268. }
  1269. if _, ok := totals[idleId]; !ok {
  1270. totals[idleId] = map[string]float64{}
  1271. }
  1272. if _, ok := coeffs[idleId][name]; !ok {
  1273. coeffs[idleId][name] = map[string]float64{}
  1274. }
  1275. if shareType == ShareEven {
  1276. for _, r := range types {
  1277. // Not additive - hard set to 1.0
  1278. coeffs[idleId][name][r] = 1.0
  1279. // totals are additive
  1280. totals[idleId][r] += 1.0
  1281. }
  1282. } else {
  1283. coeffs[idleId][name]["cpu"] += alloc.CPUTotalCost()
  1284. coeffs[idleId][name]["gpu"] += alloc.GPUTotalCost()
  1285. coeffs[idleId][name]["ram"] += alloc.RAMTotalCost()
  1286. totals[idleId]["cpu"] += alloc.CPUTotalCost()
  1287. totals[idleId]["gpu"] += alloc.GPUTotalCost()
  1288. totals[idleId]["ram"] += alloc.RAMTotalCost()
  1289. }
  1290. }
  1291. // Normalize coefficients by totals
  1292. for c := range coeffs {
  1293. for a := range coeffs[c] {
  1294. for _, r := range types {
  1295. if coeffs[c][a][r] > 0 && totals[c][r] > 0 {
  1296. coeffs[c][a][r] /= totals[c][r]
  1297. undistributedIdleMap[r] = false
  1298. }
  1299. }
  1300. }
  1301. }
  1302. return coeffs, undistributedIdleMap, nil
  1303. }
  1304. // getIdleId returns the providerId or cluster of an Allocation depending on the IdleByNode
  1305. // option in the AllocationAggregationOptions and an error if the respective field is missing
  1306. func (a *Allocation) getIdleId(options *AllocationAggregationOptions) (string, error) {
  1307. var idleId string
  1308. if options.IdleByNode {
  1309. // Key allocations to ProviderId to match against node
  1310. idleId = a.Properties.ProviderID
  1311. if idleId == "" {
  1312. return idleId, fmt.Errorf("ProviderId is not set")
  1313. }
  1314. } else {
  1315. // key the allocations by cluster id
  1316. idleId = a.Properties.Cluster
  1317. if idleId == "" {
  1318. return idleId, fmt.Errorf("ClusterProp is not set")
  1319. }
  1320. }
  1321. return idleId, nil
  1322. }
  1323. func (a *Allocation) generateKey(aggregateBy []string, labelConfig *LabelConfig) string {
  1324. if a == nil {
  1325. return ""
  1326. }
  1327. if labelConfig == nil {
  1328. labelConfig = NewLabelConfig()
  1329. }
  1330. // Names will ultimately be joined into a single name, which uniquely
  1331. // identifies allocations.
  1332. names := []string{}
  1333. for _, agg := range aggregateBy {
  1334. switch true {
  1335. case agg == AllocationClusterProp:
  1336. names = append(names, a.Properties.Cluster)
  1337. case agg == AllocationNodeProp:
  1338. names = append(names, a.Properties.Node)
  1339. case agg == AllocationNamespaceProp:
  1340. names = append(names, a.Properties.Namespace)
  1341. case agg == AllocationControllerKindProp:
  1342. controllerKind := a.Properties.ControllerKind
  1343. if controllerKind == "" {
  1344. // Indicate that allocation has no controller
  1345. controllerKind = UnallocatedSuffix
  1346. }
  1347. names = append(names, controllerKind)
  1348. case agg == AllocationDaemonSetProp || agg == AllocationStatefulSetProp || agg == AllocationDeploymentProp || agg == AllocationJobProp:
  1349. controller := a.Properties.Controller
  1350. if agg != a.Properties.ControllerKind || controller == "" {
  1351. // The allocation does not have the specified controller kind
  1352. controller = UnallocatedSuffix
  1353. }
  1354. names = append(names, controller)
  1355. case agg == AllocationControllerProp:
  1356. controller := a.Properties.Controller
  1357. if controller == "" {
  1358. // Indicate that allocation has no controller
  1359. controller = UnallocatedSuffix
  1360. } else if a.Properties.ControllerKind != "" {
  1361. controller = fmt.Sprintf("%s:%s", a.Properties.ControllerKind, controller)
  1362. }
  1363. names = append(names, controller)
  1364. case agg == AllocationPodProp:
  1365. names = append(names, a.Properties.Pod)
  1366. case agg == AllocationContainerProp:
  1367. names = append(names, a.Properties.Container)
  1368. case agg == AllocationServiceProp:
  1369. services := a.Properties.Services
  1370. if len(services) == 0 {
  1371. // Indicate that allocation has no services
  1372. names = append(names, UnallocatedSuffix)
  1373. } else {
  1374. // This just uses the first service
  1375. for _, service := range services {
  1376. names = append(names, service)
  1377. break
  1378. }
  1379. }
  1380. case strings.HasPrefix(agg, "label:"):
  1381. labels := a.Properties.Labels
  1382. if labels == nil {
  1383. names = append(names, UnallocatedSuffix)
  1384. } else {
  1385. labelName := labelConfig.Sanitize(strings.TrimPrefix(agg, "label:"))
  1386. if labelValue, ok := labels[labelName]; ok {
  1387. names = append(names, fmt.Sprintf("%s=%s", labelName, labelValue))
  1388. } else {
  1389. names = append(names, UnallocatedSuffix)
  1390. }
  1391. }
  1392. case strings.HasPrefix(agg, "annotation:"):
  1393. annotations := a.Properties.Annotations
  1394. if annotations == nil {
  1395. names = append(names, UnallocatedSuffix)
  1396. } else {
  1397. annotationName := labelConfig.Sanitize(strings.TrimPrefix(agg, "annotation:"))
  1398. if annotationValue, ok := annotations[annotationName]; ok {
  1399. names = append(names, fmt.Sprintf("%s=%s", annotationName, annotationValue))
  1400. } else {
  1401. names = append(names, UnallocatedSuffix)
  1402. }
  1403. }
  1404. case agg == AllocationDepartmentProp:
  1405. labels := a.Properties.Labels
  1406. if labels == nil {
  1407. names = append(names, UnallocatedSuffix)
  1408. } else {
  1409. labelNames := strings.Split(labelConfig.DepartmentLabel, ",")
  1410. for _, labelName := range labelNames {
  1411. labelName = labelConfig.Sanitize(labelName)
  1412. if labelValue, ok := labels[labelName]; ok {
  1413. names = append(names, labelValue)
  1414. } else {
  1415. names = append(names, UnallocatedSuffix)
  1416. }
  1417. }
  1418. }
  1419. case agg == AllocationEnvironmentProp:
  1420. labels := a.Properties.Labels
  1421. if labels == nil {
  1422. names = append(names, UnallocatedSuffix)
  1423. } else {
  1424. labelNames := strings.Split(labelConfig.EnvironmentLabel, ",")
  1425. for _, labelName := range labelNames {
  1426. labelName = labelConfig.Sanitize(labelName)
  1427. if labelValue, ok := labels[labelName]; ok {
  1428. names = append(names, labelValue)
  1429. } else {
  1430. names = append(names, UnallocatedSuffix)
  1431. }
  1432. }
  1433. }
  1434. case agg == AllocationOwnerProp:
  1435. labels := a.Properties.Labels
  1436. if labels == nil {
  1437. names = append(names, UnallocatedSuffix)
  1438. } else {
  1439. labelNames := strings.Split(labelConfig.OwnerLabel, ",")
  1440. for _, labelName := range labelNames {
  1441. labelName = labelConfig.Sanitize(labelName)
  1442. if labelValue, ok := labels[labelName]; ok {
  1443. names = append(names, labelValue)
  1444. } else {
  1445. names = append(names, UnallocatedSuffix)
  1446. }
  1447. }
  1448. }
  1449. case agg == AllocationProductProp:
  1450. labels := a.Properties.Labels
  1451. if labels == nil {
  1452. names = append(names, UnallocatedSuffix)
  1453. } else {
  1454. labelNames := strings.Split(labelConfig.ProductLabel, ",")
  1455. for _, labelName := range labelNames {
  1456. labelName = labelConfig.Sanitize(labelName)
  1457. if labelValue, ok := labels[labelName]; ok {
  1458. names = append(names, labelValue)
  1459. } else {
  1460. names = append(names, UnallocatedSuffix)
  1461. }
  1462. }
  1463. }
  1464. case agg == AllocationTeamProp:
  1465. labels := a.Properties.Labels
  1466. if labels == nil {
  1467. names = append(names, UnallocatedSuffix)
  1468. } else {
  1469. labelNames := strings.Split(labelConfig.TeamLabel, ",")
  1470. for _, labelName := range labelNames {
  1471. labelName = labelConfig.Sanitize(labelName)
  1472. if labelValue, ok := labels[labelName]; ok {
  1473. names = append(names, labelValue)
  1474. } else {
  1475. names = append(names, UnallocatedSuffix)
  1476. }
  1477. }
  1478. }
  1479. default:
  1480. // This case should never be reached, as input up until this point
  1481. // should be checked and rejected if invalid. But if we do get a
  1482. // value we don't recognize, log a warning.
  1483. log.Warningf("AggregateBy: illegal aggregation parameter: %s", agg)
  1484. }
  1485. }
  1486. return strings.Join(names, "/")
  1487. }
  1488. // Clone returns a new AllocationSet with a deep copy of the given
  1489. // AllocationSet's allocations.
  1490. func (as *AllocationSet) Clone() *AllocationSet {
  1491. if as == nil {
  1492. return nil
  1493. }
  1494. as.RLock()
  1495. defer as.RUnlock()
  1496. allocs := map[string]*Allocation{}
  1497. for k, v := range as.allocations {
  1498. allocs[k] = v.Clone()
  1499. }
  1500. externalKeys := map[string]bool{}
  1501. for k, v := range as.externalKeys {
  1502. externalKeys[k] = v
  1503. }
  1504. idleKeys := map[string]bool{}
  1505. for k, v := range as.idleKeys {
  1506. idleKeys[k] = v
  1507. }
  1508. return &AllocationSet{
  1509. allocations: allocs,
  1510. externalKeys: externalKeys,
  1511. idleKeys: idleKeys,
  1512. Window: as.Window.Clone(),
  1513. }
  1514. }
  1515. // ComputeIdleAllocations computes the idle allocations for the AllocationSet,
  1516. // given a set of Assets. Ideally, assetSet should contain only Nodes, but if
  1517. // it contains other Assets, they will be ignored; only CPU, GPU and RAM are
  1518. // considered for idle allocation. If the Nodes have adjustments, then apply
  1519. // the adjustments proportionally to each of the resources so that total
  1520. // allocation with idle reflects the adjusted node costs. One idle allocation
  1521. // per-cluster will be computed and returned, keyed by cluster_id.
  1522. func (as *AllocationSet) ComputeIdleAllocations(assetSet *AssetSet) (map[string]*Allocation, error) {
  1523. if as == nil {
  1524. return nil, fmt.Errorf("cannot compute idle allocation for nil AllocationSet")
  1525. }
  1526. if assetSet == nil {
  1527. return nil, fmt.Errorf("cannot compute idle allocation with nil AssetSet")
  1528. }
  1529. if !as.Window.Equal(assetSet.Window) {
  1530. return nil, fmt.Errorf("cannot compute idle allocation for sets with mismatched windows: %s != %s", as.Window, assetSet.Window)
  1531. }
  1532. window := as.Window
  1533. // Build a map of cumulative cluster asset costs, per resource; i.e.
  1534. // cluster-to-{cpu|gpu|ram}-to-cost.
  1535. assetClusterResourceCosts := map[string]map[string]float64{}
  1536. assetSet.Each(func(key string, a Asset) {
  1537. if node, ok := a.(*Node); ok {
  1538. if _, ok := assetClusterResourceCosts[node.Properties().Cluster]; !ok {
  1539. assetClusterResourceCosts[node.Properties().Cluster] = map[string]float64{}
  1540. }
  1541. // adjustmentRate is used to scale resource costs proportionally
  1542. // by the adjustment. This is necessary because we only get one
  1543. // adjustment per Node, not one per-resource-per-Node.
  1544. //
  1545. // e.g. total cost = $90, adjustment = -$10 => 0.9
  1546. // e.g. total cost = $150, adjustment = -$300 => 0.3333
  1547. // e.g. total cost = $150, adjustment = $50 => 1.5
  1548. adjustmentRate := 1.0
  1549. if node.TotalCost()-node.Adjustment() == 0 {
  1550. // If (totalCost - adjustment) is 0.0 then adjustment cancels
  1551. // the entire node cost and we should make everything 0
  1552. // without dividing by 0.
  1553. adjustmentRate = 0.0
  1554. log.DedupedWarningf(5, "Compute Idle Allocations: Node Cost Adjusted to $0.00 for %s", node.properties.Name)
  1555. } else if node.Adjustment() != 0.0 {
  1556. // adjustmentRate is the ratio of cost-with-adjustment (i.e. TotalCost)
  1557. // to cost-without-adjustment (i.e. TotalCost - Adjustment).
  1558. adjustmentRate = node.TotalCost() / (node.TotalCost() - node.Adjustment())
  1559. }
  1560. cpuCost := node.CPUCost * (1.0 - node.Discount) * adjustmentRate
  1561. gpuCost := node.GPUCost * (1.0 - node.Discount) * adjustmentRate
  1562. ramCost := node.RAMCost * (1.0 - node.Discount) * adjustmentRate
  1563. assetClusterResourceCosts[node.Properties().Cluster]["cpu"] += cpuCost
  1564. assetClusterResourceCosts[node.Properties().Cluster]["gpu"] += gpuCost
  1565. assetClusterResourceCosts[node.Properties().Cluster]["ram"] += ramCost
  1566. }
  1567. })
  1568. // Determine start, end on a per-cluster basis
  1569. clusterStarts := map[string]time.Time{}
  1570. clusterEnds := map[string]time.Time{}
  1571. // Subtract allocated costs from asset costs, leaving only the remaining
  1572. // idle costs.
  1573. as.Each(func(name string, a *Allocation) {
  1574. cluster := a.Properties.Cluster
  1575. if cluster == "" {
  1576. // Failed to find allocation's cluster
  1577. return
  1578. }
  1579. if _, ok := assetClusterResourceCosts[cluster]; !ok {
  1580. // Failed to find assets for allocation's cluster
  1581. return
  1582. }
  1583. // Set cluster (start, end) if they are either not currently set,
  1584. // or if the detected (start, end) of the current allocation falls
  1585. // before or after, respectively, the current values.
  1586. if s, ok := clusterStarts[cluster]; !ok || a.Start.Before(s) {
  1587. clusterStarts[cluster] = a.Start
  1588. }
  1589. if e, ok := clusterEnds[cluster]; !ok || a.End.After(e) {
  1590. clusterEnds[cluster] = a.End
  1591. }
  1592. assetClusterResourceCosts[cluster]["cpu"] -= a.CPUTotalCost()
  1593. assetClusterResourceCosts[cluster]["gpu"] -= a.GPUTotalCost()
  1594. assetClusterResourceCosts[cluster]["ram"] -= a.RAMTotalCost()
  1595. })
  1596. // Turn remaining un-allocated asset costs into idle allocations
  1597. idleAllocs := map[string]*Allocation{}
  1598. for cluster, resources := range assetClusterResourceCosts {
  1599. // Default start and end to the (start, end) of the given window, but
  1600. // use the actual, detected (start, end) pair if they are available.
  1601. start := *window.Start()
  1602. if s, ok := clusterStarts[cluster]; ok && window.Contains(s) {
  1603. start = s
  1604. }
  1605. end := *window.End()
  1606. if e, ok := clusterEnds[cluster]; ok && window.Contains(e) {
  1607. end = e
  1608. }
  1609. idleAlloc := &Allocation{
  1610. Name: fmt.Sprintf("%s/%s", cluster, IdleSuffix),
  1611. Window: window.Clone(),
  1612. Properties: &AllocationProperties{Cluster: cluster},
  1613. Start: start,
  1614. End: end,
  1615. CPUCost: resources["cpu"],
  1616. GPUCost: resources["gpu"],
  1617. RAMCost: resources["ram"],
  1618. }
  1619. // Do not continue if multiple idle allocations are computed for a
  1620. // single cluster.
  1621. if _, ok := idleAllocs[cluster]; ok {
  1622. return nil, fmt.Errorf("duplicate idle allocations for cluster %s", cluster)
  1623. }
  1624. idleAllocs[cluster] = idleAlloc
  1625. }
  1626. return idleAllocs, nil
  1627. }
  1628. // ComputeIdleAllocationsByNode computes the idle allocations for the AllocationSet,
  1629. // given a set of Assets. Ideally, assetSet should contain only Nodes, but if
  1630. // it contains other Assets, they will be ignored; only CPU, GPU and RAM are
  1631. // considered for idle allocation. If the Nodes have adjustments, then apply
  1632. // the adjustments proportionally to each of the resources so that total
  1633. // allocation with idle reflects the adjusted node costs. One idle allocation
  1634. // per-node will be computed and returned, keyed by cluster_id.
  1635. func (as *AllocationSet) ComputeIdleAllocationsByNode(assetSet *AssetSet) (map[string]*Allocation, error) {
  1636. if as == nil {
  1637. return nil, fmt.Errorf("cannot compute idle allocation for nil AllocationSet")
  1638. }
  1639. if assetSet == nil {
  1640. return nil, fmt.Errorf("cannot compute idle allocation with nil AssetSet")
  1641. }
  1642. if !as.Window.Equal(assetSet.Window) {
  1643. return nil, fmt.Errorf("cannot compute idle allocation for sets with mismatched windows: %s != %s", as.Window, assetSet.Window)
  1644. }
  1645. window := as.Window
  1646. // Build a map of cumulative cluster asset costs, per resource; i.e.
  1647. // cluster-to-{cpu|gpu|ram}-to-cost.
  1648. assetNodeResourceCosts := map[string]map[string]float64{}
  1649. nodesByProviderId := map[string]*Node{}
  1650. assetSet.Each(func(key string, a Asset) {
  1651. if node, ok := a.(*Node); ok {
  1652. if _, ok := assetNodeResourceCosts[node.Properties().ProviderID]; ok || node.Properties().ProviderID == "" {
  1653. log.DedupedWarningf(5, "Compute Idle Allocations By Node: Node missing providerId: %s", node.properties.Name)
  1654. return
  1655. }
  1656. nodesByProviderId[node.Properties().ProviderID] = node
  1657. assetNodeResourceCosts[node.Properties().ProviderID] = map[string]float64{}
  1658. // adjustmentRate is used to scale resource costs proportionally
  1659. // by the adjustment. This is necessary because we only get one
  1660. // adjustment per Node, not one per-resource-per-Node.
  1661. //
  1662. // e.g. total cost = $90, adjustment = -$10 => 0.9
  1663. // e.g. total cost = $150, adjustment = -$300 => 0.3333
  1664. // e.g. total cost = $150, adjustment = $50 => 1.5
  1665. adjustmentRate := 1.0
  1666. if node.TotalCost()-node.Adjustment() == 0 {
  1667. // If (totalCost - adjustment) is 0.0 then adjustment cancels
  1668. // the entire node cost and we should make everything 0
  1669. // without dividing by 0.
  1670. adjustmentRate = 0.0
  1671. log.DedupedWarningf(5, "Compute Idle Allocations: Node Cost Adjusted to $0.00 for %s", node.properties.Name)
  1672. } else if node.Adjustment() != 0.0 {
  1673. // adjustmentRate is the ratio of cost-with-adjustment (i.e. TotalCost)
  1674. // to cost-without-adjustment (i.e. TotalCost - Adjustment).
  1675. adjustmentRate = node.TotalCost() / (node.TotalCost() - node.Adjustment())
  1676. }
  1677. cpuCost := node.CPUCost * (1.0 - node.Discount) * adjustmentRate
  1678. gpuCost := node.GPUCost * (1.0 - node.Discount) * adjustmentRate
  1679. ramCost := node.RAMCost * (1.0 - node.Discount) * adjustmentRate
  1680. assetNodeResourceCosts[node.Properties().ProviderID]["cpu"] += cpuCost
  1681. assetNodeResourceCosts[node.Properties().ProviderID]["gpu"] += gpuCost
  1682. assetNodeResourceCosts[node.Properties().ProviderID]["ram"] += ramCost
  1683. }
  1684. })
  1685. // Determine start, end on a per-cluster basis
  1686. nodeStarts := map[string]time.Time{}
  1687. nodeEnds := map[string]time.Time{}
  1688. // Subtract allocated costs from asset costs, leaving only the remaining
  1689. // idle costs.
  1690. as.Each(func(name string, a *Allocation) {
  1691. providerId := a.Properties.ProviderID
  1692. if providerId == "" {
  1693. // Failed to find allocation's node
  1694. log.DedupedWarningf(5, "Compute Idle Allocations By Node: Allocation missing providerId: %s", a.Name)
  1695. return
  1696. }
  1697. if _, ok := assetNodeResourceCosts[providerId]; !ok {
  1698. // Failed to find assets for allocation's node
  1699. return
  1700. }
  1701. // Set cluster (start, end) if they are either not currently set,
  1702. // or if the detected (start, end) of the current allocation falls
  1703. // before or after, respectively, the current values.
  1704. if s, ok := nodeStarts[providerId]; !ok || a.Start.Before(s) {
  1705. nodeStarts[providerId] = a.Start
  1706. }
  1707. if e, ok := nodeEnds[providerId]; !ok || a.End.After(e) {
  1708. nodeEnds[providerId] = a.End
  1709. }
  1710. assetNodeResourceCosts[providerId]["cpu"] -= a.CPUTotalCost()
  1711. assetNodeResourceCosts[providerId]["gpu"] -= a.GPUTotalCost()
  1712. assetNodeResourceCosts[providerId]["ram"] -= a.RAMTotalCost()
  1713. })
  1714. // Turn remaining un-allocated asset costs into idle allocations
  1715. idleAllocs := map[string]*Allocation{}
  1716. for providerId, resources := range assetNodeResourceCosts {
  1717. // Default start and end to the (start, end) of the given window, but
  1718. // use the actual, detected (start, end) pair if they are available.
  1719. start := *window.Start()
  1720. if s, ok := nodeStarts[providerId]; ok && window.Contains(s) {
  1721. start = s
  1722. }
  1723. end := *window.End()
  1724. if e, ok := nodeEnds[providerId]; ok && window.Contains(e) {
  1725. end = e
  1726. }
  1727. node := nodesByProviderId[providerId]
  1728. idleAlloc := &Allocation{
  1729. Name: fmt.Sprintf("%s/%s", node.properties.Name, IdleSuffix),
  1730. Window: window.Clone(),
  1731. Properties: &AllocationProperties{
  1732. Cluster: node.properties.Cluster,
  1733. Node: node.properties.Name,
  1734. ProviderID: providerId,
  1735. },
  1736. Start: start,
  1737. End: end,
  1738. CPUCost: resources["cpu"],
  1739. GPUCost: resources["gpu"],
  1740. RAMCost: resources["ram"],
  1741. }
  1742. // Do not continue if multiple idle allocations are computed for a
  1743. // single node.
  1744. if _, ok := idleAllocs[providerId]; ok {
  1745. return nil, fmt.Errorf("duplicate idle allocations for node Provider ID: %s", providerId)
  1746. }
  1747. idleAllocs[providerId] = idleAlloc
  1748. }
  1749. return idleAllocs, nil
  1750. }
  1751. // Delete removes the allocation with the given name from the set
  1752. func (as *AllocationSet) Delete(name string) {
  1753. if as == nil {
  1754. return
  1755. }
  1756. as.Lock()
  1757. defer as.Unlock()
  1758. delete(as.externalKeys, name)
  1759. delete(as.idleKeys, name)
  1760. delete(as.allocations, name)
  1761. }
  1762. // Each invokes the given function for each Allocation in the set
  1763. func (as *AllocationSet) Each(f func(string, *Allocation)) {
  1764. if as == nil {
  1765. return
  1766. }
  1767. for k, a := range as.allocations {
  1768. f(k, a)
  1769. }
  1770. }
  1771. // End returns the End time of the AllocationSet window
  1772. func (as *AllocationSet) End() time.Time {
  1773. if as == nil {
  1774. log.Warningf("AllocationSet: calling End on nil AllocationSet")
  1775. return time.Unix(0, 0)
  1776. }
  1777. if as.Window.End() == nil {
  1778. log.Warningf("AllocationSet: AllocationSet with illegal window: End is nil; len(as.allocations)=%d", len(as.allocations))
  1779. return time.Unix(0, 0)
  1780. }
  1781. return *as.Window.End()
  1782. }
  1783. // Get returns the Allocation at the given key in the AllocationSet
  1784. func (as *AllocationSet) Get(key string) *Allocation {
  1785. as.RLock()
  1786. defer as.RUnlock()
  1787. if alloc, ok := as.allocations[key]; ok {
  1788. return alloc
  1789. }
  1790. return nil
  1791. }
  1792. // ExternalAllocations returns a map of the external allocations in the set.
  1793. // Returns clones of the actual Allocations, so mutability is not a problem.
  1794. func (as *AllocationSet) ExternalAllocations() map[string]*Allocation {
  1795. externals := map[string]*Allocation{}
  1796. if as.IsEmpty() {
  1797. return externals
  1798. }
  1799. as.RLock()
  1800. defer as.RUnlock()
  1801. for key := range as.externalKeys {
  1802. if alloc, ok := as.allocations[key]; ok {
  1803. externals[key] = alloc.Clone()
  1804. }
  1805. }
  1806. return externals
  1807. }
  1808. // ExternalCost returns the total aggregated external costs of the set
  1809. func (as *AllocationSet) ExternalCost() float64 {
  1810. if as.IsEmpty() {
  1811. return 0.0
  1812. }
  1813. as.RLock()
  1814. defer as.RUnlock()
  1815. externalCost := 0.0
  1816. for _, alloc := range as.allocations {
  1817. externalCost += alloc.ExternalCost
  1818. }
  1819. return externalCost
  1820. }
  1821. // IdleAllocations returns a map of the idle allocations in the AllocationSet.
  1822. // Returns clones of the actual Allocations, so mutability is not a problem.
  1823. func (as *AllocationSet) IdleAllocations() map[string]*Allocation {
  1824. idles := map[string]*Allocation{}
  1825. if as.IsEmpty() {
  1826. return idles
  1827. }
  1828. as.RLock()
  1829. defer as.RUnlock()
  1830. for key := range as.idleKeys {
  1831. if alloc, ok := as.allocations[key]; ok {
  1832. idles[key] = alloc.Clone()
  1833. }
  1834. }
  1835. return idles
  1836. }
  1837. // Insert aggregates the current entry in the AllocationSet by the given Allocation,
  1838. // but only if the Allocation is valid, i.e. matches the AllocationSet's window. If
  1839. // there is no existing entry, one is created. Nil error response indicates success.
  1840. func (as *AllocationSet) Insert(that *Allocation) error {
  1841. return as.insert(that)
  1842. }
  1843. func (as *AllocationSet) insert(that *Allocation) error {
  1844. if as == nil {
  1845. return fmt.Errorf("cannot insert into nil AllocationSet")
  1846. }
  1847. as.Lock()
  1848. defer as.Unlock()
  1849. if as.allocations == nil {
  1850. as.allocations = map[string]*Allocation{}
  1851. }
  1852. if as.externalKeys == nil {
  1853. as.externalKeys = map[string]bool{}
  1854. }
  1855. if as.idleKeys == nil {
  1856. as.idleKeys = map[string]bool{}
  1857. }
  1858. // Add the given Allocation to the existing entry, if there is one;
  1859. // otherwise just set directly into allocations
  1860. if _, ok := as.allocations[that.Name]; !ok {
  1861. as.allocations[that.Name] = that
  1862. } else {
  1863. as.allocations[that.Name].add(that)
  1864. }
  1865. // If the given Allocation is an external one, record that
  1866. if that.IsExternal() {
  1867. as.externalKeys[that.Name] = true
  1868. }
  1869. // If the given Allocation is an idle one, record that
  1870. if that.IsIdle() {
  1871. as.idleKeys[that.Name] = true
  1872. }
  1873. return nil
  1874. }
  1875. // IsEmpty returns true if the AllocationSet is nil, or if it contains
  1876. // zero allocations.
  1877. func (as *AllocationSet) IsEmpty() bool {
  1878. if as == nil || len(as.allocations) == 0 {
  1879. return true
  1880. }
  1881. as.RLock()
  1882. defer as.RUnlock()
  1883. return as.allocations == nil || len(as.allocations) == 0
  1884. }
  1885. // Length returns the number of Allocations in the set
  1886. func (as *AllocationSet) Length() int {
  1887. if as == nil {
  1888. return 0
  1889. }
  1890. as.RLock()
  1891. defer as.RUnlock()
  1892. return len(as.allocations)
  1893. }
  1894. // Map clones and returns a map of the AllocationSet's Allocations
  1895. func (as *AllocationSet) Map() map[string]*Allocation {
  1896. if as.IsEmpty() {
  1897. return map[string]*Allocation{}
  1898. }
  1899. return as.Clone().allocations
  1900. }
  1901. // MarshalJSON JSON-encodes the AllocationSet
  1902. func (as *AllocationSet) MarshalJSON() ([]byte, error) {
  1903. as.RLock()
  1904. defer as.RUnlock()
  1905. return json.Marshal(as.allocations)
  1906. }
  1907. // Resolution returns the AllocationSet's window duration
  1908. func (as *AllocationSet) Resolution() time.Duration {
  1909. return as.Window.Duration()
  1910. }
  1911. // Set uses the given Allocation to overwrite the existing entry in the
  1912. // AllocationSet under the Allocation's name.
  1913. func (as *AllocationSet) Set(alloc *Allocation) error {
  1914. if as.IsEmpty() {
  1915. as.Lock()
  1916. as.allocations = map[string]*Allocation{}
  1917. as.externalKeys = map[string]bool{}
  1918. as.idleKeys = map[string]bool{}
  1919. as.Unlock()
  1920. }
  1921. as.Lock()
  1922. defer as.Unlock()
  1923. as.allocations[alloc.Name] = alloc
  1924. // If the given Allocation is an external one, record that
  1925. if alloc.IsExternal() {
  1926. as.externalKeys[alloc.Name] = true
  1927. }
  1928. // If the given Allocation is an idle one, record that
  1929. if alloc.IsIdle() {
  1930. as.idleKeys[alloc.Name] = true
  1931. }
  1932. return nil
  1933. }
  1934. // Start returns the Start time of the AllocationSet window
  1935. func (as *AllocationSet) Start() time.Time {
  1936. if as == nil {
  1937. log.Warningf("AllocationSet: calling Start on nil AllocationSet")
  1938. return time.Unix(0, 0)
  1939. }
  1940. if as.Window.Start() == nil {
  1941. log.Warningf("AllocationSet: AllocationSet with illegal window: Start is nil; len(as.allocations)=%d", len(as.allocations))
  1942. return time.Unix(0, 0)
  1943. }
  1944. return *as.Window.Start()
  1945. }
  1946. // String represents the given Allocation as a string
  1947. func (as *AllocationSet) String() string {
  1948. if as == nil {
  1949. return "<nil>"
  1950. }
  1951. return fmt.Sprintf("AllocationSet{length: %d; window: %s; totalCost: %.2f}",
  1952. as.Length(), as.Window, as.TotalCost())
  1953. }
  1954. // TotalCost returns the sum of all TotalCosts of the allocations contained
  1955. func (as *AllocationSet) TotalCost() float64 {
  1956. if as.IsEmpty() {
  1957. return 0.0
  1958. }
  1959. as.RLock()
  1960. defer as.RUnlock()
  1961. tc := 0.0
  1962. for _, a := range as.allocations {
  1963. tc += a.TotalCost()
  1964. }
  1965. return tc
  1966. }
  1967. // UTCOffset returns the AllocationSet's configured UTCOffset.
  1968. func (as *AllocationSet) UTCOffset() time.Duration {
  1969. _, zone := as.Start().Zone()
  1970. return time.Duration(zone) * time.Second
  1971. }
  1972. func (as *AllocationSet) accumulate(that *AllocationSet) (*AllocationSet, error) {
  1973. if as.IsEmpty() {
  1974. return that.Clone(), nil
  1975. }
  1976. if that.IsEmpty() {
  1977. return as.Clone(), nil
  1978. }
  1979. // Set start, end to min(start), max(end)
  1980. start := as.Start()
  1981. end := as.End()
  1982. if that.Start().Before(start) {
  1983. start = that.Start()
  1984. }
  1985. if that.End().After(end) {
  1986. end = that.End()
  1987. }
  1988. acc := NewAllocationSet(start, end)
  1989. as.RLock()
  1990. defer as.RUnlock()
  1991. that.RLock()
  1992. defer that.RUnlock()
  1993. for _, alloc := range as.allocations {
  1994. err := acc.insert(alloc)
  1995. if err != nil {
  1996. return nil, err
  1997. }
  1998. }
  1999. for _, alloc := range that.allocations {
  2000. err := acc.insert(alloc)
  2001. if err != nil {
  2002. return nil, err
  2003. }
  2004. }
  2005. return acc, nil
  2006. }
  2007. // AllocationSetRange is a thread-safe slice of AllocationSets. It is meant to
  2008. // be used such that the AllocationSets held are consecutive and coherent with
  2009. // respect to using the same aggregation properties, UTC offset, and
  2010. // resolution. However these rules are not necessarily enforced, so use wisely.
  2011. type AllocationSetRange struct {
  2012. sync.RWMutex
  2013. allocations []*AllocationSet
  2014. FromStore string // stores the name of the store used to retrieve the data
  2015. }
  2016. // NewAllocationSetRange instantiates a new range composed of the given
  2017. // AllocationSets in the order provided.
  2018. func NewAllocationSetRange(allocs ...*AllocationSet) *AllocationSetRange {
  2019. return &AllocationSetRange{
  2020. allocations: allocs,
  2021. }
  2022. }
  2023. // Accumulate sums each AllocationSet in the given range, returning a single cumulative
  2024. // AllocationSet for the entire range.
  2025. func (asr *AllocationSetRange) Accumulate() (*AllocationSet, error) {
  2026. var allocSet *AllocationSet
  2027. var err error
  2028. asr.RLock()
  2029. defer asr.RUnlock()
  2030. for _, as := range asr.allocations {
  2031. allocSet, err = allocSet.accumulate(as)
  2032. if err != nil {
  2033. return nil, err
  2034. }
  2035. }
  2036. return allocSet, nil
  2037. }
  2038. // TODO accumulate into lower-resolution chunks of the given resolution
  2039. // func (asr *AllocationSetRange) AccumulateBy(resolution time.Duration) *AllocationSetRange
  2040. // AggregateBy aggregates each AllocationSet in the range by the given
  2041. // properties and options.
  2042. func (asr *AllocationSetRange) AggregateBy(aggregateBy []string, options *AllocationAggregationOptions) error {
  2043. aggRange := &AllocationSetRange{allocations: []*AllocationSet{}}
  2044. asr.Lock()
  2045. defer asr.Unlock()
  2046. for _, as := range asr.allocations {
  2047. err := as.AggregateBy(aggregateBy, options)
  2048. if err != nil {
  2049. return err
  2050. }
  2051. aggRange.allocations = append(aggRange.allocations, as)
  2052. }
  2053. asr.allocations = aggRange.allocations
  2054. return nil
  2055. }
  2056. // Append appends the given AllocationSet to the end of the range. It does not
  2057. // validate whether or not that violates window continuity.
  2058. func (asr *AllocationSetRange) Append(that *AllocationSet) {
  2059. asr.Lock()
  2060. defer asr.Unlock()
  2061. asr.allocations = append(asr.allocations, that)
  2062. }
  2063. // Each invokes the given function for each AllocationSet in the range
  2064. func (asr *AllocationSetRange) Each(f func(int, *AllocationSet)) {
  2065. if asr == nil {
  2066. return
  2067. }
  2068. for i, as := range asr.allocations {
  2069. f(i, as)
  2070. }
  2071. }
  2072. // Get retrieves the AllocationSet at the given index of the range.
  2073. func (asr *AllocationSetRange) Get(i int) (*AllocationSet, error) {
  2074. if i < 0 || i >= len(asr.allocations) {
  2075. return nil, fmt.Errorf("AllocationSetRange: index out of range: %d", i)
  2076. }
  2077. asr.RLock()
  2078. defer asr.RUnlock()
  2079. return asr.allocations[i], nil
  2080. }
  2081. // InsertRange merges the given AllocationSetRange into the receiving one by
  2082. // lining up sets with matching windows, then inserting each allocation from
  2083. // the given ASR into the respective set in the receiving ASR. If the given
  2084. // ASR contains an AllocationSet from a window that does not exist in the
  2085. // receiving ASR, then an error is returned. However, the given ASR does not
  2086. // need to cover the full range of the receiver.
  2087. func (asr *AllocationSetRange) InsertRange(that *AllocationSetRange) error {
  2088. if asr == nil {
  2089. return fmt.Errorf("cannot insert range into nil AllocationSetRange")
  2090. }
  2091. // keys maps window to index in asr
  2092. keys := map[string]int{}
  2093. asr.Each(func(i int, as *AllocationSet) {
  2094. if as == nil {
  2095. return
  2096. }
  2097. keys[as.Window.String()] = i
  2098. })
  2099. // Nothing to merge, so simply return
  2100. if len(keys) == 0 {
  2101. return nil
  2102. }
  2103. var err error
  2104. that.Each(func(j int, thatAS *AllocationSet) {
  2105. if thatAS == nil || err != nil {
  2106. return
  2107. }
  2108. // Find matching AllocationSet in asr
  2109. i, ok := keys[thatAS.Window.String()]
  2110. if !ok {
  2111. err = fmt.Errorf("cannot merge AllocationSet into window that does not exist: %s", thatAS.Window.String())
  2112. return
  2113. }
  2114. as, err := asr.Get(i)
  2115. if err != nil {
  2116. err = fmt.Errorf("AllocationSetRange index does not exist: %d", i)
  2117. return
  2118. }
  2119. // Insert each Allocation from the given set
  2120. thatAS.Each(func(k string, alloc *Allocation) {
  2121. err = as.Insert(alloc)
  2122. if err != nil {
  2123. err = fmt.Errorf("error inserting allocation: %s", err)
  2124. return
  2125. }
  2126. })
  2127. })
  2128. // err might be nil
  2129. return err
  2130. }
  2131. // Length returns the length of the range, which is zero if nil
  2132. func (asr *AllocationSetRange) Length() int {
  2133. if asr == nil || asr.allocations == nil {
  2134. return 0
  2135. }
  2136. asr.RLock()
  2137. defer asr.RUnlock()
  2138. return len(asr.allocations)
  2139. }
  2140. // MarshalJSON JSON-encodes the range
  2141. func (asr *AllocationSetRange) MarshalJSON() ([]byte, error) {
  2142. asr.RLock()
  2143. defer asr.RUnlock()
  2144. return json.Marshal(asr.allocations)
  2145. }
  2146. // Slice copies the underlying slice of AllocationSets, maintaining order,
  2147. // and returns the copied slice.
  2148. func (asr *AllocationSetRange) Slice() []*AllocationSet {
  2149. if asr == nil || asr.allocations == nil {
  2150. return nil
  2151. }
  2152. asr.RLock()
  2153. defer asr.RUnlock()
  2154. copy := []*AllocationSet{}
  2155. for _, as := range asr.allocations {
  2156. copy = append(copy, as.Clone())
  2157. }
  2158. return copy
  2159. }
  2160. // String represents the given AllocationSetRange as a string
  2161. func (asr *AllocationSetRange) String() string {
  2162. if asr == nil {
  2163. return "<nil>"
  2164. }
  2165. return fmt.Sprintf("AllocationSetRange{length: %d}", asr.Length())
  2166. }
  2167. // UTCOffset returns the detected UTCOffset of the AllocationSets within the
  2168. // range. Defaults to 0 if the range is nil or empty. Does not warn if there
  2169. // are sets with conflicting UTCOffsets (just returns the first).
  2170. func (asr *AllocationSetRange) UTCOffset() time.Duration {
  2171. if asr.Length() == 0 {
  2172. return 0
  2173. }
  2174. as, err := asr.Get(0)
  2175. if err != nil {
  2176. return 0
  2177. }
  2178. return as.UTCOffset()
  2179. }
  2180. // Window returns the full window that the AllocationSetRange spans, from the
  2181. // start of the first AllocationSet to the end of the last one.
  2182. func (asr *AllocationSetRange) Window() Window {
  2183. if asr == nil || asr.Length() == 0 {
  2184. return NewWindow(nil, nil)
  2185. }
  2186. start := asr.allocations[0].Start()
  2187. end := asr.allocations[asr.Length()-1].End()
  2188. return NewWindow(&start, &end)
  2189. }
  2190. // Start returns the earliest start of all Allocations in the AllocationSetRange.
  2191. // It returns an error if there are no allocations.
  2192. func (asr *AllocationSetRange) Start() (time.Time, error) {
  2193. start := time.Time{}
  2194. firstStartNotSet := true
  2195. asr.Each(func(i int, as *AllocationSet) {
  2196. as.Each(func(s string, a *Allocation) {
  2197. if firstStartNotSet {
  2198. start = a.Start
  2199. firstStartNotSet = false
  2200. }
  2201. if a.Start.Before(start) {
  2202. start = a.Start
  2203. }
  2204. })
  2205. })
  2206. if firstStartNotSet {
  2207. return start, fmt.Errorf("had no data to compute a start from")
  2208. }
  2209. return start, nil
  2210. }
  2211. // End returns the latest end of all Allocations in the AllocationSetRange.
  2212. // It returns an error if there are no allocations.
  2213. func (asr *AllocationSetRange) End() (time.Time, error) {
  2214. end := time.Time{}
  2215. firstEndNotSet := true
  2216. asr.Each(func(i int, as *AllocationSet) {
  2217. as.Each(func(s string, a *Allocation) {
  2218. if firstEndNotSet {
  2219. end = a.End
  2220. firstEndNotSet = false
  2221. }
  2222. if a.End.After(end) {
  2223. end = a.End
  2224. }
  2225. })
  2226. })
  2227. if firstEndNotSet {
  2228. return end, fmt.Errorf("had no data to compute an end from")
  2229. }
  2230. return end, nil
  2231. }
  2232. // Minutes returns the duration, in minutes, between the earliest start
  2233. // and the latest end of all allocations in the AllocationSetRange.
  2234. func (asr *AllocationSetRange) Minutes() float64 {
  2235. start, err := asr.Start()
  2236. if err != nil {
  2237. return 0
  2238. }
  2239. end, err := asr.End()
  2240. if err != nil {
  2241. return 0
  2242. }
  2243. duration := end.Sub(start)
  2244. return duration.Minutes()
  2245. }