aggregation.go 82 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177
7177817791780178117821783178417851786178717881789179017911792179317941795179617971798179918001801180218031804180518061807180818091810181118121813181418151816181718181819182018211822182318241825182618271828182918301831183218331834183518361837183818391840184118421843184418451846184718481849185018511852185318541855185618571858185918601861186218631864186518661867186818691870187118721873187418751876187718781879188018811882188318841885188618871888188918901891189218931894189518961897189818991900190119021903190419051906190719081909191019111912191319141915191619171918191919201921192219231924192519261927192819291930193119321933193419351936193719381939194019411942194319441945194619471948194919501951195219531954195519561957195819591960196119621963196419651966196719681969197019711972197319741975197619771978197919801981198219831984198519861987198819891990199119921993199419951996199719981999200020012002200320042005200620072008200920102011201220132014201520162017201820192020202120222023202420252026202720282029203020312032203320342035203620372038203920402041204220432044204520462047204820492050205120522053205420552056205720582059206020612062206320642065206620672068206920702071207220732074207520762077207820792080208120822083208420852086208720882089209020912092209320942095209620972098209921002101210221032104210521062107210821092110211121122113211421152116211721182119212021212122212321242125212621272128212921302131213221332134213521362137213821392140214121422143214421452146214721482149215021512152215321542155215621572158215921602161216221632164216521662167216821692170217121722173217421752176217721782179218021812182218321842185218621872188218921902191219221932194219521962197219821992200220122022203220422052206220722082209221022112212221322142215221622172218221922202221222222232224222522262227222822292230223122322233223422352236223722382239224022412242224322442245224622472248224922502251225222532254225522562257
  1. package costmodel
  2. import (
  3. "fmt"
  4. "math"
  5. "net/http"
  6. "regexp"
  7. "sort"
  8. "strconv"
  9. "strings"
  10. "time"
  11. "github.com/julienschmidt/httprouter"
  12. "github.com/kubecost/cost-model/pkg/cloud"
  13. "github.com/kubecost/cost-model/pkg/env"
  14. "github.com/kubecost/cost-model/pkg/errors"
  15. "github.com/kubecost/cost-model/pkg/kubecost"
  16. "github.com/kubecost/cost-model/pkg/log"
  17. "github.com/kubecost/cost-model/pkg/prom"
  18. "github.com/kubecost/cost-model/pkg/thanos"
  19. "github.com/kubecost/cost-model/pkg/util"
  20. "github.com/kubecost/cost-model/pkg/util/json"
  21. "github.com/patrickmn/go-cache"
  22. prometheusClient "github.com/prometheus/client_golang/api"
  23. "k8s.io/klog"
  24. )
  const (
  	// SplitTypeWeighted signals that shared costs should be shared
  	// proportionally, rather than evenly
  	SplitTypeWeighted = "weighted"
  	// UnallocatedSubfield indicates an allocation datum that does not have the
  	// chosen Aggregator; e.g. during aggregation by some label, there may be
  	// cost data that do not have the given label.
  	UnallocatedSubfield = "__unallocated__"
  	// clusterCostsCacheMinutes is a duration expressed in minutes.
  	// NOTE(review): presumably the expiry used for cached cluster costs; its
  	// use site is not in this part of the file, so confirm before relying on it.
  	clusterCostsCacheMinutes = 5.0
  )
  // Aggregation describes aggregated cost data, containing cumulative cost and
  // allocation data per resource, vectors of rate data per resource, efficiency
  // data, and metadata describing the type of aggregation operation.
  //
  // Fields carrying the `json:"-"` tag are intermediate data used while
  // aggregating; the remaining fields form the serialized result.
  type Aggregation struct {
  	// Metadata describing the aggregation. Aggregator, Environment and Cluster
  	// are also used to identify the aggregation in warning logs.
  	Aggregator string `json:"aggregation"`
  	Subfields []string `json:"subfields,omitempty"`
  	Environment string `json:"environment"`
  	Cluster string `json:"cluster,omitempty"`
  	Properties *kubecost.AllocationProperties `json:"-"`
  	Start time.Time `json:"-"`
  	End time.Time `json:"-"`
  	// CPU. AggregateCostData derives CPUAllocationHourlyAverage as the total of
  	// CPUAllocationVectors divided by TotalHours, CPUAllocationTotal as the
  	// total of CPUAllocationVectors, and CPUCost as the total of CPUCostVector
  	// (scaled by the rate coefficient when a rate is requested).
  	CPUAllocationHourlyAverage float64 `json:"cpuAllocationAverage"`
  	CPUAllocationVectors []*util.Vector `json:"-"`
  	CPUAllocationTotal float64 `json:"-"`
  	CPUCost float64 `json:"cpuCost"`
  	CPUCostVector []*util.Vector `json:"cpuCostVector,omitempty"`
  	// CPUEfficiency is 1 - (requested - used) / allocated, computed from the
  	// clamped averages of CPURequestedVectors and CPUUsedVectors; it is only
  	// computed when efficiency data is requested.
  	CPUEfficiency float64 `json:"cpuEfficiency"`
  	CPURequestedVectors []*util.Vector `json:"-"`
  	CPUUsedVectors []*util.Vector `json:"-"`
  	// Efficiency is the CPU and RAM efficiency weighted by their respective
  	// costs; it is only computed when efficiency data is requested.
  	Efficiency float64 `json:"efficiency"`
  	// GPU. Unlike CPU, RAM and PV, GPUAllocationHourlyAverage is the average
  	// (not the total over hours) of GPUAllocationVectors.
  	GPUAllocationHourlyAverage float64 `json:"gpuAllocationAverage"`
  	GPUAllocationVectors []*util.Vector `json:"-"`
  	GPUCost float64 `json:"gpuCost"`
  	GPUCostVector []*util.Vector `json:"gpuCostVector,omitempty"`
  	GPUAllocationTotal float64 `json:"-"`
  	// RAM. RAMAllocationHourlyAverage is converted from bytes to GiB by
  	// AggregateCostData after the efficiency computation.
  	RAMAllocationHourlyAverage float64 `json:"ramAllocationAverage"`
  	RAMAllocationVectors []*util.Vector `json:"-"`
  	RAMAllocationTotal float64 `json:"-"`
  	RAMCost float64 `json:"ramCost"`
  	RAMCostVector []*util.Vector `json:"ramCostVector,omitempty"`
  	// RAMEfficiency mirrors CPUEfficiency, using the RAM vectors.
  	RAMEfficiency float64 `json:"ramEfficiency"`
  	RAMRequestedVectors []*util.Vector `json:"-"`
  	RAMUsedVectors []*util.Vector `json:"-"`
  	// Persistent volumes. PVAllocationHourlyAverage is converted from bytes to
  	// GiB by AggregateCostData.
  	PVAllocationHourlyAverage float64 `json:"pvAllocationAverage"`
  	PVAllocationVectors []*util.Vector `json:"-"`
  	PVAllocationTotal float64 `json:"-"`
  	PVCost float64 `json:"pvCost"`
  	PVCostVector []*util.Vector `json:"pvCostVector,omitempty"`
  	// Network.
  	NetworkCost float64 `json:"networkCost"`
  	NetworkCostVector []*util.Vector `json:"networkCostVector,omitempty"`
  	// SharedCost is this aggregation's portion of shared resource costs and
  	// of the additional shared costs passed via AggregationOptions.
  	SharedCost float64 `json:"sharedCost"`
  	// TotalCost is the sum of the CPU, RAM, GPU, PV, network and shared costs.
  	TotalCost float64 `json:"totalCost"`
  	// TotalCostVector is the element-wise sum of the CPU, RAM, GPU, PV and
  	// network cost vectors; all cost vectors are cleared unless time series
  	// data is requested.
  	TotalCostVector []*util.Vector `json:"totalCostVector,omitempty"`
  }
  79. // TotalHours determines the amount of hours the Aggregation covers, as a
  80. // function of the cost vectors and the resolution of those vectors' data
  81. func (a *Aggregation) TotalHours(resolutionHours float64) float64 {
  82. length := 1
  83. if length < len(a.CPUCostVector) {
  84. length = len(a.CPUCostVector)
  85. }
  86. if length < len(a.RAMCostVector) {
  87. length = len(a.RAMCostVector)
  88. }
  89. if length < len(a.PVCostVector) {
  90. length = len(a.PVCostVector)
  91. }
  92. if length < len(a.GPUCostVector) {
  93. length = len(a.GPUCostVector)
  94. }
  95. if length < len(a.NetworkCostVector) {
  96. length = len(a.NetworkCostVector)
  97. }
  98. return float64(length) * resolutionHours
  99. }
  100. // RateCoefficient computes the coefficient by which the total cost needs to be
  101. // multiplied in order to convert totals costs into per-rate costs.
  102. func (a *Aggregation) RateCoefficient(rateStr string, resolutionHours float64) float64 {
  103. // monthly rate = (730.0)*(total cost)/(total hours)
  104. // daily rate = (24.0)*(total cost)/(total hours)
  105. // hourly rate = (1.0)*(total cost)/(total hours)
  106. // default to hourly rate
  107. coeff := 1.0
  108. switch rateStr {
  109. case "daily":
  110. coeff = util.HoursPerDay
  111. case "monthly":
  112. coeff = util.HoursPerMonth
  113. }
  114. return coeff / a.TotalHours(resolutionHours)
  115. }
  // SharedResourceInfo describes which cost data should be treated as shared
  // resources, selected either by namespace or by label name/value pairs.
  type SharedResourceInfo struct {
  	// ShareResources enables sharing; AggregateCostData only consults
  	// IsSharedResource when it is true.
  	ShareResources bool
  	// SharedNamespace is keyed by namespace name. IsSharedResource treats any
  	// namespace present as a key as shared, regardless of the stored value.
  	SharedNamespace map[string]bool
  	// LabelSelectors maps a label name to the set of label values that mark a
  	// datum as shared.
  	LabelSelectors map[string]map[string]bool
  }
  // SharedCostInfo describes a single additional shared cost. AggregateCostData
  // adds Cost, scaled by the shared coefficient, to every aggregation's
  // SharedCost.
  type SharedCostInfo struct {
  	// Name identifies the shared cost.
  	Name string
  	// Cost is the amount to distribute across aggregations.
  	Cost float64
  	// ShareType is not read in this part of the file.
  	// NOTE(review): presumably selects how the cost is split; confirm against callers.
  	ShareType string
  }
  126. func (s *SharedResourceInfo) IsSharedResource(costDatum *CostData) bool {
  127. // exists in a shared namespace
  128. if _, ok := s.SharedNamespace[costDatum.Namespace]; ok {
  129. return true
  130. }
  131. // has at least one shared label (OR, not AND in the case of multiple labels)
  132. for labelName, labelValues := range s.LabelSelectors {
  133. if val, ok := costDatum.Labels[labelName]; ok && labelValues[val] {
  134. return true
  135. }
  136. }
  137. return false
  138. }
  139. func NewSharedResourceInfo(shareResources bool, sharedNamespaces []string, labelNames []string, labelValues []string) *SharedResourceInfo {
  140. sr := &SharedResourceInfo{
  141. ShareResources: shareResources,
  142. SharedNamespace: make(map[string]bool),
  143. LabelSelectors: make(map[string]map[string]bool),
  144. }
  145. for _, ns := range sharedNamespaces {
  146. sr.SharedNamespace[strings.Trim(ns, " ")] = true
  147. }
  148. // Creating a map of label name to label value, but only if
  149. // the cardinality matches
  150. if len(labelNames) == len(labelValues) {
  151. for i := range labelNames {
  152. cleanedLname := prom.SanitizeLabelName(strings.Trim(labelNames[i], " "))
  153. if values, ok := sr.LabelSelectors[cleanedLname]; ok {
  154. values[strings.Trim(labelValues[i], " ")] = true
  155. } else {
  156. sr.LabelSelectors[cleanedLname] = map[string]bool{strings.Trim(labelValues[i], " "): true}
  157. }
  158. }
  159. }
  160. return sr
  161. }
  162. func GetTotalContainerCost(costData map[string]*CostData, rate string, cp cloud.Provider, discount float64, customDiscount float64, idleCoefficients map[string]float64) float64 {
  163. totalContainerCost := 0.0
  164. for _, costDatum := range costData {
  165. clusterID := costDatum.ClusterID
  166. cpuv, ramv, gpuv, pvvs, netv := getPriceVectors(cp, costDatum, rate, discount, customDiscount, idleCoefficients[clusterID])
  167. totalContainerCost += totalVectors(cpuv)
  168. totalContainerCost += totalVectors(ramv)
  169. totalContainerCost += totalVectors(gpuv)
  170. for _, pv := range pvvs {
  171. totalContainerCost += totalVectors(pv)
  172. }
  173. totalContainerCost += totalVectors(netv)
  174. }
  175. return totalContainerCost
  176. }
  177. func (a *Accesses) ComputeIdleCoefficient(costData map[string]*CostData, cli prometheusClient.Client, cp cloud.Provider, discount float64, customDiscount float64, windowString, offset string) (map[string]float64, error) {
  178. coefficients := make(map[string]float64)
  179. profileName := "ComputeIdleCoefficient: ComputeClusterCosts"
  180. profileStart := time.Now()
  181. var clusterCosts map[string]*ClusterCosts
  182. var err error
  183. key := fmt.Sprintf("%s:%s", windowString, offset)
  184. if data, valid := a.ClusterCostsCache.Get(key); valid {
  185. clusterCosts = data.(map[string]*ClusterCosts)
  186. } else {
  187. clusterCosts, err = a.ComputeClusterCosts(cli, cp, windowString, offset, false)
  188. if err != nil {
  189. return nil, err
  190. }
  191. }
  192. measureTime(profileStart, profileThreshold, profileName)
  193. for cid, costs := range clusterCosts {
  194. if costs.CPUCumulative == 0 && costs.RAMCumulative == 0 && costs.StorageCumulative == 0 {
  195. klog.V(1).Infof("[Warning] No ClusterCosts data for cluster '%s'. Is it emitting data?", cid)
  196. coefficients[cid] = 1.0
  197. continue
  198. }
  199. if costs.TotalCumulative == 0 {
  200. return nil, fmt.Errorf("TotalCumulative cluster cost for cluster '%s' returned 0 over window '%s' offset '%s'", cid, windowString, offset)
  201. }
  202. totalContainerCost := 0.0
  203. for _, costDatum := range costData {
  204. if costDatum.ClusterID == cid {
  205. cpuv, ramv, gpuv, pvvs, _ := getPriceVectors(cp, costDatum, "", discount, customDiscount, 1)
  206. totalContainerCost += totalVectors(cpuv)
  207. totalContainerCost += totalVectors(ramv)
  208. totalContainerCost += totalVectors(gpuv)
  209. for _, pv := range pvvs {
  210. totalContainerCost += totalVectors(pv)
  211. }
  212. }
  213. }
  214. coeff := totalContainerCost / costs.TotalCumulative
  215. coefficients[cid] = coeff
  216. }
  217. return coefficients, nil
  218. }
  // AggregationOptions provides optional parameters to AggregateCostData, allowing callers to perform more complex operations
  type AggregationOptions struct {
  	Discount float64 // percent by which to discount CPU, RAM, and GPU cost
  	CustomDiscount float64 // additional custom discount applied to all prices
  	IdleCoefficients map[string]float64 // scales costs by amount of idle resources on a per-cluster basis
  	IncludeEfficiency bool // set to true to receive efficiency/usage data
  	IncludeTimeSeries bool // set to true to receive time series data
  	Rate string // set to "hourly", "daily", or "monthly" to receive cost rate, rather than cumulative cost
  	ResolutionHours float64 // hours represented by each vector entry; values <= 0 are treated as 1.0
  	SharedResourceInfo *SharedResourceInfo // selects cost data to report as shared cost rather than as its own aggregation
  	SharedCosts map[string]*SharedCostInfo // additional shared costs; each Cost is added to every aggregation, scaled by the shared coefficient
  	FilteredContainerCount int // not read in the visible part of AggregateCostData; NOTE(review): presumably the number of containers removed by filtering
  	FilteredEnvironments map[string]int // its length is added to the aggregation count when splitting shared costs evenly
  	SharedSplit string // set to SplitTypeWeighted to split shared costs proportionally to cost; any other value splits evenly
  	TotalContainerCost float64 // total container cost used as the basis for the weighted split
  }
  235. // Helper method to test request/usgae values against allocation averages for efficiency scores. Generate a warning log if
  236. // clamp is required
  237. func clampAverage(requestsAvg float64, usedAverage float64, allocationAvg float64, resource string) (float64, float64) {
  238. rAvg := requestsAvg
  239. if rAvg > allocationAvg {
  240. klog.V(4).Infof("[Warning] Average %s Requested (%f) > Average %s Allocated (%f). Clamping.", resource, rAvg, resource, allocationAvg)
  241. rAvg = allocationAvg
  242. }
  243. uAvg := usedAverage
  244. if uAvg > allocationAvg {
  245. klog.V(4).Infof("[Warning]: Average %s Used (%f) > Average %s Allocated (%f). Clamping.", resource, uAvg, resource, allocationAvg)
  246. uAvg = allocationAvg
  247. }
  248. return rAvg, uAvg
  249. }
  250. // AggregateCostData aggregates raw cost data by field; e.g. namespace, cluster, service, or label. In the case of label, callers
  251. // must pass a slice of subfields indicating the labels by which to group. Provider is used to define custom resource pricing.
  252. // See AggregationOptions for optional parameters.
  253. func AggregateCostData(costData map[string]*CostData, field string, subfields []string, cp cloud.Provider, opts *AggregationOptions) map[string]*Aggregation {
  254. discount := opts.Discount
  255. customDiscount := opts.CustomDiscount
  256. idleCoefficients := opts.IdleCoefficients
  257. includeTimeSeries := opts.IncludeTimeSeries
  258. includeEfficiency := opts.IncludeEfficiency
  259. rate := opts.Rate
  260. sr := opts.SharedResourceInfo
  261. resolutionHours := 1.0
  262. if opts.ResolutionHours > 0.0 {
  263. resolutionHours = opts.ResolutionHours
  264. }
  265. if idleCoefficients == nil {
  266. idleCoefficients = make(map[string]float64)
  267. }
  268. // aggregations collects key-value pairs of resource group-to-aggregated data
  269. // e.g. namespace-to-data or label-value-to-data
  270. aggregations := make(map[string]*Aggregation)
  271. // sharedResourceCost is the running total cost of resources that should be reported
  272. // as shared across all other resources, rather than reported as a stand-alone category
  273. sharedResourceCost := 0.0
  274. for _, costDatum := range costData {
  275. idleCoefficient, ok := idleCoefficients[costDatum.ClusterID]
  276. if !ok {
  277. idleCoefficient = 1.0
  278. }
  279. if sr != nil && sr.ShareResources && sr.IsSharedResource(costDatum) {
  280. cpuv, ramv, gpuv, pvvs, netv := getPriceVectors(cp, costDatum, rate, discount, customDiscount, idleCoefficient)
  281. sharedResourceCost += totalVectors(cpuv)
  282. sharedResourceCost += totalVectors(ramv)
  283. sharedResourceCost += totalVectors(gpuv)
  284. sharedResourceCost += totalVectors(netv)
  285. for _, pv := range pvvs {
  286. sharedResourceCost += totalVectors(pv)
  287. }
  288. } else {
  289. if field == "cluster" {
  290. aggregateDatum(cp, aggregations, costDatum, field, subfields, rate, costDatum.ClusterID, discount, customDiscount, idleCoefficient, false)
  291. } else if field == "node" {
  292. aggregateDatum(cp, aggregations, costDatum, field, subfields, rate, costDatum.NodeName, discount, customDiscount, idleCoefficient, false)
  293. } else if field == "namespace" {
  294. aggregateDatum(cp, aggregations, costDatum, field, subfields, rate, costDatum.Namespace, discount, customDiscount, idleCoefficient, false)
  295. } else if field == "service" {
  296. if len(costDatum.Services) > 0 {
  297. aggregateDatum(cp, aggregations, costDatum, field, subfields, rate, costDatum.Namespace+"/"+costDatum.Services[0], discount, customDiscount, idleCoefficient, false)
  298. } else {
  299. aggregateDatum(cp, aggregations, costDatum, field, subfields, rate, UnallocatedSubfield, discount, customDiscount, idleCoefficient, false)
  300. }
  301. } else if field == "deployment" {
  302. if len(costDatum.Deployments) > 0 {
  303. aggregateDatum(cp, aggregations, costDatum, field, subfields, rate, costDatum.Namespace+"/"+costDatum.Deployments[0], discount, customDiscount, idleCoefficient, false)
  304. } else {
  305. aggregateDatum(cp, aggregations, costDatum, field, subfields, rate, UnallocatedSubfield, discount, customDiscount, idleCoefficient, false)
  306. }
  307. } else if field == "statefulset" {
  308. if len(costDatum.Statefulsets) > 0 {
  309. aggregateDatum(cp, aggregations, costDatum, field, subfields, rate, costDatum.Namespace+"/"+costDatum.Statefulsets[0], discount, customDiscount, idleCoefficient, false)
  310. } else {
  311. aggregateDatum(cp, aggregations, costDatum, field, subfields, rate, UnallocatedSubfield, discount, customDiscount, idleCoefficient, false)
  312. }
  313. } else if field == "daemonset" {
  314. if len(costDatum.Daemonsets) > 0 {
  315. aggregateDatum(cp, aggregations, costDatum, field, subfields, rate, costDatum.Namespace+"/"+costDatum.Daemonsets[0], discount, customDiscount, idleCoefficient, false)
  316. } else {
  317. aggregateDatum(cp, aggregations, costDatum, field, subfields, rate, UnallocatedSubfield, discount, customDiscount, idleCoefficient, false)
  318. }
  319. } else if field == "controller" {
  320. if controller, kind, hasController := costDatum.GetController(); hasController {
  321. key := fmt.Sprintf("%s/%s:%s", costDatum.Namespace, kind, controller)
  322. aggregateDatum(cp, aggregations, costDatum, field, subfields, rate, key, discount, customDiscount, idleCoefficient, false)
  323. } else {
  324. aggregateDatum(cp, aggregations, costDatum, field, subfields, rate, UnallocatedSubfield, discount, customDiscount, idleCoefficient, false)
  325. }
  326. } else if field == "label" {
  327. found := false
  328. if costDatum.Labels != nil {
  329. for _, sf := range subfields {
  330. if subfieldName, ok := costDatum.Labels[sf]; ok {
  331. aggregateDatum(cp, aggregations, costDatum, field, subfields, rate, subfieldName, discount, customDiscount, idleCoefficient, false)
  332. found = true
  333. break
  334. }
  335. }
  336. }
  337. if !found {
  338. aggregateDatum(cp, aggregations, costDatum, field, subfields, rate, UnallocatedSubfield, discount, customDiscount, idleCoefficient, false)
  339. }
  340. } else if field == "annotation" {
  341. found := false
  342. if costDatum.Annotations != nil {
  343. for _, sf := range subfields {
  344. if subfieldName, ok := costDatum.Annotations[sf]; ok {
  345. aggregateDatum(cp, aggregations, costDatum, field, subfields, rate, subfieldName, discount, customDiscount, idleCoefficient, false)
  346. found = true
  347. break
  348. }
  349. }
  350. }
  351. if !found {
  352. aggregateDatum(cp, aggregations, costDatum, field, subfields, rate, UnallocatedSubfield, discount, customDiscount, idleCoefficient, false)
  353. }
  354. } else if field == "pod" {
  355. aggregateDatum(cp, aggregations, costDatum, field, subfields, rate, costDatum.Namespace+"/"+costDatum.PodName, discount, customDiscount, idleCoefficient, false)
  356. } else if field == "container" {
  357. key := fmt.Sprintf("%s/%s/%s/%s", costDatum.ClusterID, costDatum.Namespace, costDatum.PodName, costDatum.Name)
  358. aggregateDatum(cp, aggregations, costDatum, field, subfields, rate, key, discount, customDiscount, idleCoefficient, true)
  359. }
  360. }
  361. }
  362. for key, agg := range aggregations {
  363. sharedCoefficient := 1 / float64(len(opts.FilteredEnvironments)+len(aggregations))
  364. agg.CPUCost = totalVectors(agg.CPUCostVector)
  365. agg.RAMCost = totalVectors(agg.RAMCostVector)
  366. agg.GPUCost = totalVectors(agg.GPUCostVector)
  367. agg.PVCost = totalVectors(agg.PVCostVector)
  368. agg.NetworkCost = totalVectors(agg.NetworkCostVector)
  369. if opts.SharedSplit == SplitTypeWeighted {
  370. d := opts.TotalContainerCost - sharedResourceCost
  371. if d == 0 {
  372. klog.V(1).Infof("[Warning] Total container cost '%f' and shared resource cost '%f are the same'. Setting sharedCoefficient to 1", opts.TotalContainerCost, sharedResourceCost)
  373. sharedCoefficient = 1.0
  374. } else {
  375. sharedCoefficient = (agg.CPUCost + agg.RAMCost + agg.GPUCost + agg.PVCost + agg.NetworkCost) / d
  376. }
  377. }
  378. agg.SharedCost = sharedResourceCost * sharedCoefficient
  379. for _, v := range opts.SharedCosts {
  380. agg.SharedCost += v.Cost * sharedCoefficient
  381. }
  382. if rate != "" {
  383. rateCoeff := agg.RateCoefficient(rate, resolutionHours)
  384. agg.CPUCost *= rateCoeff
  385. agg.RAMCost *= rateCoeff
  386. agg.GPUCost *= rateCoeff
  387. agg.PVCost *= rateCoeff
  388. agg.NetworkCost *= rateCoeff
  389. agg.SharedCost *= rateCoeff
  390. }
  391. agg.TotalCost = agg.CPUCost + agg.RAMCost + agg.GPUCost + agg.PVCost + agg.NetworkCost + agg.SharedCost
  392. // Evicted and Completed Pods can still show up here, but have 0 cost.
  393. // Filter these by default. Any reason to keep them?
  394. if agg.TotalCost == 0 {
  395. delete(aggregations, key)
  396. continue
  397. }
  398. // CPU, RAM, and PV allocation are cumulative per-datum, whereas GPU is rate per-datum
  399. agg.CPUAllocationHourlyAverage = totalVectors(agg.CPUAllocationVectors) / agg.TotalHours(resolutionHours)
  400. agg.RAMAllocationHourlyAverage = totalVectors(agg.RAMAllocationVectors) / agg.TotalHours(resolutionHours)
  401. agg.GPUAllocationHourlyAverage = averageVectors(agg.GPUAllocationVectors)
  402. agg.PVAllocationHourlyAverage = totalVectors(agg.PVAllocationVectors) / agg.TotalHours(resolutionHours)
  403. // TODO niko/etl does this check out for GPU data? Do we need to rewrite GPU queries to be
  404. // cumulative?
  405. agg.CPUAllocationTotal = totalVectors(agg.CPUAllocationVectors)
  406. agg.GPUAllocationTotal = totalVectors(agg.GPUAllocationVectors)
  407. agg.PVAllocationTotal = totalVectors(agg.PVAllocationVectors)
  408. agg.RAMAllocationTotal = totalVectors(agg.RAMAllocationVectors)
  409. if includeEfficiency {
  410. // Default both RAM and CPU to 0% efficiency so that a 0-requested, 0-allocated, 0-used situation
  411. // returns 0% efficiency, which should be a red-flag.
  412. //
  413. // If non-zero numbers are available, then efficiency is defined as:
  414. // idlePercentage = (requested - used) / allocated
  415. // efficiency = (1.0 - idlePercentage)
  416. //
  417. // It is possible to score > 100% efficiency, which is meant to be interpreted as a red flag.
  418. // It is not possible to score < 0% efficiency.
  419. agg.CPUEfficiency = 0.0
  420. CPUIdle := 0.0
  421. if agg.CPUAllocationHourlyAverage > 0.0 {
  422. avgCPURequested := averageVectors(agg.CPURequestedVectors)
  423. avgCPUUsed := averageVectors(agg.CPUUsedVectors)
  424. // Clamp averages, log range violations
  425. avgCPURequested, avgCPUUsed = clampAverage(avgCPURequested, avgCPUUsed, agg.CPUAllocationHourlyAverage, "CPU")
  426. CPUIdle = ((avgCPURequested - avgCPUUsed) / agg.CPUAllocationHourlyAverage)
  427. agg.CPUEfficiency = 1.0 - CPUIdle
  428. }
  429. agg.RAMEfficiency = 0.0
  430. RAMIdle := 0.0
  431. if agg.RAMAllocationHourlyAverage > 0.0 {
  432. avgRAMRequested := averageVectors(agg.RAMRequestedVectors)
  433. avgRAMUsed := averageVectors(agg.RAMUsedVectors)
  434. // Clamp averages, log range violations
  435. avgRAMRequested, avgRAMUsed = clampAverage(avgRAMRequested, avgRAMUsed, agg.RAMAllocationHourlyAverage, "RAM")
  436. RAMIdle = ((avgRAMRequested - avgRAMUsed) / agg.RAMAllocationHourlyAverage)
  437. agg.RAMEfficiency = 1.0 - RAMIdle
  438. }
  439. // Score total efficiency by the sum of CPU and RAM efficiency, weighted by their
  440. // respective total costs.
  441. agg.Efficiency = 0.0
  442. if (agg.CPUCost + agg.RAMCost) > 0 {
  443. agg.Efficiency = ((agg.CPUCost * agg.CPUEfficiency) + (agg.RAMCost * agg.RAMEfficiency)) / (agg.CPUCost + agg.RAMCost)
  444. }
  445. }
  446. // convert RAM from bytes to GiB
  447. agg.RAMAllocationHourlyAverage = agg.RAMAllocationHourlyAverage / 1024 / 1024 / 1024
  448. // convert storage from bytes to GiB
  449. agg.PVAllocationHourlyAverage = agg.PVAllocationHourlyAverage / 1024 / 1024 / 1024
  450. // remove time series data if it is not explicitly requested
  451. if !includeTimeSeries {
  452. agg.CPUCostVector = nil
  453. agg.RAMCostVector = nil
  454. agg.GPUCostVector = nil
  455. agg.PVCostVector = nil
  456. agg.NetworkCostVector = nil
  457. agg.TotalCostVector = nil
  458. } else { // otherwise compute a totalcostvector
  459. v1 := addVectors(agg.CPUCostVector, agg.RAMCostVector)
  460. v2 := addVectors(v1, agg.GPUCostVector)
  461. v3 := addVectors(v2, agg.PVCostVector)
  462. v4 := addVectors(v3, agg.NetworkCostVector)
  463. agg.TotalCostVector = v4
  464. }
  465. // Typesafety checks
  466. if math.IsNaN(agg.CPUAllocationHourlyAverage) || math.IsInf(agg.CPUAllocationHourlyAverage, 0) {
  467. klog.V(1).Infof("[Warning] CPUAllocationHourlyAverage is %f for '%s: %s/%s'", agg.CPUAllocationHourlyAverage, agg.Cluster, agg.Aggregator, agg.Environment)
  468. agg.CPUAllocationHourlyAverage = 0
  469. }
  470. if math.IsNaN(agg.CPUCost) || math.IsInf(agg.CPUCost, 0) {
  471. klog.V(1).Infof("[Warning] CPUCost is %f for '%s: %s/%s'", agg.CPUCost, agg.Cluster, agg.Aggregator, agg.Environment)
  472. agg.CPUCost = 0
  473. }
  474. if math.IsNaN(agg.CPUEfficiency) || math.IsInf(agg.CPUEfficiency, 0) {
  475. klog.V(1).Infof("[Warning] CPUEfficiency is %f for '%s: %s/%s'", agg.CPUEfficiency, agg.Cluster, agg.Aggregator, agg.Environment)
  476. agg.CPUEfficiency = 0
  477. }
  478. if math.IsNaN(agg.Efficiency) || math.IsInf(agg.Efficiency, 0) {
  479. klog.V(1).Infof("[Warning] Efficiency is %f for '%s: %s/%s'", agg.Efficiency, agg.Cluster, agg.Aggregator, agg.Environment)
  480. agg.Efficiency = 0
  481. }
  482. if math.IsNaN(agg.GPUAllocationHourlyAverage) || math.IsInf(agg.GPUAllocationHourlyAverage, 0) {
  483. klog.V(1).Infof("[Warning] GPUAllocationHourlyAverage is %f for '%s: %s/%s'", agg.GPUAllocationHourlyAverage, agg.Cluster, agg.Aggregator, agg.Environment)
  484. agg.GPUAllocationHourlyAverage = 0
  485. }
  486. if math.IsNaN(agg.GPUCost) || math.IsInf(agg.GPUCost, 0) {
  487. klog.V(1).Infof("[Warning] GPUCost is %f for '%s: %s/%s'", agg.GPUCost, agg.Cluster, agg.Aggregator, agg.Environment)
  488. agg.GPUCost = 0
  489. }
  490. if math.IsNaN(agg.RAMAllocationHourlyAverage) || math.IsInf(agg.RAMAllocationHourlyAverage, 0) {
  491. klog.V(1).Infof("[Warning] RAMAllocationHourlyAverage is %f for '%s: %s/%s'", agg.RAMAllocationHourlyAverage, agg.Cluster, agg.Aggregator, agg.Environment)
  492. agg.RAMAllocationHourlyAverage = 0
  493. }
  494. if math.IsNaN(agg.RAMCost) || math.IsInf(agg.RAMCost, 0) {
  495. klog.V(1).Infof("[Warning] RAMCost is %f for '%s: %s/%s'", agg.RAMCost, agg.Cluster, agg.Aggregator, agg.Environment)
  496. agg.RAMCost = 0
  497. }
  498. if math.IsNaN(agg.RAMEfficiency) || math.IsInf(agg.RAMEfficiency, 0) {
  499. klog.V(1).Infof("[Warning] RAMEfficiency is %f for '%s: %s/%s'", agg.RAMEfficiency, agg.Cluster, agg.Aggregator, agg.Environment)
  500. agg.RAMEfficiency = 0
  501. }
  502. if math.IsNaN(agg.PVAllocationHourlyAverage) || math.IsInf(agg.PVAllocationHourlyAverage, 0) {
  503. klog.V(1).Infof("[Warning] PVAllocationHourlyAverage is %f for '%s: %s/%s'", agg.PVAllocationHourlyAverage, agg.Cluster, agg.Aggregator, agg.Environment)
  504. agg.PVAllocationHourlyAverage = 0
  505. }
  506. if math.IsNaN(agg.PVCost) || math.IsInf(agg.PVCost, 0) {
  507. klog.V(1).Infof("[Warning] PVCost is %f for '%s: %s/%s'", agg.PVCost, agg.Cluster, agg.Aggregator, agg.Environment)
  508. agg.PVCost = 0
  509. }
  510. if math.IsNaN(agg.NetworkCost) || math.IsInf(agg.NetworkCost, 0) {
  511. klog.V(1).Infof("[Warning] NetworkCost is %f for '%s: %s/%s'", agg.NetworkCost, agg.Cluster, agg.Aggregator, agg.Environment)
  512. agg.NetworkCost = 0
  513. }
  514. if math.IsNaN(agg.SharedCost) || math.IsInf(agg.SharedCost, 0) {
  515. klog.V(1).Infof("[Warning] SharedCost is %f for '%s: %s/%s'", agg.SharedCost, agg.Cluster, agg.Aggregator, agg.Environment)
  516. agg.SharedCost = 0
  517. }
  518. if math.IsNaN(agg.TotalCost) || math.IsInf(agg.TotalCost, 0) {
  519. klog.V(1).Infof("[Warning] TotalCost is %f for '%s: %s/%s'", agg.TotalCost, agg.Cluster, agg.Aggregator, agg.Environment)
  520. agg.TotalCost = 0
  521. }
  522. }
  523. return aggregations
  524. }
  525. func aggregateDatum(cp cloud.Provider, aggregations map[string]*Aggregation, costDatum *CostData, field string, subfields []string, rate string, key string, discount float64, customDiscount float64, idleCoefficient float64, includeProperties bool) {
  526. // add new entry to aggregation results if a new key is encountered
  527. if _, ok := aggregations[key]; !ok {
  528. agg := &Aggregation{
  529. Aggregator: field,
  530. Environment: key,
  531. }
  532. if len(subfields) > 0 {
  533. agg.Subfields = subfields
  534. }
  535. if includeProperties {
  536. props := &kubecost.AllocationProperties{}
  537. props.Cluster = costDatum.ClusterID
  538. props.Node = costDatum.NodeName
  539. if controller, kind, hasController := costDatum.GetController(); hasController {
  540. props.Controller = controller
  541. props.ControllerKind = kind
  542. }
  543. props.Labels = costDatum.Labels
  544. props.Annotations = costDatum.Annotations
  545. props.Namespace = costDatum.Namespace
  546. props.Pod = costDatum.PodName
  547. props.Services = costDatum.Services
  548. props.Container = costDatum.Name
  549. agg.Properties = props
  550. }
  551. aggregations[key] = agg
  552. }
  553. mergeVectors(cp, costDatum, aggregations[key], rate, discount, customDiscount, idleCoefficient)
  554. }
  555. func mergeVectors(cp cloud.Provider, costDatum *CostData, aggregation *Aggregation, rate string, discount float64, customDiscount float64, idleCoefficient float64) {
  556. aggregation.CPUAllocationVectors = addVectors(costDatum.CPUAllocation, aggregation.CPUAllocationVectors)
  557. aggregation.CPURequestedVectors = addVectors(costDatum.CPUReq, aggregation.CPURequestedVectors)
  558. aggregation.CPUUsedVectors = addVectors(costDatum.CPUUsed, aggregation.CPUUsedVectors)
  559. aggregation.RAMAllocationVectors = addVectors(costDatum.RAMAllocation, aggregation.RAMAllocationVectors)
  560. aggregation.RAMRequestedVectors = addVectors(costDatum.RAMReq, aggregation.RAMRequestedVectors)
  561. aggregation.RAMUsedVectors = addVectors(costDatum.RAMUsed, aggregation.RAMUsedVectors)
  562. aggregation.GPUAllocationVectors = addVectors(costDatum.GPUReq, aggregation.GPUAllocationVectors)
  563. for _, pvcd := range costDatum.PVCData {
  564. aggregation.PVAllocationVectors = addVectors(pvcd.Values, aggregation.PVAllocationVectors)
  565. }
  566. cpuv, ramv, gpuv, pvvs, netv := getPriceVectors(cp, costDatum, rate, discount, customDiscount, idleCoefficient)
  567. aggregation.CPUCostVector = addVectors(cpuv, aggregation.CPUCostVector)
  568. aggregation.RAMCostVector = addVectors(ramv, aggregation.RAMCostVector)
  569. aggregation.GPUCostVector = addVectors(gpuv, aggregation.GPUCostVector)
  570. aggregation.NetworkCostVector = addVectors(netv, aggregation.NetworkCostVector)
  571. for _, vectorList := range pvvs {
  572. aggregation.PVCostVector = addVectors(aggregation.PVCostVector, vectorList)
  573. }
  574. }
  575. // Returns the blended discounts applied to the node as a result of global discounts and reserved instance
  576. // discounts
  577. func getDiscounts(costDatum *CostData, cpuCost float64, ramCost float64, discount float64) (float64, float64) {
  578. if costDatum.NodeData == nil {
  579. return discount, discount
  580. }
  581. if costDatum.NodeData.IsSpot() {
  582. return 0, 0
  583. }
  584. reserved := costDatum.NodeData.Reserved
  585. // blended discounts
  586. blendedCPUDiscount := discount
  587. blendedRAMDiscount := discount
  588. if reserved != nil && reserved.CPUCost > 0 && reserved.RAMCost > 0 {
  589. reservedCPUDiscount := 0.0
  590. if cpuCost == 0 {
  591. klog.V(1).Infof("[Warning] No cpu cost found for cluster '%s' node '%s'", costDatum.ClusterID, costDatum.NodeName)
  592. } else {
  593. reservedCPUDiscount = 1.0 - (reserved.CPUCost / cpuCost)
  594. }
  595. reservedRAMDiscount := 0.0
  596. if ramCost == 0 {
  597. klog.V(1).Infof("[Warning] No ram cost found for cluster '%s' node '%s'", costDatum.ClusterID, costDatum.NodeName)
  598. } else {
  599. reservedRAMDiscount = 1.0 - (reserved.RAMCost / ramCost)
  600. }
  601. // AWS passes the # of reserved CPU and RAM as -1 to represent "All"
  602. if reserved.ReservedCPU < 0 && reserved.ReservedRAM < 0 {
  603. blendedCPUDiscount = reservedCPUDiscount
  604. blendedRAMDiscount = reservedRAMDiscount
  605. } else {
  606. nodeCPU, ierr := strconv.ParseInt(costDatum.NodeData.VCPU, 10, 64)
  607. nodeRAM, ferr := strconv.ParseFloat(costDatum.NodeData.RAMBytes, 64)
  608. if ierr == nil && ferr == nil {
  609. nodeRAMGB := nodeRAM / 1024 / 1024 / 1024
  610. reservedRAMGB := float64(reserved.ReservedRAM) / 1024 / 1024 / 1024
  611. nonReservedCPU := nodeCPU - reserved.ReservedCPU
  612. nonReservedRAM := nodeRAMGB - reservedRAMGB
  613. if nonReservedCPU == 0 {
  614. blendedCPUDiscount = reservedCPUDiscount
  615. } else {
  616. if nodeCPU == 0 {
  617. klog.V(1).Infof("[Warning] No ram found for cluster '%s' node '%s'", costDatum.ClusterID, costDatum.NodeName)
  618. } else {
  619. blendedCPUDiscount = (float64(reserved.ReservedCPU) * reservedCPUDiscount) + (float64(nonReservedCPU)*discount)/float64(nodeCPU)
  620. }
  621. }
  622. if nonReservedRAM == 0 {
  623. blendedRAMDiscount = reservedRAMDiscount
  624. } else {
  625. if nodeRAMGB == 0 {
  626. klog.V(1).Infof("[Warning] No ram found for cluster '%s' node '%s'", costDatum.ClusterID, costDatum.NodeName)
  627. } else {
  628. blendedRAMDiscount = (reservedRAMGB * reservedRAMDiscount) + (nonReservedRAM*discount)/nodeRAMGB
  629. }
  630. }
  631. }
  632. }
  633. }
  634. return blendedCPUDiscount, blendedRAMDiscount
  635. }
  636. func parseVectorPricing(cfg *cloud.CustomPricing, costDatum *CostData, cpuCostStr, ramCostStr, gpuCostStr, pvCostStr string) (float64, float64, float64, float64, bool) {
  637. usesCustom := false
  638. cpuCost, err := strconv.ParseFloat(cpuCostStr, 64)
  639. if err != nil || math.IsNaN(cpuCost) || math.IsInf(cpuCost, 0) || cpuCost == 0 {
  640. cpuCost, err = strconv.ParseFloat(cfg.CPU, 64)
  641. usesCustom = true
  642. if err != nil || math.IsNaN(cpuCost) || math.IsInf(cpuCost, 0) {
  643. cpuCost = 0
  644. }
  645. }
  646. ramCost, err := strconv.ParseFloat(ramCostStr, 64)
  647. if err != nil || math.IsNaN(ramCost) || math.IsInf(ramCost, 0) || ramCost == 0 {
  648. ramCost, err = strconv.ParseFloat(cfg.RAM, 64)
  649. usesCustom = true
  650. if err != nil || math.IsNaN(ramCost) || math.IsInf(ramCost, 0) {
  651. ramCost = 0
  652. }
  653. }
  654. gpuCost, err := strconv.ParseFloat(gpuCostStr, 64)
  655. if err != nil || math.IsNaN(gpuCost) || math.IsInf(gpuCost, 0) {
  656. gpuCost, err = strconv.ParseFloat(cfg.GPU, 64)
  657. if err != nil || math.IsNaN(gpuCost) || math.IsInf(gpuCost, 0) {
  658. gpuCost = 0
  659. }
  660. }
  661. pvCost, err := strconv.ParseFloat(pvCostStr, 64)
  662. if err != nil || math.IsNaN(cpuCost) || math.IsInf(cpuCost, 0) {
  663. pvCost, err = strconv.ParseFloat(cfg.Storage, 64)
  664. if err != nil || math.IsNaN(pvCost) || math.IsInf(pvCost, 0) {
  665. pvCost = 0
  666. }
  667. }
  668. return cpuCost, ramCost, gpuCost, pvCost, usesCustom
  669. }
  670. func getPriceVectors(cp cloud.Provider, costDatum *CostData, rate string, discount float64, customDiscount float64, idleCoefficient float64) ([]*util.Vector, []*util.Vector, []*util.Vector, [][]*util.Vector, []*util.Vector) {
  671. var cpuCost float64
  672. var ramCost float64
  673. var gpuCost float64
  674. var pvCost float64
  675. var usesCustom bool
  676. // If custom pricing is enabled and can be retrieved, replace
  677. // default cost values with custom values
  678. customPricing, err := cp.GetConfig()
  679. if err != nil {
  680. klog.Errorf("failed to load custom pricing: %s", err)
  681. }
  682. if cloud.CustomPricesEnabled(cp) && err == nil {
  683. var cpuCostStr string
  684. var ramCostStr string
  685. var gpuCostStr string
  686. var pvCostStr string
  687. if costDatum.NodeData.IsSpot() {
  688. cpuCostStr = customPricing.SpotCPU
  689. ramCostStr = customPricing.SpotRAM
  690. gpuCostStr = customPricing.SpotGPU
  691. } else {
  692. cpuCostStr = customPricing.CPU
  693. ramCostStr = customPricing.RAM
  694. gpuCostStr = customPricing.GPU
  695. }
  696. pvCostStr = customPricing.Storage
  697. cpuCost, ramCost, gpuCost, pvCost, usesCustom = parseVectorPricing(customPricing, costDatum, cpuCostStr, ramCostStr, gpuCostStr, pvCostStr)
  698. } else if costDatum.NodeData == nil && err == nil {
  699. cpuCostStr := customPricing.CPU
  700. ramCostStr := customPricing.RAM
  701. gpuCostStr := customPricing.GPU
  702. pvCostStr := customPricing.Storage
  703. cpuCost, ramCost, gpuCost, pvCost, usesCustom = parseVectorPricing(customPricing, costDatum, cpuCostStr, ramCostStr, gpuCostStr, pvCostStr)
  704. } else {
  705. cpuCostStr := costDatum.NodeData.VCPUCost
  706. ramCostStr := costDatum.NodeData.RAMCost
  707. gpuCostStr := costDatum.NodeData.GPUCost
  708. pvCostStr := costDatum.NodeData.StorageCost
  709. cpuCost, ramCost, gpuCost, pvCost, usesCustom = parseVectorPricing(customPricing, costDatum, cpuCostStr, ramCostStr, gpuCostStr, pvCostStr)
  710. }
  711. if usesCustom {
  712. log.DedupedWarningf(5, "No pricing data found for node `%s` , using custom pricing", costDatum.NodeName)
  713. }
  714. cpuDiscount, ramDiscount := getDiscounts(costDatum, cpuCost, ramCost, discount)
  715. klog.V(4).Infof("Node Name: %s", costDatum.NodeName)
  716. klog.V(4).Infof("Blended CPU Discount: %f", cpuDiscount)
  717. klog.V(4).Infof("Blended RAM Discount: %f", ramDiscount)
  718. // TODO should we try to apply the rate coefficient here or leave it as a totals-only metric?
  719. rateCoeff := 1.0
  720. if idleCoefficient == 0 {
  721. idleCoefficient = 1.0
  722. }
  723. cpuv := make([]*util.Vector, 0, len(costDatum.CPUAllocation))
  724. for _, val := range costDatum.CPUAllocation {
  725. cpuv = append(cpuv, &util.Vector{
  726. Timestamp: math.Round(val.Timestamp/10) * 10,
  727. Value: (val.Value * cpuCost * (1 - cpuDiscount) * (1 - customDiscount) / idleCoefficient) * rateCoeff,
  728. })
  729. }
  730. ramv := make([]*util.Vector, 0, len(costDatum.RAMAllocation))
  731. for _, val := range costDatum.RAMAllocation {
  732. ramv = append(ramv, &util.Vector{
  733. Timestamp: math.Round(val.Timestamp/10) * 10,
  734. Value: ((val.Value / 1024 / 1024 / 1024) * ramCost * (1 - ramDiscount) * (1 - customDiscount) / idleCoefficient) * rateCoeff,
  735. })
  736. }
  737. gpuv := make([]*util.Vector, 0, len(costDatum.GPUReq))
  738. for _, val := range costDatum.GPUReq {
  739. gpuv = append(gpuv, &util.Vector{
  740. Timestamp: math.Round(val.Timestamp/10) * 10,
  741. Value: (val.Value * gpuCost * (1 - discount) * (1 - customDiscount) / idleCoefficient) * rateCoeff,
  742. })
  743. }
  744. pvvs := make([][]*util.Vector, 0, len(costDatum.PVCData))
  745. for _, pvcData := range costDatum.PVCData {
  746. pvv := make([]*util.Vector, 0, len(pvcData.Values))
  747. if pvcData.Volume != nil {
  748. cost, _ := strconv.ParseFloat(pvcData.Volume.Cost, 64)
  749. // override with custom pricing if enabled
  750. if cloud.CustomPricesEnabled(cp) {
  751. cost = pvCost
  752. }
  753. for _, val := range pvcData.Values {
  754. pvv = append(pvv, &util.Vector{
  755. Timestamp: math.Round(val.Timestamp/10) * 10,
  756. Value: ((val.Value / 1024 / 1024 / 1024) * cost * (1 - customDiscount) / idleCoefficient) * rateCoeff,
  757. })
  758. }
  759. pvvs = append(pvvs, pvv)
  760. }
  761. }
  762. netv := make([]*util.Vector, 0, len(costDatum.NetworkData))
  763. for _, val := range costDatum.NetworkData {
  764. netv = append(netv, &util.Vector{
  765. Timestamp: math.Round(val.Timestamp/10) * 10,
  766. Value: val.Value,
  767. })
  768. }
  769. return cpuv, ramv, gpuv, pvvs, netv
  770. }
  771. func averageVectors(vectors []*util.Vector) float64 {
  772. if len(vectors) == 0 {
  773. return 0.0
  774. }
  775. return totalVectors(vectors) / float64(len(vectors))
  776. }
  777. func totalVectors(vectors []*util.Vector) float64 {
  778. total := 0.0
  779. for _, vector := range vectors {
  780. total += vector.Value
  781. }
  782. return total
  783. }
  784. // addVectors adds two slices of Vectors. Vector timestamps are rounded to the
  785. // nearest ten seconds to allow matching of Vectors within a delta allowance.
  786. // Matching Vectors are summed, while unmatched Vectors are passed through.
  787. // e.g. [(t=1, 1), (t=2, 2)] + [(t=2, 2), (t=3, 3)] = [(t=1, 1), (t=2, 4), (t=3, 3)]
  788. func addVectors(xvs []*util.Vector, yvs []*util.Vector) []*util.Vector {
  789. sumOp := func(result *util.Vector, x *float64, y *float64) bool {
  790. if x != nil && y != nil {
  791. result.Value = *x + *y
  792. } else if y != nil {
  793. result.Value = *y
  794. } else if x != nil {
  795. result.Value = *x
  796. }
  797. return true
  798. }
  799. return util.ApplyVectorOp(xvs, yvs, sumOp)
  800. }
// minCostDataLength is the minimum number of time series data points required
// to cache both raw and aggregated cost data.
const minCostDataLength = 2
  804. // EmptyDataError describes an error caused by empty cost data for some
  805. // defined interval
  806. type EmptyDataError struct {
  807. err error
  808. window kubecost.Window
  809. }
  810. // Error implements the error interface
  811. func (ede *EmptyDataError) Error() string {
  812. err := fmt.Sprintf("empty data for range: %s", ede.window)
  813. if ede.err != nil {
  814. err += fmt.Sprintf(": %s", ede.err)
  815. }
  816. return err
  817. }
  818. func costDataTimeSeriesLength(costData map[string]*CostData) int {
  819. l := 0
  820. for _, cd := range costData {
  821. if l < len(cd.RAMAllocation) {
  822. l = len(cd.RAMAllocation)
  823. }
  824. if l < len(cd.CPUAllocation) {
  825. l = len(cd.CPUAllocation)
  826. }
  827. }
  828. return l
  829. }
  830. // ScaleHourlyCostData converts per-hour cost data to per-resolution data. If the target resolution is higher (i.e. < 1.0h)
  831. // then we can do simple multiplication by the fraction-of-an-hour and retain accuracy. If the target resolution is
  832. // lower (i.e. > 1.0h) then we sum groups of hourly data by resolution to maintain fidelity.
  833. // e.g. (100 hours of per-hour hourly data, resolutionHours=10) => 10 data points, grouped and summed by 10-hour window
  834. // e.g. (20 minutes of per-minute hourly data, resolutionHours=1/60) => 20 data points, scaled down by a factor of 60
  835. func ScaleHourlyCostData(data map[string]*CostData, resolutionHours float64) map[string]*CostData {
  836. scaled := map[string]*CostData{}
  837. for key, datum := range data {
  838. datum.RAMReq = scaleVectorSeries(datum.RAMReq, resolutionHours)
  839. datum.RAMUsed = scaleVectorSeries(datum.RAMUsed, resolutionHours)
  840. datum.RAMAllocation = scaleVectorSeries(datum.RAMAllocation, resolutionHours)
  841. datum.CPUReq = scaleVectorSeries(datum.CPUReq, resolutionHours)
  842. datum.CPUUsed = scaleVectorSeries(datum.CPUUsed, resolutionHours)
  843. datum.CPUAllocation = scaleVectorSeries(datum.CPUAllocation, resolutionHours)
  844. datum.GPUReq = scaleVectorSeries(datum.GPUReq, resolutionHours)
  845. datum.NetworkData = scaleVectorSeries(datum.NetworkData, resolutionHours)
  846. for _, pvcDatum := range datum.PVCData {
  847. pvcDatum.Values = scaleVectorSeries(pvcDatum.Values, resolutionHours)
  848. }
  849. scaled[key] = datum
  850. }
  851. return scaled
  852. }
  853. func scaleVectorSeries(vs []*util.Vector, resolutionHours float64) []*util.Vector {
  854. // if scaling to a lower resolution, compress the hourly data for maximum accuracy
  855. if resolutionHours > 1.0 {
  856. return compressVectorSeries(vs, resolutionHours)
  857. }
  858. // if scaling to a higher resolution, simply scale each value down by the fraction of an hour
  859. for _, v := range vs {
  860. v.Value *= resolutionHours
  861. }
  862. return vs
  863. }
  864. func compressVectorSeries(vs []*util.Vector, resolutionHours float64) []*util.Vector {
  865. if len(vs) == 0 {
  866. return vs
  867. }
  868. compressed := []*util.Vector{}
  869. threshold := float64(60 * 60 * resolutionHours)
  870. var acc *util.Vector
  871. for i, v := range vs {
  872. if acc == nil {
  873. // start a new accumulation from current datum
  874. acc = &util.Vector{
  875. Value: vs[i].Value,
  876. Timestamp: vs[i].Timestamp,
  877. }
  878. continue
  879. }
  880. if v.Timestamp-acc.Timestamp < threshold {
  881. // v should be accumulated in current datum
  882. acc.Value += v.Value
  883. } else {
  884. // v falls outside current datum's threshold; append and start a new one
  885. compressed = append(compressed, acc)
  886. acc = &util.Vector{
  887. Value: vs[i].Value,
  888. Timestamp: vs[i].Timestamp,
  889. }
  890. }
  891. }
  892. // append any remaining, incomplete accumulation
  893. if acc != nil {
  894. compressed = append(compressed, acc)
  895. }
  896. return compressed
  897. }
// AggregateQueryOpts carries the optional parameters accepted by
// ComputeAggregateCostModel. DefaultAggregateQueryOpts returns the defaults
// used when no options are supplied.
type AggregateQueryOpts struct {
	// Rate is handed through to the aggregation step.
	// NOTE(review): accepted values are not visible here — confirm.
	Rate string
	// Filters restricts which cost data is aggregated. Keys read by
	// ComputeAggregateCostModel are "podprefix", "namespace", "node",
	// "cluster", "labels" and "annotations"; values are comma-separated.
	Filters map[string]string
	// SharedResources, when non-nil, identifies shared resources, which are
	// retained even if the filters would otherwise exclude them.
	SharedResources *SharedResourceInfo
	// ShareSplit selects how shared costs are split (default SplitTypeWeighted).
	ShareSplit   string
	AllocateIdle bool
	// IncludeTimeSeries and IncludeEfficiency both default to true.
	IncludeTimeSeries bool
	IncludeEfficiency bool
	DisableCache      bool
	// ClearCache flushes the aggregate and cost data caches before the cache
	// is consulted, so the request returns a freshly computed value.
	ClearCache bool
	NoCache    bool
	// NoExpireCache stores results with no cache expiration.
	NoExpireCache bool
	// RemoteEnabled defaults to env.IsRemoteEnabled().
	RemoteEnabled         bool
	DisableSharedOverhead bool
	UseETLAdapter         bool
}
  914. func DefaultAggregateQueryOpts() *AggregateQueryOpts {
  915. return &AggregateQueryOpts{
  916. Rate: "",
  917. Filters: map[string]string{},
  918. SharedResources: nil,
  919. ShareSplit: SplitTypeWeighted,
  920. AllocateIdle: false,
  921. IncludeTimeSeries: true,
  922. IncludeEfficiency: true,
  923. DisableCache: false,
  924. ClearCache: false,
  925. NoCache: false,
  926. NoExpireCache: false,
  927. RemoteEnabled: env.IsRemoteEnabled(),
  928. DisableSharedOverhead: false,
  929. UseETLAdapter: false,
  930. }
  931. }
  932. // ComputeAggregateCostModel computes cost data for the given window, then aggregates it by the given fields.
  933. // Data is cached on two levels: the aggregation is cached as well as the underlying cost data.
  934. func (a *Accesses) ComputeAggregateCostModel(promClient prometheusClient.Client, window kubecost.Window, field string, subfields []string, opts *AggregateQueryOpts) (map[string]*Aggregation, string, error) {
  935. // Window is the range of the query, i.e. (start, end)
  936. // It must be closed, i.e. neither start nor end can be nil
  937. if window.IsOpen() {
  938. return nil, "", fmt.Errorf("illegal window: %s", window)
  939. }
  940. // Resolution is the duration of each datum in the cost model range query,
  941. // which corresponds to both the step size given to Prometheus query_range
  942. // and to the window passed to the range queries.
  943. // i.e. by default, we support 1h resolution for queries of windows defined
  944. // in terms of days or integer multiples of hours (e.g. 1d, 12h)
  945. resolution := time.Hour
  946. // Determine resolution by size of duration and divisibility of window.
  947. // By default, resolution is 1hr. If the window is smaller than 1hr, then
  948. // resolution goes down to 1m. If the window is not a multiple of 1hr, then
  949. // resolution goes down to 1m. If the window is greater than 1d, then
  950. // resolution gets scaled up to improve performance by reducing the amount
  951. // of data being computed.
  952. durMins := int64(math.Trunc(window.Minutes()))
  953. if durMins < 24*60 { // less than 1d
  954. // TODO should we have additional options for going by
  955. // e.g. 30m? 10m? 5m?
  956. if durMins%60 != 0 || durMins < 3*60 { // not divisible by 1h or less than 3h
  957. resolution = time.Minute
  958. }
  959. } else { // greater than 1d
  960. if durMins >= 7*24*60 { // greater than (or equal to) 7 days
  961. resolution = 24.0 * time.Hour
  962. } else if durMins >= 2*24*60 { // greater than (or equal to) 2 days
  963. resolution = 2.0 * time.Hour
  964. }
  965. }
  966. // Parse options
  967. if opts == nil {
  968. opts = DefaultAggregateQueryOpts()
  969. }
  970. rate := opts.Rate
  971. filters := opts.Filters
  972. sri := opts.SharedResources
  973. shared := opts.ShareSplit
  974. allocateIdle := opts.AllocateIdle
  975. includeTimeSeries := opts.IncludeTimeSeries
  976. includeEfficiency := opts.IncludeEfficiency
  977. disableCache := opts.DisableCache
  978. clearCache := opts.ClearCache
  979. noCache := opts.NoCache
  980. noExpireCache := opts.NoExpireCache
  981. remoteEnabled := opts.RemoteEnabled
  982. disableSharedOverhead := opts.DisableSharedOverhead
  983. // retainFuncs override filterFuncs. Make sure shared resources do not
  984. // get filtered out.
  985. retainFuncs := []FilterFunc{}
  986. retainFuncs = append(retainFuncs, func(cd *CostData) (bool, string) {
  987. if sri != nil {
  988. return sri.IsSharedResource(cd), ""
  989. }
  990. return false, ""
  991. })
  992. // Parse cost data filters into FilterFuncs
  993. filterFuncs := []FilterFunc{}
  994. aggregateEnvironment := func(costDatum *CostData) string {
  995. if field == "cluster" {
  996. return costDatum.ClusterID
  997. } else if field == "node" {
  998. return costDatum.NodeName
  999. } else if field == "namespace" {
  1000. return costDatum.Namespace
  1001. } else if field == "service" {
  1002. if len(costDatum.Services) > 0 {
  1003. return costDatum.Namespace + "/" + costDatum.Services[0]
  1004. }
  1005. } else if field == "deployment" {
  1006. if len(costDatum.Deployments) > 0 {
  1007. return costDatum.Namespace + "/" + costDatum.Deployments[0]
  1008. }
  1009. } else if field == "daemonset" {
  1010. if len(costDatum.Daemonsets) > 0 {
  1011. return costDatum.Namespace + "/" + costDatum.Daemonsets[0]
  1012. }
  1013. } else if field == "statefulset" {
  1014. if len(costDatum.Statefulsets) > 0 {
  1015. return costDatum.Namespace + "/" + costDatum.Statefulsets[0]
  1016. }
  1017. } else if field == "label" {
  1018. if costDatum.Labels != nil {
  1019. for _, sf := range subfields {
  1020. if subfieldName, ok := costDatum.Labels[sf]; ok {
  1021. return fmt.Sprintf("%s=%s", sf, subfieldName)
  1022. }
  1023. }
  1024. }
  1025. } else if field == "annotation" {
  1026. if costDatum.Annotations != nil {
  1027. for _, sf := range subfields {
  1028. if subfieldName, ok := costDatum.Annotations[sf]; ok {
  1029. return fmt.Sprintf("%s=%s", sf, subfieldName)
  1030. }
  1031. }
  1032. }
  1033. } else if field == "pod" {
  1034. return costDatum.Namespace + "/" + costDatum.PodName
  1035. } else if field == "container" {
  1036. return costDatum.Namespace + "/" + costDatum.PodName + "/" + costDatum.Name
  1037. }
  1038. return ""
  1039. }
  1040. if filters["podprefix"] != "" {
  1041. pps := []string{}
  1042. for _, fp := range strings.Split(filters["podprefix"], ",") {
  1043. if fp != "" {
  1044. cleanedFilter := strings.TrimSpace(fp)
  1045. pps = append(pps, cleanedFilter)
  1046. }
  1047. }
  1048. filterFuncs = append(filterFuncs, func(cd *CostData) (bool, string) {
  1049. aggEnv := aggregateEnvironment(cd)
  1050. for _, pp := range pps {
  1051. cleanedFilter := strings.TrimSpace(pp)
  1052. if strings.HasPrefix(cd.PodName, cleanedFilter) {
  1053. return true, aggEnv
  1054. }
  1055. }
  1056. return false, aggEnv
  1057. })
  1058. }
  1059. if filters["namespace"] != "" {
  1060. // namespaces may be comma-separated, e.g. kubecost,default
  1061. // multiple namespaces are evaluated as an OR relationship
  1062. nss := strings.Split(filters["namespace"], ",")
  1063. filterFuncs = append(filterFuncs, func(cd *CostData) (bool, string) {
  1064. aggEnv := aggregateEnvironment(cd)
  1065. for _, ns := range nss {
  1066. nsTrim := strings.TrimSpace(ns)
  1067. if cd.Namespace == nsTrim {
  1068. return true, aggEnv
  1069. } else if strings.HasSuffix(nsTrim, "*") { // trigger wildcard prefix filtering
  1070. nsTrimAsterisk := strings.TrimSuffix(nsTrim, "*")
  1071. if strings.HasPrefix(cd.Namespace, nsTrimAsterisk) {
  1072. return true, aggEnv
  1073. }
  1074. }
  1075. }
  1076. return false, aggEnv
  1077. })
  1078. }
  1079. if filters["node"] != "" {
  1080. // nodes may be comma-separated, e.g. aws-node-1,aws-node-2
  1081. // multiple nodes are evaluated as an OR relationship
  1082. nodes := strings.Split(filters["node"], ",")
  1083. filterFuncs = append(filterFuncs, func(cd *CostData) (bool, string) {
  1084. aggEnv := aggregateEnvironment(cd)
  1085. for _, node := range nodes {
  1086. nodeTrim := strings.TrimSpace(node)
  1087. if cd.NodeName == nodeTrim {
  1088. return true, aggEnv
  1089. } else if strings.HasSuffix(nodeTrim, "*") { // trigger wildcard prefix filtering
  1090. nodeTrimAsterisk := strings.TrimSuffix(nodeTrim, "*")
  1091. if strings.HasPrefix(cd.NodeName, nodeTrimAsterisk) {
  1092. return true, aggEnv
  1093. }
  1094. }
  1095. }
  1096. return false, aggEnv
  1097. })
  1098. }
  1099. if filters["cluster"] != "" {
  1100. // clusters may be comma-separated, e.g. cluster-one,cluster-two
  1101. // multiple clusters are evaluated as an OR relationship
  1102. cs := strings.Split(filters["cluster"], ",")
  1103. filterFuncs = append(filterFuncs, func(cd *CostData) (bool, string) {
  1104. aggEnv := aggregateEnvironment(cd)
  1105. for _, c := range cs {
  1106. cTrim := strings.TrimSpace(c)
  1107. id, name := cd.ClusterID, cd.ClusterName
  1108. if id == cTrim || name == cTrim {
  1109. return true, aggEnv
  1110. } else if strings.HasSuffix(cTrim, "*") { // trigger wildcard prefix filtering
  1111. cTrimAsterisk := strings.TrimSuffix(cTrim, "*")
  1112. if strings.HasPrefix(id, cTrimAsterisk) || strings.HasPrefix(name, cTrimAsterisk) {
  1113. return true, aggEnv
  1114. }
  1115. }
  1116. }
  1117. return false, aggEnv
  1118. })
  1119. }
  1120. if filters["labels"] != "" {
  1121. // labels are expected to be comma-separated and to take the form key=value
  1122. // e.g. app=cost-analyzer,app.kubernetes.io/instance=kubecost
  1123. // each different label will be applied as an AND
  1124. // multiple values for a single label will be evaluated as an OR
  1125. labelValues := map[string][]string{}
  1126. ls := strings.Split(filters["labels"], ",")
  1127. for _, l := range ls {
  1128. lTrim := strings.TrimSpace(l)
  1129. label := strings.Split(lTrim, "=")
  1130. if len(label) == 2 {
  1131. ln := prom.SanitizeLabelName(strings.TrimSpace(label[0]))
  1132. lv := strings.TrimSpace(label[1])
  1133. labelValues[ln] = append(labelValues[ln], lv)
  1134. } else {
  1135. // label is not of the form name=value, so log it and move on
  1136. log.Warningf("ComputeAggregateCostModel: skipping illegal label filter: %s", l)
  1137. }
  1138. }
  1139. // Generate FilterFunc for each set of label filters by invoking a function instead of accessing
  1140. // values by closure to prevent reference-type looping bug.
  1141. // (see https://github.com/golang/go/wiki/CommonMistakes#using-reference-to-loop-iterator-variable)
  1142. for label, values := range labelValues {
  1143. ff := (func(l string, vs []string) FilterFunc {
  1144. return func(cd *CostData) (bool, string) {
  1145. ae := aggregateEnvironment(cd)
  1146. for _, v := range vs {
  1147. if v == "__unallocated__" { // Special case. __unallocated__ means return all pods without the attached label
  1148. if _, ok := cd.Labels[l]; !ok {
  1149. return true, ae
  1150. }
  1151. }
  1152. if cd.Labels[l] == v {
  1153. return true, ae
  1154. } else if strings.HasSuffix(v, "*") { // trigger wildcard prefix filtering
  1155. vTrim := strings.TrimSuffix(v, "*")
  1156. if strings.HasPrefix(cd.Labels[l], vTrim) {
  1157. return true, ae
  1158. }
  1159. }
  1160. }
  1161. return false, ae
  1162. }
  1163. })(label, values)
  1164. filterFuncs = append(filterFuncs, ff)
  1165. }
  1166. }
  1167. if filters["annotations"] != "" {
  1168. // annotations are expected to be comma-separated and to take the form key=value
  1169. // e.g. app=cost-analyzer,app.kubernetes.io/instance=kubecost
  1170. // each different annotation will be applied as an AND
  1171. // multiple values for a single annotation will be evaluated as an OR
  1172. annotationValues := map[string][]string{}
  1173. as := strings.Split(filters["annotations"], ",")
  1174. for _, annot := range as {
  1175. aTrim := strings.TrimSpace(annot)
  1176. annotation := strings.Split(aTrim, "=")
  1177. if len(annotation) == 2 {
  1178. an := prom.SanitizeLabelName(strings.TrimSpace(annotation[0]))
  1179. av := strings.TrimSpace(annotation[1])
  1180. annotationValues[an] = append(annotationValues[an], av)
  1181. } else {
  1182. // annotation is not of the form name=value, so log it and move on
  1183. log.Warningf("ComputeAggregateCostModel: skipping illegal annotation filter: %s", annot)
  1184. }
  1185. }
  1186. // Generate FilterFunc for each set of annotation filters by invoking a function instead of accessing
  1187. // values by closure to prevent reference-type looping bug.
  1188. // (see https://github.com/golang/go/wiki/CommonMistakes#using-reference-to-loop-iterator-variable)
  1189. for annotation, values := range annotationValues {
  1190. ff := (func(l string, vs []string) FilterFunc {
  1191. return func(cd *CostData) (bool, string) {
  1192. ae := aggregateEnvironment(cd)
  1193. for _, v := range vs {
  1194. if v == "__unallocated__" { // Special case. __unallocated__ means return all pods without the attached label
  1195. if _, ok := cd.Annotations[l]; !ok {
  1196. return true, ae
  1197. }
  1198. }
  1199. if cd.Annotations[l] == v {
  1200. return true, ae
  1201. } else if strings.HasSuffix(v, "*") { // trigger wildcard prefix filtering
  1202. vTrim := strings.TrimSuffix(v, "*")
  1203. if strings.HasPrefix(cd.Annotations[l], vTrim) {
  1204. return true, ae
  1205. }
  1206. }
  1207. }
  1208. return false, ae
  1209. }
  1210. })(annotation, values)
  1211. filterFuncs = append(filterFuncs, ff)
  1212. }
  1213. }
  1214. // clear cache prior to checking the cache so that a clearCache=true
  1215. // request always returns a freshly computed value
  1216. if clearCache {
  1217. a.AggregateCache.Flush()
  1218. a.CostDataCache.Flush()
  1219. }
  1220. cacheExpiry := a.GetCacheExpiration(window.Duration())
  1221. if noExpireCache {
  1222. cacheExpiry = cache.NoExpiration
  1223. }
  1224. // parametrize cache key by all request parameters
  1225. aggKey := GenerateAggKey(window, field, subfields, opts)
  1226. thanosOffset := time.Now().Add(-thanos.OffsetDuration())
  1227. if a.ThanosClient != nil && window.End().After(thanosOffset) {
  1228. log.Infof("ComputeAggregateCostModel: setting end time backwards to first present data")
  1229. // Apply offsets to both end and start times to maintain correct time range
  1230. deltaDuration := window.End().Sub(thanosOffset)
  1231. s := window.Start().Add(-1 * deltaDuration)
  1232. e := time.Now().Add(-thanos.OffsetDuration())
  1233. window.Set(&s, &e)
  1234. }
  1235. dur, off := window.DurationOffsetStrings()
  1236. key := fmt.Sprintf(`%s:%s:%fh:%t`, dur, off, resolution.Hours(), remoteEnabled)
  1237. // report message about which of the two caches hit. by default report a miss
  1238. cacheMessage := fmt.Sprintf("ComputeAggregateCostModel: L1 cache miss: %s L2 cache miss: %s", aggKey, key)
  1239. // check the cache for aggregated response; if cache is hit and not disabled, return response
  1240. if value, found := a.AggregateCache.Get(aggKey); found && !disableCache && !noCache {
  1241. result, ok := value.(map[string]*Aggregation)
  1242. if !ok {
  1243. // disable cache and recompute if type cast fails
  1244. log.Errorf("ComputeAggregateCostModel: caching error: failed to cast aggregate data to struct: %s", aggKey)
  1245. return a.ComputeAggregateCostModel(promClient, window, field, subfields, opts)
  1246. }
  1247. return result, fmt.Sprintf("aggregate cache hit: %s", aggKey), nil
  1248. }
  1249. if window.Hours() >= 1.0 {
  1250. // exclude the last window of the time frame to match Prometheus definitions of range, offset, and resolution
  1251. start := window.Start().Add(resolution)
  1252. window.Set(&start, window.End())
  1253. } else {
  1254. // don't cache requests for durations of less than one hour
  1255. disableCache = true
  1256. }
  1257. // attempt to retrieve cost data from cache
  1258. var costData map[string]*CostData
  1259. var err error
  1260. cacheData, found := a.CostDataCache.Get(key)
  1261. if found && !disableCache && !noCache {
  1262. ok := false
  1263. costData, ok = cacheData.(map[string]*CostData)
  1264. cacheMessage = fmt.Sprintf("ComputeAggregateCostModel: L1 cache miss: %s, L2 cost data cache hit: %s", aggKey, key)
  1265. if !ok {
  1266. log.Errorf("ComputeAggregateCostModel: caching error: failed to cast cost data to struct: %s", key)
  1267. }
  1268. } else {
  1269. log.Infof("ComputeAggregateCostModel: missed cache: %s (found %t, disableCache %t, noCache %t)", key, found, disableCache, noCache)
  1270. costData, err = a.Model.ComputeCostDataRange(promClient, a.CloudProvider, window, resolution, "", "", remoteEnabled)
  1271. if err != nil {
  1272. if prom.IsErrorCollection(err) {
  1273. return nil, "", err
  1274. }
  1275. if pce, ok := err.(prom.CommError); ok {
  1276. return nil, "", pce
  1277. }
  1278. if strings.Contains(err.Error(), "data is empty") {
  1279. return nil, "", &EmptyDataError{err: err, window: window}
  1280. }
  1281. return nil, "", err
  1282. }
  1283. // compute length of the time series in the cost data and only compute
  1284. // aggregates and cache if the length is sufficiently high
  1285. costDataLen := costDataTimeSeriesLength(costData)
  1286. if costDataLen == 0 {
  1287. return nil, "", &EmptyDataError{window: window}
  1288. }
  1289. if costDataLen >= minCostDataLength && !noCache {
  1290. log.Infof("ComputeAggregateCostModel: setting L2 cache: %s", key)
  1291. a.CostDataCache.Set(key, costData, cacheExpiry)
  1292. }
  1293. }
  1294. c, err := a.CloudProvider.GetConfig()
  1295. if err != nil {
  1296. return nil, "", err
  1297. }
  1298. discount, err := ParsePercentString(c.Discount)
  1299. if err != nil {
  1300. return nil, "", err
  1301. }
  1302. customDiscount, err := ParsePercentString(c.NegotiatedDiscount)
  1303. if err != nil {
  1304. return nil, "", err
  1305. }
  1306. sc := make(map[string]*SharedCostInfo)
  1307. if !disableSharedOverhead {
  1308. for key, val := range c.SharedCosts {
  1309. cost, err := strconv.ParseFloat(val, 64)
  1310. durationCoefficient := window.Hours() / util.HoursPerMonth
  1311. if err != nil {
  1312. return nil, "", fmt.Errorf("unable to parse shared cost %s: %s", val, err)
  1313. }
  1314. sc[key] = &SharedCostInfo{
  1315. Name: key,
  1316. Cost: cost * durationCoefficient,
  1317. }
  1318. }
  1319. }
  1320. idleCoefficients := make(map[string]float64)
  1321. if allocateIdle {
  1322. dur, off, err := window.DurationOffset()
  1323. if err != nil {
  1324. log.Errorf("ComputeAggregateCostModel: error computing idle coefficient: illegal window: %s (%s)", window, err)
  1325. return nil, "", err
  1326. }
  1327. if a.ThanosClient != nil && off < thanos.OffsetDuration() {
  1328. // Determine difference between the Thanos offset and the requested
  1329. // offset; e.g. off=1h, thanosOffsetDuration=3h => diff=2h
  1330. diff := thanos.OffsetDuration() - off
  1331. // Reduce duration by difference and increase offset by difference
  1332. // e.g. 24h offset 0h => 21h offset 3h
  1333. dur = dur - diff
  1334. off = thanos.OffsetDuration()
  1335. log.Infof("ComputeAggregateCostModel: setting duration, offset to %s, %s due to Thanos", dur, off)
  1336. // Idle computation cannot be fulfilled for some windows, specifically
  1337. // those with sum(duration, offset) < Thanos offset, because there is
  1338. // no data within that window.
  1339. if dur <= 0 {
  1340. return nil, "", fmt.Errorf("requested idle coefficients from Thanos for illegal duration, offset: %s, %s (original window %s)", dur, off, window)
  1341. }
  1342. }
  1343. // Convert to Prometheus-compatible strings
  1344. durStr, offStr := util.DurationOffsetStrings(dur, off)
  1345. idleCoefficients, err = a.ComputeIdleCoefficient(costData, promClient, a.CloudProvider, discount, customDiscount, durStr, offStr)
  1346. if err != nil {
  1347. log.Errorf("ComputeAggregateCostModel: error computing idle coefficient: duration=%s, offset=%s, err=%s", durStr, offStr, err)
  1348. return nil, "", err
  1349. }
  1350. }
  1351. totalContainerCost := 0.0
  1352. if shared == SplitTypeWeighted {
  1353. totalContainerCost = GetTotalContainerCost(costData, rate, a.CloudProvider, discount, customDiscount, idleCoefficients)
  1354. }
  1355. // filter cost data by namespace and cluster after caching for maximal cache hits
  1356. costData, filteredContainerCount, filteredEnvironments := FilterCostData(costData, retainFuncs, filterFuncs)
  1357. // aggregate cost model data by given fields and cache the result for the default expiration
  1358. aggOpts := &AggregationOptions{
  1359. Discount: discount,
  1360. CustomDiscount: customDiscount,
  1361. IdleCoefficients: idleCoefficients,
  1362. IncludeEfficiency: includeEfficiency,
  1363. IncludeTimeSeries: includeTimeSeries,
  1364. Rate: rate,
  1365. ResolutionHours: resolution.Hours(),
  1366. SharedResourceInfo: sri,
  1367. SharedCosts: sc,
  1368. FilteredContainerCount: filteredContainerCount,
  1369. FilteredEnvironments: filteredEnvironments,
  1370. TotalContainerCost: totalContainerCost,
  1371. SharedSplit: shared,
  1372. }
  1373. result := AggregateCostData(costData, field, subfields, a.CloudProvider, aggOpts)
  1374. // If sending time series data back, switch scale back to hourly data. At this point,
  1375. // resolutionHours may have converted our hourly data to more- or less-than hourly data.
  1376. if includeTimeSeries {
  1377. for _, aggs := range result {
  1378. ScaleAggregationTimeSeries(aggs, resolution.Hours())
  1379. }
  1380. }
  1381. // compute length of the time series in the cost data and only cache
  1382. // aggregation results if the length is sufficiently high
  1383. costDataLen := costDataTimeSeriesLength(costData)
  1384. if costDataLen >= minCostDataLength && window.Hours() > 1.0 && !noCache {
  1385. // Set the result map (rather than a pointer to it) because map is a reference type
  1386. log.Infof("ComputeAggregateCostModel: setting aggregate cache: %s", aggKey)
  1387. a.AggregateCache.Set(aggKey, result, cacheExpiry)
  1388. } else {
  1389. log.Infof("ComputeAggregateCostModel: not setting aggregate cache: %s (not enough data: %t; duration less than 1h: %t; noCache: %t)", key, costDataLen < minCostDataLength, window.Hours() < 1, noCache)
  1390. }
  1391. return result, cacheMessage, nil
  1392. }
  1393. // ScaleAggregationTimeSeries reverses the scaling done by ScaleHourlyCostData, returning
  1394. // the aggregation's time series to hourly data.
  1395. func ScaleAggregationTimeSeries(aggregation *Aggregation, resolutionHours float64) {
  1396. for _, v := range aggregation.CPUCostVector {
  1397. v.Value /= resolutionHours
  1398. }
  1399. for _, v := range aggregation.GPUCostVector {
  1400. v.Value /= resolutionHours
  1401. }
  1402. for _, v := range aggregation.RAMCostVector {
  1403. v.Value /= resolutionHours
  1404. }
  1405. for _, v := range aggregation.PVCostVector {
  1406. v.Value /= resolutionHours
  1407. }
  1408. for _, v := range aggregation.NetworkCostVector {
  1409. v.Value /= resolutionHours
  1410. }
  1411. for _, v := range aggregation.TotalCostVector {
  1412. v.Value /= resolutionHours
  1413. }
  1414. return
  1415. }
  1416. // String returns a string representation of the encapsulated shared resources, which
  1417. // can be used to uniquely identify a set of shared resources. Sorting sets of shared
  1418. // resources ensures that strings representing permutations of the same combination match.
  1419. func (s *SharedResourceInfo) String() string {
  1420. if s == nil {
  1421. return ""
  1422. }
  1423. nss := []string{}
  1424. for ns := range s.SharedNamespace {
  1425. nss = append(nss, ns)
  1426. }
  1427. sort.Strings(nss)
  1428. nsStr := strings.Join(nss, ",")
  1429. labels := []string{}
  1430. for lbl, vals := range s.LabelSelectors {
  1431. for val := range vals {
  1432. if lbl != "" && val != "" {
  1433. labels = append(labels, fmt.Sprintf("%s=%s", lbl, val))
  1434. }
  1435. }
  1436. }
  1437. sort.Strings(labels)
  1438. labelStr := strings.Join(labels, ",")
  1439. return fmt.Sprintf("%s:%s", nsStr, labelStr)
  1440. }
  1441. type aggKeyParams struct {
  1442. duration string
  1443. offset string
  1444. filters map[string]string
  1445. field string
  1446. subfields []string
  1447. rate string
  1448. sri *SharedResourceInfo
  1449. shareType string
  1450. idle bool
  1451. timeSeries bool
  1452. efficiency bool
  1453. }
  1454. // GenerateAggKey generates a parameter-unique key for caching the aggregate cost model
  1455. func GenerateAggKey(window kubecost.Window, field string, subfields []string, opts *AggregateQueryOpts) string {
  1456. if opts == nil {
  1457. opts = DefaultAggregateQueryOpts()
  1458. }
  1459. // Covert to duration, offset so that cache hits occur, even when timestamps have
  1460. // shifted slightly.
  1461. duration, offset := window.DurationOffsetStrings()
  1462. // parse, trim, and sort podprefix filters
  1463. podPrefixFilters := []string{}
  1464. if ppfs, ok := opts.Filters["podprefix"]; ok && ppfs != "" {
  1465. for _, psf := range strings.Split(ppfs, ",") {
  1466. podPrefixFilters = append(podPrefixFilters, strings.TrimSpace(psf))
  1467. }
  1468. }
  1469. sort.Strings(podPrefixFilters)
  1470. podPrefixFiltersStr := strings.Join(podPrefixFilters, ",")
  1471. // parse, trim, and sort namespace filters
  1472. nsFilters := []string{}
  1473. if nsfs, ok := opts.Filters["namespace"]; ok && nsfs != "" {
  1474. for _, nsf := range strings.Split(nsfs, ",") {
  1475. nsFilters = append(nsFilters, strings.TrimSpace(nsf))
  1476. }
  1477. }
  1478. sort.Strings(nsFilters)
  1479. nsFilterStr := strings.Join(nsFilters, ",")
  1480. // parse, trim, and sort node filters
  1481. nodeFilters := []string{}
  1482. if nodefs, ok := opts.Filters["node"]; ok && nodefs != "" {
  1483. for _, nodef := range strings.Split(nodefs, ",") {
  1484. nodeFilters = append(nodeFilters, strings.TrimSpace(nodef))
  1485. }
  1486. }
  1487. sort.Strings(nodeFilters)
  1488. nodeFilterStr := strings.Join(nodeFilters, ",")
  1489. // parse, trim, and sort cluster filters
  1490. cFilters := []string{}
  1491. if cfs, ok := opts.Filters["cluster"]; ok && cfs != "" {
  1492. for _, cf := range strings.Split(cfs, ",") {
  1493. cFilters = append(cFilters, strings.TrimSpace(cf))
  1494. }
  1495. }
  1496. sort.Strings(cFilters)
  1497. cFilterStr := strings.Join(cFilters, ",")
  1498. // parse, trim, and sort label filters
  1499. lFilters := []string{}
  1500. if lfs, ok := opts.Filters["labels"]; ok && lfs != "" {
  1501. for _, lf := range strings.Split(lfs, ",") {
  1502. // trim whitespace from the label name and the label value
  1503. // of each label name/value pair, then reconstruct
  1504. // e.g. "tier = frontend, app = kubecost" == "app=kubecost,tier=frontend"
  1505. lfa := strings.Split(lf, "=")
  1506. if len(lfa) == 2 {
  1507. lfn := strings.TrimSpace(lfa[0])
  1508. lfv := strings.TrimSpace(lfa[1])
  1509. lFilters = append(lFilters, fmt.Sprintf("%s=%s", lfn, lfv))
  1510. } else {
  1511. // label is not of the form name=value, so log it and move on
  1512. klog.V(2).Infof("[Warning] GenerateAggKey: skipping illegal label filter: %s", lf)
  1513. }
  1514. }
  1515. }
  1516. sort.Strings(lFilters)
  1517. lFilterStr := strings.Join(lFilters, ",")
  1518. // parse, trim, and sort annotation filters
  1519. aFilters := []string{}
  1520. if afs, ok := opts.Filters["annotations"]; ok && afs != "" {
  1521. for _, af := range strings.Split(afs, ",") {
  1522. // trim whitespace from the annotation name and the annotation value
  1523. // of each annotation name/value pair, then reconstruct
  1524. // e.g. "tier = frontend, app = kubecost" == "app=kubecost,tier=frontend"
  1525. afa := strings.Split(af, "=")
  1526. if len(afa) == 2 {
  1527. afn := strings.TrimSpace(afa[0])
  1528. afv := strings.TrimSpace(afa[1])
  1529. aFilters = append(aFilters, fmt.Sprintf("%s=%s", afn, afv))
  1530. } else {
  1531. // annotation is not of the form name=value, so log it and move on
  1532. klog.V(2).Infof("[Warning] GenerateAggKey: skipping illegal annotation filter: %s", af)
  1533. }
  1534. }
  1535. }
  1536. sort.Strings(aFilters)
  1537. aFilterStr := strings.Join(aFilters, ",")
  1538. filterStr := fmt.Sprintf("%s:%s:%s:%s:%s:%s", nsFilterStr, nodeFilterStr, cFilterStr, lFilterStr, aFilterStr, podPrefixFiltersStr)
  1539. sort.Strings(subfields)
  1540. fieldStr := fmt.Sprintf("%s:%s", field, strings.Join(subfields, ","))
  1541. if offset == "1m" {
  1542. offset = ""
  1543. }
  1544. return fmt.Sprintf("%s:%s:%s:%s:%s:%s:%s:%t:%t:%t", duration, offset, filterStr, fieldStr, opts.Rate,
  1545. opts.SharedResources, opts.ShareSplit, opts.AllocateIdle, opts.IncludeTimeSeries,
  1546. opts.IncludeEfficiency)
  1547. }
  1548. // Aggregator is capable of computing the aggregated cost model. This is
  1549. // a brutal interface, which should be cleaned up, but it's necessary for
  1550. // being able to swap in an ETL-backed implementation.
  1551. type Aggregator interface {
  1552. ComputeAggregateCostModel(promClient prometheusClient.Client, window kubecost.Window, field string, subfields []string, opts *AggregateQueryOpts) (map[string]*Aggregation, string, error)
  1553. }
  1554. func (a *Accesses) warmAggregateCostModelCache() {
  1555. // Only allow one concurrent cache-warming operation
  1556. sem := util.NewSemaphore(1)
  1557. // Set default values, pulling them from application settings where applicable, and warm the cache
  1558. // for the given duration. Cache is intentionally set to expire (i.e. noExpireCache=false) so that
  1559. // if the default parameters change, the old cached defaults with eventually expire. Thus, the
  1560. // timing of the cache expiry/refresh is the only mechanism ensuring 100% cache warmth.
  1561. warmFunc := func(duration, durationHrs, offset string, cacheEfficiencyData bool) (error, error) {
  1562. promClient := a.GetPrometheusClient(true)
  1563. windowStr := fmt.Sprintf("%s offset %s", duration, offset)
  1564. window, err := kubecost.ParseWindowUTC(windowStr)
  1565. if err != nil {
  1566. return nil, fmt.Errorf("invalid window from window string: %s", windowStr)
  1567. }
  1568. field := "namespace"
  1569. subfields := []string{}
  1570. aggOpts := DefaultAggregateQueryOpts()
  1571. aggOpts.Rate = ""
  1572. aggOpts.Filters = map[string]string{}
  1573. aggOpts.IncludeTimeSeries = false
  1574. aggOpts.IncludeEfficiency = true
  1575. aggOpts.DisableCache = true
  1576. aggOpts.ClearCache = false
  1577. aggOpts.NoCache = false
  1578. aggOpts.NoExpireCache = false
  1579. aggOpts.ShareSplit = SplitTypeWeighted
  1580. aggOpts.RemoteEnabled = env.IsRemoteEnabled()
  1581. aggOpts.AllocateIdle = cloud.AllocateIdleByDefault(a.CloudProvider)
  1582. sharedNamespaces := cloud.SharedNamespaces(a.CloudProvider)
  1583. sharedLabelNames, sharedLabelValues := cloud.SharedLabels(a.CloudProvider)
  1584. if len(sharedNamespaces) > 0 || len(sharedLabelNames) > 0 {
  1585. aggOpts.SharedResources = NewSharedResourceInfo(true, sharedNamespaces, sharedLabelNames, sharedLabelValues)
  1586. }
  1587. aggKey := GenerateAggKey(window, field, subfields, aggOpts)
  1588. log.Infof("aggregation: cache warming defaults: %s", aggKey)
  1589. key := fmt.Sprintf("%s:%s", durationHrs, offset)
  1590. _, _, aggErr := a.ComputeAggregateCostModel(promClient, window, field, subfields, aggOpts)
  1591. if aggErr != nil {
  1592. log.Infof("Error building cache %s: %s", window, aggErr)
  1593. }
  1594. if a.ThanosClient != nil {
  1595. offset = thanos.Offset()
  1596. log.Infof("Setting offset to %s", offset)
  1597. }
  1598. totals, err := a.ComputeClusterCosts(promClient, a.CloudProvider, durationHrs, offset, cacheEfficiencyData)
  1599. if err != nil {
  1600. log.Infof("Error building cluster costs cache %s", key)
  1601. }
  1602. maxMinutesWithData := 0.0
  1603. for _, cluster := range totals {
  1604. if cluster.DataMinutes > maxMinutesWithData {
  1605. maxMinutesWithData = cluster.DataMinutes
  1606. }
  1607. }
  1608. if len(totals) > 0 && maxMinutesWithData > clusterCostsCacheMinutes {
  1609. a.ClusterCostsCache.Set(key, totals, a.GetCacheExpiration(window.Duration()))
  1610. log.Infof("caching %s cluster costs for %s", duration, a.GetCacheExpiration(window.Duration()))
  1611. } else {
  1612. log.Warningf("not caching %s cluster costs: no data or less than %f minutes data ", duration, clusterCostsCacheMinutes)
  1613. }
  1614. return aggErr, err
  1615. }
  1616. // 1 day
  1617. go func(sem *util.Semaphore) {
  1618. defer errors.HandlePanic()
  1619. duration := "1d"
  1620. offset := "1m"
  1621. durHrs := "24h"
  1622. dur := 24 * time.Hour
  1623. for {
  1624. sem.Acquire()
  1625. warmFunc(duration, durHrs, offset, true)
  1626. sem.Return()
  1627. log.Infof("aggregation: warm cache: %s", duration)
  1628. time.Sleep(a.GetCacheRefresh(dur))
  1629. }
  1630. }(sem)
  1631. if !env.IsETLEnabled() {
  1632. // 2 day
  1633. go func(sem *util.Semaphore) {
  1634. defer errors.HandlePanic()
  1635. duration := "2d"
  1636. offset := "1m"
  1637. durHrs := "48h"
  1638. dur := 2 * 24 * time.Hour
  1639. for {
  1640. sem.Acquire()
  1641. warmFunc(duration, durHrs, offset, false)
  1642. sem.Return()
  1643. log.Infof("aggregation: warm cache: %s", duration)
  1644. time.Sleep(a.GetCacheRefresh(dur))
  1645. }
  1646. }(sem)
  1647. // 7 day
  1648. go func(sem *util.Semaphore) {
  1649. defer errors.HandlePanic()
  1650. duration := "7d"
  1651. offset := "1m"
  1652. durHrs := "168h"
  1653. dur := 7 * 24 * time.Hour
  1654. for {
  1655. sem.Acquire()
  1656. aggErr, err := warmFunc(duration, durHrs, offset, false)
  1657. sem.Return()
  1658. log.Infof("aggregation: warm cache: %s", duration)
  1659. if aggErr == nil && err == nil {
  1660. time.Sleep(a.GetCacheRefresh(dur))
  1661. } else {
  1662. time.Sleep(5 * time.Minute)
  1663. }
  1664. }
  1665. }(sem)
  1666. // 30 day
  1667. go func(sem *util.Semaphore) {
  1668. defer errors.HandlePanic()
  1669. for {
  1670. duration := "30d"
  1671. offset := "1m"
  1672. durHrs := "720h"
  1673. dur := 30 * 24 * time.Hour
  1674. sem.Acquire()
  1675. aggErr, err := warmFunc(duration, durHrs, offset, false)
  1676. sem.Return()
  1677. if aggErr == nil && err == nil {
  1678. time.Sleep(a.GetCacheRefresh(dur))
  1679. } else {
  1680. time.Sleep(5 * time.Minute)
  1681. }
  1682. }
  1683. }(sem)
  1684. }
  1685. }
  1686. // AggregateCostModelHandler handles requests to the aggregated cost model API. See
  1687. // ComputeAggregateCostModel for details.
  1688. func (a *Accesses) AggregateCostModelHandler(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  1689. w.Header().Set("Content-Type", "application/json")
  1690. windowStr := r.URL.Query().Get("window")
  1691. // Convert UTC-RFC3339 pairs to configured UTC offset
  1692. // e.g. with UTC offset of -0600, 2020-07-01T00:00:00Z becomes
  1693. // 2020-07-01T06:00:00Z == 2020-07-01T00:00:00-0600
  1694. // TODO niko/etl fix the frontend because this is confusing if you're
  1695. // actually asking for UTC time (...Z) and we swap that "Z" out for the
  1696. // configured UTC offset without asking
  1697. rfc3339 := `\d\d\d\d-\d\d-\d\dT\d\d:\d\d:\d\dZ`
  1698. regex := regexp.MustCompile(fmt.Sprintf(`(%s),(%s)`, rfc3339, rfc3339))
  1699. match := regex.FindStringSubmatch(windowStr)
  1700. if match != nil {
  1701. start, _ := time.Parse(time.RFC3339, match[1])
  1702. start = start.Add(-env.GetParsedUTCOffset()).In(time.UTC)
  1703. end, _ := time.Parse(time.RFC3339, match[2])
  1704. end = end.Add(-env.GetParsedUTCOffset()).In(time.UTC)
  1705. windowStr = fmt.Sprintf("%sZ,%sZ", start.Format("2006-01-02T15:04:05"), end.Format("2006-01-02T15:04:05Z"))
  1706. }
  1707. // determine duration and offset from query parameters
  1708. window, err := kubecost.ParseWindowWithOffset(windowStr, env.GetParsedUTCOffset())
  1709. if err != nil || window.Start() == nil {
  1710. WriteError(w, BadRequest(fmt.Sprintf("invalid window: %s", err)))
  1711. return
  1712. }
  1713. durRegex := regexp.MustCompile(`^(\d+)(m|h|d|s)$`)
  1714. isDurationStr := durRegex.MatchString(windowStr)
  1715. // legacy offset option should override window offset
  1716. if r.URL.Query().Get("offset") != "" {
  1717. offset := r.URL.Query().Get("offset")
  1718. // Shift window by offset, but only when manually set with separate
  1719. // parameter and window was provided as a duration string. Otherwise,
  1720. // do not alter the (duration, offset) from ParseWindowWithOffset.
  1721. if offset != "1m" && isDurationStr {
  1722. match := durRegex.FindStringSubmatch(offset)
  1723. if match != nil && len(match) == 3 {
  1724. dur := time.Minute
  1725. if match[2] == "h" {
  1726. dur = time.Hour
  1727. }
  1728. if match[2] == "d" {
  1729. dur = 24 * time.Hour
  1730. }
  1731. if match[2] == "s" {
  1732. dur = time.Second
  1733. }
  1734. num, _ := strconv.ParseInt(match[1], 10, 64)
  1735. window = window.Shift(-time.Duration(num) * dur)
  1736. }
  1737. }
  1738. }
  1739. opts := DefaultAggregateQueryOpts()
  1740. // parse remaining query parameters
  1741. namespace := r.URL.Query().Get("namespace")
  1742. cluster := r.URL.Query().Get("cluster")
  1743. labels := r.URL.Query().Get("labels")
  1744. annotations := r.URL.Query().Get("annotations")
  1745. podprefix := r.URL.Query().Get("podprefix")
  1746. field := r.URL.Query().Get("aggregation")
  1747. sharedNamespaces := r.URL.Query().Get("sharedNamespaces")
  1748. sharedLabelNames := r.URL.Query().Get("sharedLabelNames")
  1749. sharedLabelValues := r.URL.Query().Get("sharedLabelValues")
  1750. remote := r.URL.Query().Get("remote") != "false"
  1751. subfieldStr := r.URL.Query().Get("aggregationSubfield")
  1752. subfields := []string{}
  1753. if len(subfieldStr) > 0 {
  1754. s := strings.Split(r.URL.Query().Get("aggregationSubfield"), ",")
  1755. for _, rawLabel := range s {
  1756. subfields = append(subfields, prom.SanitizeLabelName(rawLabel))
  1757. }
  1758. }
  1759. idleFlag := r.URL.Query().Get("allocateIdle")
  1760. if idleFlag == "default" {
  1761. c, _ := a.CloudProvider.GetConfig()
  1762. opts.AllocateIdle = (c.DefaultIdle == "true")
  1763. } else {
  1764. opts.AllocateIdle = (idleFlag == "true")
  1765. }
  1766. opts.Rate = r.URL.Query().Get("rate")
  1767. opts.ShareSplit = r.URL.Query().Get("sharedSplit")
  1768. // timeSeries == true maintains the time series dimension of the data,
  1769. // which by default gets summed over the entire interval
  1770. opts.IncludeTimeSeries = r.URL.Query().Get("timeSeries") == "true"
  1771. // efficiency has been deprecated in favor of a default to always send efficiency
  1772. opts.IncludeEfficiency = true
  1773. // TODO niko/caching rename "recomputeCache"
  1774. // disableCache, if set to "true", tells this function to recompute and
  1775. // cache the requested data
  1776. opts.DisableCache = r.URL.Query().Get("disableCache") == "true"
  1777. // clearCache, if set to "true", tells this function to flush the cache,
  1778. // then recompute and cache the requested data
  1779. opts.ClearCache = r.URL.Query().Get("clearCache") == "true"
  1780. // noCache avoids the cache altogether, both reading from and writing to
  1781. opts.NoCache = r.URL.Query().Get("noCache") == "true"
  1782. // noExpireCache should only be used by cache warming to set non-expiring caches
  1783. opts.NoExpireCache = false
  1784. // etl triggers ETL adapter
  1785. opts.UseETLAdapter = r.URL.Query().Get("etl") == "true"
  1786. // aggregation field is required
  1787. if field == "" {
  1788. WriteError(w, BadRequest("Missing aggregation field parameter"))
  1789. return
  1790. }
  1791. // aggregation subfield is required when aggregation field is "label"
  1792. if (field == "label" || field == "annotation") && len(subfields) == 0 {
  1793. WriteError(w, BadRequest("Missing aggregation subfield parameter"))
  1794. return
  1795. }
  1796. // enforce one of the available rate options
  1797. if opts.Rate != "" && opts.Rate != "hourly" && opts.Rate != "daily" && opts.Rate != "monthly" {
  1798. WriteError(w, BadRequest("Rate parameter only supports: hourly, daily, monthly or empty"))
  1799. return
  1800. }
  1801. // parse cost data filters
  1802. // namespace and cluster are exact-string-matches
  1803. // labels are expected to be comma-separated and to take the form key=value
  1804. // e.g. app=cost-analyzer,app.kubernetes.io/instance=kubecost
  1805. opts.Filters = map[string]string{
  1806. "namespace": namespace,
  1807. "cluster": cluster,
  1808. "labels": labels,
  1809. "annotations": annotations,
  1810. "podprefix": podprefix,
  1811. }
  1812. // parse shared resources
  1813. sn := []string{}
  1814. sln := []string{}
  1815. slv := []string{}
  1816. if sharedNamespaces != "" {
  1817. sn = strings.Split(sharedNamespaces, ",")
  1818. }
  1819. if sharedLabelNames != "" {
  1820. sln = strings.Split(sharedLabelNames, ",")
  1821. slv = strings.Split(sharedLabelValues, ",")
  1822. if len(sln) != len(slv) || slv[0] == "" {
  1823. WriteError(w, BadRequest("Supply exactly one shared label value per shared label name"))
  1824. return
  1825. }
  1826. }
  1827. if len(sn) > 0 || len(sln) > 0 {
  1828. opts.SharedResources = NewSharedResourceInfo(true, sn, sln, slv)
  1829. }
  1830. // enable remote if it is available and not disabled
  1831. opts.RemoteEnabled = remote && env.IsRemoteEnabled()
  1832. promClient := a.GetPrometheusClient(remote)
  1833. var data map[string]*Aggregation
  1834. var message string
  1835. data, message, err = a.AggAPI.ComputeAggregateCostModel(promClient, window, field, subfields, opts)
  1836. // Find any warnings in http request context
  1837. warning, _ := util.GetWarning(r)
  1838. if err != nil {
  1839. if emptyErr, ok := err.(*EmptyDataError); ok {
  1840. if warning == "" {
  1841. w.Write(WrapData(map[string]interface{}{}, emptyErr))
  1842. } else {
  1843. w.Write(WrapDataWithWarning(map[string]interface{}{}, emptyErr, warning))
  1844. }
  1845. return
  1846. }
  1847. if boundaryErr, ok := err.(*kubecost.BoundaryError); ok {
  1848. if window.Start() != nil && window.Start().After(time.Now().Add(-90*24*time.Hour)) {
  1849. // Asking for data within a 90 day period: it will be available
  1850. // after the pipeline builds
  1851. msg := "Data will be available after ETL is built"
  1852. rex := regexp.MustCompile(`(\d+\.*\d*)%`)
  1853. match := rex.FindStringSubmatch(boundaryErr.Message)
  1854. if len(match) > 1 {
  1855. completionPct, err := strconv.ParseFloat(match[1], 64)
  1856. if err == nil {
  1857. msg = fmt.Sprintf("%s (%.1f%% complete)", msg, completionPct)
  1858. }
  1859. }
  1860. WriteError(w, InternalServerError(msg))
  1861. } else {
  1862. // Boundary error outside of 90 day period; may not be available
  1863. WriteError(w, InternalServerError(boundaryErr.Error()))
  1864. }
  1865. return
  1866. }
  1867. errStr := fmt.Sprintf("error computing aggregate cost model: %s", err)
  1868. WriteError(w, InternalServerError(errStr))
  1869. return
  1870. }
  1871. if warning == "" {
  1872. w.Write(WrapDataWithMessage(data, nil, message))
  1873. } else {
  1874. w.Write(WrapDataWithMessageAndWarning(data, nil, message, warning))
  1875. }
  1876. }
  1877. // ParseAggregationProperties attempts to parse and return aggregation properties
  1878. // encoded under the given key. If none exist, or if parsing fails, an error
  1879. // is returned with empty AllocationProperties.
  1880. func ParseAggregationProperties(qp util.QueryParams, key string) ([]string, error) {
  1881. aggregateBy := []string{}
  1882. for _, agg := range qp.GetList(key, ",") {
  1883. aggregate := strings.TrimSpace(agg)
  1884. if aggregate != "" {
  1885. if prop, err := kubecost.ParseProperty(aggregate); err == nil {
  1886. aggregateBy = append(aggregateBy, string(prop))
  1887. } else if strings.HasPrefix(aggregate, "label:") {
  1888. aggregateBy = append(aggregateBy, aggregate)
  1889. } else if strings.HasPrefix(aggregate, "annotation:") {
  1890. aggregateBy = append(aggregateBy, aggregate)
  1891. }
  1892. }
  1893. }
  1894. return aggregateBy, nil
  1895. }
  1896. // ComputeAllocationHandler computes an AllocationSetRange from the CostModel.
  1897. func (a *Accesses) ComputeAllocationHandler(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  1898. w.Header().Set("Content-Type", "application/json")
  1899. qp := util.NewQueryParams(r.URL.Query())
  1900. // Window is a required field describing the window of time over which to
  1901. // compute allocation data.
  1902. window, err := kubecost.ParseWindowWithOffset(qp.Get("window", ""), env.GetParsedUTCOffset())
  1903. if err != nil {
  1904. http.Error(w, fmt.Sprintf("Invalid 'window' parameter: %s", err), http.StatusBadRequest)
  1905. }
  1906. // Step is an optional parameter that defines the duration per-set, i.e.
  1907. // the window for an AllocationSet, of the AllocationSetRange to be
  1908. // computed. Defaults to the window size, making one set.
  1909. step := qp.GetDuration("step", window.Duration())
  1910. // Resolution is an optional parameter, defaulting to the configured ETL
  1911. // resolution.
  1912. resolution := qp.GetDuration("resolution", env.GetETLResolution())
  1913. // Aggregation is a required comma-separated list of fields by which to
  1914. // aggregate results. Some fields allow a sub-field, which is distinguished
  1915. // with a colon; e.g. "label:app".
  1916. // Examples: "namespace", "namespace,label:app"
  1917. aggregateBy, err := ParseAggregationProperties(qp, "aggregate")
  1918. if err != nil {
  1919. http.Error(w, fmt.Sprintf("Invalid 'aggregate' parameter: %s", err), http.StatusBadRequest)
  1920. }
  1921. // Accumulate is an optional parameter, defaulting to false, which if true
  1922. // sums each Set in the Range, producing one Set.
  1923. accumulate := qp.GetBool("accumulate", false)
  1924. // Query for AllocationSets in increments of the given step duration,
  1925. // appending each to the AllocationSetRange.
  1926. asr := kubecost.NewAllocationSetRange()
  1927. stepStart := *window.Start()
  1928. for window.End().After(stepStart) {
  1929. stepEnd := stepStart.Add(step)
  1930. stepWindow := kubecost.NewWindow(&stepStart, &stepEnd)
  1931. as, err := a.Model.ComputeAllocation(*stepWindow.Start(), *stepWindow.End(), resolution)
  1932. if err != nil {
  1933. WriteError(w, InternalServerError(err.Error()))
  1934. return
  1935. }
  1936. asr.Append(as)
  1937. stepStart = stepEnd
  1938. }
  1939. // Aggregate, if requested
  1940. if len(aggregateBy) > 0 {
  1941. err = asr.AggregateBy(aggregateBy, nil)
  1942. if err != nil {
  1943. WriteError(w, InternalServerError(err.Error()))
  1944. return
  1945. }
  1946. }
  1947. // Accumulate, if requested
  1948. if accumulate {
  1949. as, err := asr.Accumulate()
  1950. if err != nil {
  1951. WriteError(w, InternalServerError(err.Error()))
  1952. return
  1953. }
  1954. asr = kubecost.NewAllocationSetRange(as)
  1955. }
  1956. w.Write(WrapData(asr, nil))
  1957. }
// The below was transferred from a different package in order to maintain
// previous behavior. Ultimately, we should clean this up at some point.
// TODO move to util and/or standardize everything

// Error describes an HTTP error response: the status code to send and the
// text to report to the client. It is written out by WriteError.
type Error struct {
	// StatusCode is the HTTP status code for the response. WriteError treats
	// a zero value as http.StatusInternalServerError.
	StatusCode int
	// Body is the error text; WriteError prefixes it with "Error: " when
	// building the response message.
	Body string
}
  1965. func WriteError(w http.ResponseWriter, err Error) {
  1966. status := err.StatusCode
  1967. if status == 0 {
  1968. status = http.StatusInternalServerError
  1969. }
  1970. w.WriteHeader(status)
  1971. resp, _ := json.Marshal(&Response{
  1972. Code: status,
  1973. Message: fmt.Sprintf("Error: %s", err.Body),
  1974. })
  1975. w.Write(resp)
  1976. }
  1977. func BadRequest(message string) Error {
  1978. return Error{
  1979. StatusCode: http.StatusBadRequest,
  1980. Body: message,
  1981. }
  1982. }
  1983. func InternalServerError(message string) Error {
  1984. if message == "" {
  1985. message = "Internal Server Error"
  1986. }
  1987. return Error{
  1988. StatusCode: http.StatusInternalServerError,
  1989. Body: message,
  1990. }
  1991. }
  1992. func NotFound() Error {
  1993. return Error{
  1994. StatusCode: http.StatusNotFound,
  1995. Body: "Not Found",
  1996. }
  1997. }