aggregation.go 73 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177
7177817791780178117821783178417851786178717881789179017911792179317941795179617971798179918001801180218031804180518061807180818091810181118121813181418151816181718181819182018211822182318241825182618271828182918301831183218331834183518361837183818391840184118421843184418451846184718481849185018511852185318541855185618571858185918601861186218631864186518661867186818691870187118721873187418751876187718781879188018811882188318841885188618871888188918901891189218931894189518961897189818991900190119021903190419051906190719081909191019111912191319141915191619171918191919201921192219231924192519261927192819291930193119321933193419351936193719381939194019411942194319441945194619471948194919501951195219531954195519561957195819591960196119621963196419651966196719681969197019711972197319741975197619771978197919801981198219831984198519861987198819891990199119921993199419951996199719981999200020012002
  1. package costmodel
  2. import (
  3. "fmt"
  4. "math"
  5. "net/http"
  6. "regexp"
  7. "sort"
  8. "strconv"
  9. "strings"
  10. "time"
  11. "github.com/julienschmidt/httprouter"
  12. "github.com/kubecost/cost-model/pkg/cloud"
  13. "github.com/kubecost/cost-model/pkg/env"
  14. "github.com/kubecost/cost-model/pkg/errors"
  15. "github.com/kubecost/cost-model/pkg/kubecost"
  16. "github.com/kubecost/cost-model/pkg/log"
  17. "github.com/kubecost/cost-model/pkg/prom"
  18. "github.com/kubecost/cost-model/pkg/thanos"
  19. "github.com/kubecost/cost-model/pkg/util"
  20. "github.com/patrickmn/go-cache"
  21. prometheusClient "github.com/prometheus/client_golang/api"
  22. "k8s.io/klog"
  23. )
const (
	// SplitTypeWeighted signals that shared costs should be shared
	// proportionally, rather than evenly
	SplitTypeWeighted = "weighted"
	// UnallocatedSubfield indicates an allocation datum that does not have the
	// chosen Aggregator; e.g. during aggregation by some label, there may be
	// cost data that do not have the given label.
	UnallocatedSubfield = "__unallocated__"
	// clusterCostsCacheMinutes is presumably the lifetime, in minutes, of
	// entries in the cluster-costs cache — TODO confirm against the cache's
	// initialization site (not visible in this file chunk).
	clusterCostsCacheMinutes = 5.0
)
// Aggregation describes aggregated cost data, containing cumulative cost and
// allocation data per resource, vectors of rate data per resource, efficiency
// data, and metadata describing the type of aggregation operation.
type Aggregation struct {
	// Aggregator is the field by which the data was aggregated,
	// e.g. "namespace", "cluster", "label".
	Aggregator string `json:"aggregation"`
	// Subfields holds the label names used when Aggregator is "label".
	Subfields []string `json:"subfields,omitempty"`
	// Environment is the aggregation key, e.g. the namespace name or label value.
	Environment string `json:"environment"`
	Cluster     string `json:"cluster,omitempty"`
	// Properties carries per-container metadata; only populated when
	// aggregating by container (see aggregateDatum's includeProperties flag).
	Properties *kubecost.Properties `json:"-"`

	// CPU allocation: hourly average, raw vectors, and cumulative total.
	CPUAllocationHourlyAverage float64        `json:"cpuAllocationAverage"`
	CPUAllocationVectors       []*util.Vector `json:"-"`
	CPUAllocationTotal         float64        `json:"-"`
	// CPU cost: cumulative cost and its time-series vector.
	CPUCost       float64        `json:"cpuCost"`
	CPUCostVector []*util.Vector `json:"cpuCostVector,omitempty"`
	// CPUEfficiency is 1.0 minus the CPU idle percentage; may exceed 1.0.
	CPUEfficiency       float64        `json:"cpuEfficiency"`
	CPURequestedVectors []*util.Vector `json:"-"`
	CPUUsedVectors      []*util.Vector `json:"-"`
	// Efficiency is CPU and RAM efficiency weighted by their respective costs.
	Efficiency float64 `json:"efficiency"`

	// GPU allocation and cost. Note: GPU allocation vectors are rate
	// per-datum, unlike the cumulative CPU/RAM/PV vectors (see
	// AggregateCostData).
	GPUAllocationHourlyAverage float64        `json:"gpuAllocationAverage"`
	GPUAllocationVectors       []*util.Vector `json:"-"`
	GPUCost                    float64        `json:"gpuCost"`
	GPUCostVector              []*util.Vector `json:"gpuCostVector,omitempty"`
	GPUAllocationTotal         float64        `json:"-"`

	// RAM allocation, cost, and efficiency. The hourly average is converted
	// from bytes to GiB during aggregation.
	RAMAllocationHourlyAverage float64        `json:"ramAllocationAverage"`
	RAMAllocationVectors       []*util.Vector `json:"-"`
	RAMAllocationTotal         float64        `json:"-"`
	RAMCost                    float64        `json:"ramCost"`
	RAMCostVector              []*util.Vector `json:"ramCostVector,omitempty"`
	RAMEfficiency              float64        `json:"ramEfficiency"`
	RAMRequestedVectors        []*util.Vector `json:"-"`
	RAMUsedVectors             []*util.Vector `json:"-"`

	// PV (persistent volume) allocation and cost. The hourly average is
	// converted from bytes to GiB during aggregation.
	PVAllocationHourlyAverage float64        `json:"pvAllocationAverage"`
	PVAllocationVectors       []*util.Vector `json:"-"`
	PVAllocationTotal         float64        `json:"-"`
	PVCost                    float64        `json:"pvCost"`
	PVCostVector              []*util.Vector `json:"pvCostVector,omitempty"`

	NetworkCost       float64        `json:"networkCost"`
	NetworkCostVector []*util.Vector `json:"networkCostVector,omitempty"`
	// SharedCost is this aggregation's portion of shared resource and
	// external shared costs.
	SharedCost float64 `json:"sharedCost"`
	// TotalCost is the sum of all cost components, including SharedCost.
	TotalCost       float64        `json:"totalCost"`
	TotalCostVector []*util.Vector `json:"totalCostVector,omitempty"`
}
  76. // TotalHours determines the amount of hours the Aggregation covers, as a
  77. // function of the cost vectors and the resolution of those vectors' data
  78. func (a *Aggregation) TotalHours(resolutionHours float64) float64 {
  79. length := 1
  80. if length < len(a.CPUCostVector) {
  81. length = len(a.CPUCostVector)
  82. }
  83. if length < len(a.RAMCostVector) {
  84. length = len(a.RAMCostVector)
  85. }
  86. if length < len(a.PVCostVector) {
  87. length = len(a.PVCostVector)
  88. }
  89. if length < len(a.GPUCostVector) {
  90. length = len(a.GPUCostVector)
  91. }
  92. if length < len(a.NetworkCostVector) {
  93. length = len(a.NetworkCostVector)
  94. }
  95. return float64(length) * resolutionHours
  96. }
  97. // RateCoefficient computes the coefficient by which the total cost needs to be
  98. // multiplied in order to convert totals costs into per-rate costs.
  99. func (a *Aggregation) RateCoefficient(rateStr string, resolutionHours float64) float64 {
  100. // monthly rate = (730.0)*(total cost)/(total hours)
  101. // daily rate = (24.0)*(total cost)/(total hours)
  102. // hourly rate = (1.0)*(total cost)/(total hours)
  103. // default to hourly rate
  104. coeff := 1.0
  105. switch rateStr {
  106. case "daily":
  107. coeff = util.HoursPerDay
  108. case "monthly":
  109. coeff = util.HoursPerMonth
  110. }
  111. return coeff / a.TotalHours(resolutionHours)
  112. }
// SharedResourceInfo describes which cost data should be treated as shared
// across all other allocations: entire namespaces and/or label selectors.
type SharedResourceInfo struct {
	// ShareResources toggles shared-resource accounting (see AggregateCostData).
	ShareResources bool
	// SharedNamespace is the set of namespace names whose costs are shared.
	SharedNamespace map[string]bool
	// LabelSelectors maps a label name to the set of values that mark a
	// resource as shared; matching any one selector suffices (OR semantics).
	LabelSelectors map[string]map[string]bool
}
// SharedCostInfo describes a fixed, externally-specified cost that is shared
// across aggregations (see AggregateCostData, opts.SharedCosts).
type SharedCostInfo struct {
	Name string  // identifier for this shared cost
	Cost float64 // the cost amount to be shared
	// ShareType presumably describes how the cost is split — TODO confirm
	// against callers; it is not read in this file's visible code.
	ShareType string
}
  123. func (s *SharedResourceInfo) IsSharedResource(costDatum *CostData) bool {
  124. // exists in a shared namespace
  125. if _, ok := s.SharedNamespace[costDatum.Namespace]; ok {
  126. return true
  127. }
  128. // has at least one shared label (OR, not AND in the case of multiple labels)
  129. for labelName, labelValues := range s.LabelSelectors {
  130. if val, ok := costDatum.Labels[labelName]; ok && labelValues[val] {
  131. return true
  132. }
  133. }
  134. return false
  135. }
  136. func NewSharedResourceInfo(shareResources bool, sharedNamespaces []string, labelNames []string, labelValues []string) *SharedResourceInfo {
  137. sr := &SharedResourceInfo{
  138. ShareResources: shareResources,
  139. SharedNamespace: make(map[string]bool),
  140. LabelSelectors: make(map[string]map[string]bool),
  141. }
  142. for _, ns := range sharedNamespaces {
  143. sr.SharedNamespace[strings.Trim(ns, " ")] = true
  144. }
  145. // Creating a map of label name to label value, but only if
  146. // the cardinality matches
  147. if len(labelNames) == len(labelValues) {
  148. for i := range labelNames {
  149. cleanedLname := prom.SanitizeLabelName(strings.Trim(labelNames[i], " "))
  150. if values, ok := sr.LabelSelectors[cleanedLname]; ok {
  151. values[strings.Trim(labelValues[i], " ")] = true
  152. } else {
  153. sr.LabelSelectors[cleanedLname] = map[string]bool{strings.Trim(labelValues[i], " "): true}
  154. }
  155. }
  156. }
  157. return sr
  158. }
  159. func GetTotalContainerCost(costData map[string]*CostData, rate string, cp cloud.Provider, discount float64, customDiscount float64, idleCoefficients map[string]float64) float64 {
  160. totalContainerCost := 0.0
  161. for _, costDatum := range costData {
  162. clusterID := costDatum.ClusterID
  163. cpuv, ramv, gpuv, pvvs, netv := getPriceVectors(cp, costDatum, rate, discount, customDiscount, idleCoefficients[clusterID])
  164. totalContainerCost += totalVectors(cpuv)
  165. totalContainerCost += totalVectors(ramv)
  166. totalContainerCost += totalVectors(gpuv)
  167. for _, pv := range pvvs {
  168. totalContainerCost += totalVectors(pv)
  169. }
  170. totalContainerCost += totalVectors(netv)
  171. }
  172. return totalContainerCost
  173. }
// ComputeIdleCoefficient computes, per cluster, the ratio of total allocated
// container cost (CPU + RAM + GPU + PV, excluding network) to the cluster's
// TotalCumulative cost over the given window and offset. The coefficient is
// used to scale container costs so that idle capacity is distributed across
// allocations. Returns a map of clusterID to coefficient, or an error when a
// cluster's TotalCumulative is zero.
func (a *Accesses) ComputeIdleCoefficient(costData map[string]*CostData, cli prometheusClient.Client, cp cloud.Provider, discount float64, customDiscount float64, windowString, offset string) (map[string]float64, error) {
	coefficients := make(map[string]float64)
	profileName := "ComputeIdleCoefficient: ComputeClusterCosts"
	profileStart := time.Now()
	var clusterCosts map[string]*ClusterCosts
	var err error
	// Cache key is the window/offset pair.
	key := fmt.Sprintf("%s:%s", windowString, offset)
	// Use cached cluster costs when available; otherwise recompute.
	// NOTE(review): a freshly computed result is not written back to
	// ClusterCostsCache here — confirm that caching happens inside
	// ComputeClusterCosts or at another call site.
	if data, valid := a.ClusterCostsCache.Get(key); valid {
		clusterCosts = data.(map[string]*ClusterCosts)
	} else {
		clusterCosts, err = a.ComputeClusterCosts(cli, cp, windowString, offset, false)
		if err != nil {
			return nil, err
		}
	}
	measureTime(profileStart, profileThreshold, profileName)
	for cid, costs := range clusterCosts {
		// No cost data at all: warn and fall back to a neutral coefficient.
		if costs.CPUCumulative == 0 && costs.RAMCumulative == 0 && costs.StorageCumulative == 0 {
			klog.V(1).Infof("[Warning] No ClusterCosts data for cluster '%s'. Is it emitting data?", cid)
			coefficients[cid] = 1.0
			continue
		}
		// A zero total would make the coefficient division meaningless.
		if costs.TotalCumulative == 0 {
			return nil, fmt.Errorf("TotalCumulative cluster cost for cluster '%s' returned 0 over window '%s' offset '%s'", cid, windowString, offset)
		}
		// Sum the allocated container cost for this cluster. Prices are taken
		// cumulatively (empty rate) with an idle coefficient of 1, and
		// network cost is deliberately excluded (5th return value ignored).
		totalContainerCost := 0.0
		for _, costDatum := range costData {
			if costDatum.ClusterID == cid {
				cpuv, ramv, gpuv, pvvs, _ := getPriceVectors(cp, costDatum, "", discount, customDiscount, 1)
				totalContainerCost += totalVectors(cpuv)
				totalContainerCost += totalVectors(ramv)
				totalContainerCost += totalVectors(gpuv)
				for _, pv := range pvvs {
					totalContainerCost += totalVectors(pv)
				}
			}
		}
		coeff := totalContainerCost / costs.TotalCumulative
		coefficients[cid] = coeff
	}
	return coefficients, nil
}
// AggregationOptions provides optional parameters to AggregateCostData, allowing callers to perform more complex operations
type AggregationOptions struct {
	Discount          float64            // percent by which to discount CPU, RAM, and GPU cost
	CustomDiscount    float64            // additional custom discount applied to all prices
	IdleCoefficients  map[string]float64 // scales costs by amount of idle resources on a per-cluster basis
	IncludeEfficiency bool               // set to true to receive efficiency/usage data
	IncludeTimeSeries bool               // set to true to receive time series data
	Rate              string             // set to "hourly", "daily", or "monthly" to receive cost rate, rather than cumulative cost
	// ResolutionHours is the duration each vector datapoint covers; values
	// <= 0 are treated as 1.0 by AggregateCostData.
	ResolutionHours float64
	// SharedResourceInfo identifies cost data to pool as shared rather than
	// aggregated under their own keys.
	SharedResourceInfo *SharedResourceInfo
	// SharedCosts are fixed external costs added to each aggregation's
	// SharedCost, scaled by the shared-split coefficient.
	SharedCosts map[string]*SharedCostInfo
	// FilteredContainerCount is presumably the number of containers excluded
	// by filtering — TODO confirm; it is not read in this file's visible code.
	FilteredContainerCount int
	// FilteredEnvironments (by name) contributes its size to the denominator
	// of the even shared-cost split in AggregateCostData.
	FilteredEnvironments map[string]int
	// SharedSplit selects how shared costs are divided; SplitTypeWeighted
	// splits proportionally to each aggregation's cost, otherwise evenly.
	SharedSplit string
	// TotalContainerCost is the total cost of all containers, used as the
	// denominator basis for weighted shared splits.
	TotalContainerCost float64
}
  232. // Helper method to test request/usgae values against allocation averages for efficiency scores. Generate a warning log if
  233. // clamp is required
  234. func clampAverage(requestsAvg float64, usedAverage float64, allocationAvg float64, resource string) (float64, float64) {
  235. rAvg := requestsAvg
  236. if rAvg > allocationAvg {
  237. klog.V(4).Infof("[Warning] Average %s Requested (%f) > Average %s Allocated (%f). Clamping.", resource, rAvg, resource, allocationAvg)
  238. rAvg = allocationAvg
  239. }
  240. uAvg := usedAverage
  241. if uAvg > allocationAvg {
  242. klog.V(4).Infof("[Warning]: Average %s Used (%f) > Average %s Allocated (%f). Clamping.", resource, uAvg, resource, allocationAvg)
  243. uAvg = allocationAvg
  244. }
  245. return rAvg, uAvg
  246. }
// AggregateCostData aggregates raw cost data by field; e.g. namespace, cluster, service, or label. In the case of label, callers
// must pass a slice of subfields indicating the labels by which to group. Provider is used to define custom resource pricing.
// See AggregationOptions for optional parameters.
func AggregateCostData(costData map[string]*CostData, field string, subfields []string, cp cloud.Provider, opts *AggregationOptions) map[string]*Aggregation {
	discount := opts.Discount
	customDiscount := opts.CustomDiscount
	idleCoefficients := opts.IdleCoefficients
	includeTimeSeries := opts.IncludeTimeSeries
	includeEfficiency := opts.IncludeEfficiency
	rate := opts.Rate
	sr := opts.SharedResourceInfo
	// Default to one hour per datapoint when no positive resolution is given.
	resolutionHours := 1.0
	if opts.ResolutionHours > 0.0 {
		resolutionHours = opts.ResolutionHours
	}
	if idleCoefficients == nil {
		idleCoefficients = make(map[string]float64)
	}
	// aggregations collects key-value pairs of resource group-to-aggregated data
	// e.g. namespace-to-data or label-value-to-data
	aggregations := make(map[string]*Aggregation)
	// sharedResourceCost is the running total cost of resources that should be reported
	// as shared across all other resources, rather than reported as a stand-alone category
	sharedResourceCost := 0.0
	for _, costDatum := range costData {
		// Clusters without an entry in idleCoefficients are scaled neutrally.
		idleCoefficient, ok := idleCoefficients[costDatum.ClusterID]
		if !ok {
			idleCoefficient = 1.0
		}
		if sr != nil && sr.ShareResources && sr.IsSharedResource(costDatum) {
			// Shared datum: fold its entire cost into the shared pool instead
			// of aggregating it under its own key.
			cpuv, ramv, gpuv, pvvs, netv := getPriceVectors(cp, costDatum, rate, discount, customDiscount, idleCoefficient)
			sharedResourceCost += totalVectors(cpuv)
			sharedResourceCost += totalVectors(ramv)
			sharedResourceCost += totalVectors(gpuv)
			sharedResourceCost += totalVectors(netv)
			for _, pv := range pvvs {
				sharedResourceCost += totalVectors(pv)
			}
		} else {
			// Dispatch on the aggregation field to derive the aggregation key.
			// Workload fields (service, deployment, etc.) use only the first
			// matching owner and fall back to UnallocatedSubfield when absent.
			if field == "cluster" {
				aggregateDatum(cp, aggregations, costDatum, field, subfields, rate, costDatum.ClusterID, discount, customDiscount, idleCoefficient, false)
			} else if field == "node" {
				aggregateDatum(cp, aggregations, costDatum, field, subfields, rate, costDatum.NodeName, discount, customDiscount, idleCoefficient, false)
			} else if field == "namespace" {
				aggregateDatum(cp, aggregations, costDatum, field, subfields, rate, costDatum.Namespace, discount, customDiscount, idleCoefficient, false)
			} else if field == "service" {
				if len(costDatum.Services) > 0 {
					aggregateDatum(cp, aggregations, costDatum, field, subfields, rate, costDatum.Namespace+"/"+costDatum.Services[0], discount, customDiscount, idleCoefficient, false)
				} else {
					aggregateDatum(cp, aggregations, costDatum, field, subfields, rate, UnallocatedSubfield, discount, customDiscount, idleCoefficient, false)
				}
			} else if field == "deployment" {
				if len(costDatum.Deployments) > 0 {
					aggregateDatum(cp, aggregations, costDatum, field, subfields, rate, costDatum.Namespace+"/"+costDatum.Deployments[0], discount, customDiscount, idleCoefficient, false)
				} else {
					aggregateDatum(cp, aggregations, costDatum, field, subfields, rate, UnallocatedSubfield, discount, customDiscount, idleCoefficient, false)
				}
			} else if field == "statefulset" {
				if len(costDatum.Statefulsets) > 0 {
					aggregateDatum(cp, aggregations, costDatum, field, subfields, rate, costDatum.Namespace+"/"+costDatum.Statefulsets[0], discount, customDiscount, idleCoefficient, false)
				} else {
					aggregateDatum(cp, aggregations, costDatum, field, subfields, rate, UnallocatedSubfield, discount, customDiscount, idleCoefficient, false)
				}
			} else if field == "daemonset" {
				if len(costDatum.Daemonsets) > 0 {
					aggregateDatum(cp, aggregations, costDatum, field, subfields, rate, costDatum.Namespace+"/"+costDatum.Daemonsets[0], discount, customDiscount, idleCoefficient, false)
				} else {
					aggregateDatum(cp, aggregations, costDatum, field, subfields, rate, UnallocatedSubfield, discount, customDiscount, idleCoefficient, false)
				}
			} else if field == "controller" {
				if controller, kind, hasController := costDatum.GetController(); hasController {
					key := fmt.Sprintf("%s/%s:%s", costDatum.Namespace, kind, controller)
					aggregateDatum(cp, aggregations, costDatum, field, subfields, rate, key, discount, customDiscount, idleCoefficient, false)
				} else {
					aggregateDatum(cp, aggregations, costDatum, field, subfields, rate, UnallocatedSubfield, discount, customDiscount, idleCoefficient, false)
				}
			} else if field == "label" {
				// First subfield label present on the datum wins.
				found := false
				if costDatum.Labels != nil {
					for _, sf := range subfields {
						if subfieldName, ok := costDatum.Labels[sf]; ok {
							aggregateDatum(cp, aggregations, costDatum, field, subfields, rate, subfieldName, discount, customDiscount, idleCoefficient, false)
							found = true
							break
						}
					}
				}
				if !found {
					aggregateDatum(cp, aggregations, costDatum, field, subfields, rate, UnallocatedSubfield, discount, customDiscount, idleCoefficient, false)
				}
			} else if field == "pod" {
				aggregateDatum(cp, aggregations, costDatum, field, subfields, rate, costDatum.Namespace+"/"+costDatum.PodName, discount, customDiscount, idleCoefficient, false)
			} else if field == "container" {
				key := fmt.Sprintf("%s/%s/%s/%s", costDatum.ClusterID, costDatum.Namespace, costDatum.PodName, costDatum.Name)
				// Container-level aggregation is the only case that records
				// per-container Properties (includeProperties = true).
				aggregateDatum(cp, aggregations, costDatum, field, subfields, rate, key, discount, customDiscount, idleCoefficient, true)
			}
		}
	}
	// Finalize each aggregation: totals, shared-cost split, rate conversion,
	// allocation averages, efficiency, unit conversion, and sanity checks.
	for key, agg := range aggregations {
		// Even split of shared cost across all environments (aggregated plus
		// filtered-out). NOTE(review): len(aggregations) shrinks as zero-cost
		// entries are deleted below, so this even-split coefficient can differ
		// between iterations — confirm this is intended.
		sharedCoefficient := 1 / float64(len(opts.FilteredEnvironments)+len(aggregations))
		agg.CPUCost = totalVectors(agg.CPUCostVector)
		agg.RAMCost = totalVectors(agg.RAMCostVector)
		agg.GPUCost = totalVectors(agg.GPUCostVector)
		agg.PVCost = totalVectors(agg.PVCostVector)
		agg.NetworkCost = totalVectors(agg.NetworkCostVector)
		if opts.SharedSplit == SplitTypeWeighted {
			// Weighted split: each aggregation's share is proportional to its
			// own cost relative to the non-shared total.
			d := opts.TotalContainerCost - sharedResourceCost
			if d == 0 {
				klog.V(1).Infof("[Warning] Total container cost '%f' and shared resource cost '%f are the same'. Setting sharedCoefficient to 1", opts.TotalContainerCost, sharedResourceCost)
				sharedCoefficient = 1.0
			} else {
				sharedCoefficient = (agg.CPUCost + agg.RAMCost + agg.GPUCost + agg.PVCost + agg.NetworkCost) / d
			}
		}
		agg.SharedCost = sharedResourceCost * sharedCoefficient
		// External shared costs are split with the same coefficient.
		for _, v := range opts.SharedCosts {
			agg.SharedCost += v.Cost * sharedCoefficient
		}
		// Convert cumulative costs into hourly/daily/monthly rates on request.
		if rate != "" {
			rateCoeff := agg.RateCoefficient(rate, resolutionHours)
			agg.CPUCost *= rateCoeff
			agg.RAMCost *= rateCoeff
			agg.GPUCost *= rateCoeff
			agg.PVCost *= rateCoeff
			agg.NetworkCost *= rateCoeff
			agg.SharedCost *= rateCoeff
		}
		agg.TotalCost = agg.CPUCost + agg.RAMCost + agg.GPUCost + agg.PVCost + agg.NetworkCost + agg.SharedCost
		// Evicted and Completed Pods can still show up here, but have 0 cost.
		// Filter these by default. Any reason to keep them?
		if agg.TotalCost == 0 {
			delete(aggregations, key)
			continue
		}
		// CPU, RAM, and PV allocation are cumulative per-datum, whereas GPU is rate per-datum
		agg.CPUAllocationHourlyAverage = totalVectors(agg.CPUAllocationVectors) / agg.TotalHours(resolutionHours)
		agg.RAMAllocationHourlyAverage = totalVectors(agg.RAMAllocationVectors) / agg.TotalHours(resolutionHours)
		agg.GPUAllocationHourlyAverage = averageVectors(agg.GPUAllocationVectors)
		agg.PVAllocationHourlyAverage = totalVectors(agg.PVAllocationVectors) / agg.TotalHours(resolutionHours)
		// TODO niko/etl does this check out for GPU data? Do we need to rewrite GPU queries to be
		// culumative?
		agg.CPUAllocationTotal = totalVectors(agg.CPUAllocationVectors)
		agg.GPUAllocationTotal = totalVectors(agg.GPUAllocationVectors)
		agg.PVAllocationTotal = totalVectors(agg.PVAllocationVectors)
		agg.RAMAllocationTotal = totalVectors(agg.RAMAllocationVectors)
		if includeEfficiency {
			// Default both RAM and CPU to 0% efficiency so that a 0-requested, 0-allocated, 0-used situation
			// returns 0% efficiency, which should be a red-flag.
			//
			// If non-zero numbers are available, then efficiency is defined as:
			//   idlePercentage = (requested - used) / allocated
			//   efficiency = (1.0 - idlePercentage)
			//
			// It is possible to score > 100% efficiency, which is meant to be interpreted as a red flag.
			// It is not possible to score < 0% efficiency.
			agg.CPUEfficiency = 0.0
			CPUIdle := 0.0
			if agg.CPUAllocationHourlyAverage > 0.0 {
				avgCPURequested := averageVectors(agg.CPURequestedVectors)
				avgCPUUsed := averageVectors(agg.CPUUsedVectors)
				// Clamp averages, log range violations
				avgCPURequested, avgCPUUsed = clampAverage(avgCPURequested, avgCPUUsed, agg.CPUAllocationHourlyAverage, "CPU")
				CPUIdle = ((avgCPURequested - avgCPUUsed) / agg.CPUAllocationHourlyAverage)
				agg.CPUEfficiency = 1.0 - CPUIdle
			}
			agg.RAMEfficiency = 0.0
			RAMIdle := 0.0
			if agg.RAMAllocationHourlyAverage > 0.0 {
				avgRAMRequested := averageVectors(agg.RAMRequestedVectors)
				avgRAMUsed := averageVectors(agg.RAMUsedVectors)
				// Clamp averages, log range violations
				avgRAMRequested, avgRAMUsed = clampAverage(avgRAMRequested, avgRAMUsed, agg.RAMAllocationHourlyAverage, "RAM")
				RAMIdle = ((avgRAMRequested - avgRAMUsed) / agg.RAMAllocationHourlyAverage)
				agg.RAMEfficiency = 1.0 - RAMIdle
			}
			// Score total efficiency by the sum of CPU and RAM efficiency, weighted by their
			// respective total costs.
			agg.Efficiency = 0.0
			if (agg.CPUCost + agg.RAMCost) > 0 {
				agg.Efficiency = ((agg.CPUCost * agg.CPUEfficiency) + (agg.RAMCost * agg.RAMEfficiency)) / (agg.CPUCost + agg.RAMCost)
			}
		}
		// convert RAM from bytes to GiB
		agg.RAMAllocationHourlyAverage = agg.RAMAllocationHourlyAverage / 1024 / 1024 / 1024
		// convert storage from bytes to GiB
		agg.PVAllocationHourlyAverage = agg.PVAllocationHourlyAverage / 1024 / 1024 / 1024
		// remove time series data if it is not explicitly requested
		if !includeTimeSeries {
			agg.CPUCostVector = nil
			agg.RAMCostVector = nil
			agg.GPUCostVector = nil
			agg.PVCostVector = nil
			agg.NetworkCostVector = nil
			agg.TotalCostVector = nil
		} else { // otherwise compute a totalcostvector
			v1 := addVectors(agg.CPUCostVector, agg.RAMCostVector)
			v2 := addVectors(v1, agg.GPUCostVector)
			v3 := addVectors(v2, agg.PVCostVector)
			v4 := addVectors(v3, agg.NetworkCostVector)
			agg.TotalCostVector = v4
		}
		// Typesafety checks: zero out any NaN/Inf values (e.g. from division
		// by zero hours) so they do not poison JSON output or downstream math.
		if math.IsNaN(agg.CPUAllocationHourlyAverage) || math.IsInf(agg.CPUAllocationHourlyAverage, 0) {
			klog.V(1).Infof("[Warning] CPUAllocationHourlyAverage is %f for '%s: %s/%s'", agg.CPUAllocationHourlyAverage, agg.Cluster, agg.Aggregator, agg.Environment)
			agg.CPUAllocationHourlyAverage = 0
		}
		if math.IsNaN(agg.CPUCost) || math.IsInf(agg.CPUCost, 0) {
			klog.V(1).Infof("[Warning] CPUCost is %f for '%s: %s/%s'", agg.CPUCost, agg.Cluster, agg.Aggregator, agg.Environment)
			agg.CPUCost = 0
		}
		if math.IsNaN(agg.CPUEfficiency) || math.IsInf(agg.CPUEfficiency, 0) {
			klog.V(1).Infof("[Warning] CPUEfficiency is %f for '%s: %s/%s'", agg.CPUEfficiency, agg.Cluster, agg.Aggregator, agg.Environment)
			agg.CPUEfficiency = 0
		}
		if math.IsNaN(agg.Efficiency) || math.IsInf(agg.Efficiency, 0) {
			klog.V(1).Infof("[Warning] Efficiency is %f for '%s: %s/%s'", agg.Efficiency, agg.Cluster, agg.Aggregator, agg.Environment)
			agg.Efficiency = 0
		}
		if math.IsNaN(agg.GPUAllocationHourlyAverage) || math.IsInf(agg.GPUAllocationHourlyAverage, 0) {
			klog.V(1).Infof("[Warning] GPUAllocationHourlyAverage is %f for '%s: %s/%s'", agg.GPUAllocationHourlyAverage, agg.Cluster, agg.Aggregator, agg.Environment)
			agg.GPUAllocationHourlyAverage = 0
		}
		if math.IsNaN(agg.GPUCost) || math.IsInf(agg.GPUCost, 0) {
			klog.V(1).Infof("[Warning] GPUCost is %f for '%s: %s/%s'", agg.GPUCost, agg.Cluster, agg.Aggregator, agg.Environment)
			agg.GPUCost = 0
		}
		if math.IsNaN(agg.RAMAllocationHourlyAverage) || math.IsInf(agg.RAMAllocationHourlyAverage, 0) {
			klog.V(1).Infof("[Warning] RAMAllocationHourlyAverage is %f for '%s: %s/%s'", agg.RAMAllocationHourlyAverage, agg.Cluster, agg.Aggregator, agg.Environment)
			agg.RAMAllocationHourlyAverage = 0
		}
		if math.IsNaN(agg.RAMCost) || math.IsInf(agg.RAMCost, 0) {
			klog.V(1).Infof("[Warning] RAMCost is %f for '%s: %s/%s'", agg.RAMCost, agg.Cluster, agg.Aggregator, agg.Environment)
			agg.RAMCost = 0
		}
		if math.IsNaN(agg.RAMEfficiency) || math.IsInf(agg.RAMEfficiency, 0) {
			klog.V(1).Infof("[Warning] RAMEfficiency is %f for '%s: %s/%s'", agg.RAMEfficiency, agg.Cluster, agg.Aggregator, agg.Environment)
			agg.RAMEfficiency = 0
		}
		if math.IsNaN(agg.PVAllocationHourlyAverage) || math.IsInf(agg.PVAllocationHourlyAverage, 0) {
			klog.V(1).Infof("[Warning] PVAllocationHourlyAverage is %f for '%s: %s/%s'", agg.PVAllocationHourlyAverage, agg.Cluster, agg.Aggregator, agg.Environment)
			agg.PVAllocationHourlyAverage = 0
		}
		if math.IsNaN(agg.PVCost) || math.IsInf(agg.PVCost, 0) {
			klog.V(1).Infof("[Warning] PVCost is %f for '%s: %s/%s'", agg.PVCost, agg.Cluster, agg.Aggregator, agg.Environment)
			agg.PVCost = 0
		}
		if math.IsNaN(agg.NetworkCost) || math.IsInf(agg.NetworkCost, 0) {
			klog.V(1).Infof("[Warning] NetworkCost is %f for '%s: %s/%s'", agg.NetworkCost, agg.Cluster, agg.Aggregator, agg.Environment)
			agg.NetworkCost = 0
		}
		if math.IsNaN(agg.SharedCost) || math.IsInf(agg.SharedCost, 0) {
			klog.V(1).Infof("[Warning] SharedCost is %f for '%s: %s/%s'", agg.SharedCost, agg.Cluster, agg.Aggregator, agg.Environment)
			agg.SharedCost = 0
		}
		if math.IsNaN(agg.TotalCost) || math.IsInf(agg.TotalCost, 0) {
			klog.V(1).Infof("[Warning] TotalCost is %f for '%s: %s/%s'", agg.TotalCost, agg.Cluster, agg.Aggregator, agg.Environment)
			agg.TotalCost = 0
		}
	}
	return aggregations
}
  508. func aggregateDatum(cp cloud.Provider, aggregations map[string]*Aggregation, costDatum *CostData, field string, subfields []string, rate string, key string, discount float64, customDiscount float64, idleCoefficient float64, includeProperties bool) {
  509. // add new entry to aggregation results if a new key is encountered
  510. if _, ok := aggregations[key]; !ok {
  511. agg := &Aggregation{
  512. Aggregator: field,
  513. Environment: key,
  514. }
  515. if len(subfields) > 0 {
  516. agg.Subfields = subfields
  517. }
  518. if includeProperties {
  519. props := &kubecost.Properties{}
  520. props.SetCluster(costDatum.ClusterID)
  521. props.SetNode(costDatum.NodeName)
  522. if controller, kind, hasController := costDatum.GetController(); hasController {
  523. props.SetController(controller)
  524. props.SetControllerKind(kind)
  525. }
  526. props.SetLabels(costDatum.Labels)
  527. props.SetNamespace(costDatum.Namespace)
  528. props.SetPod(costDatum.PodName)
  529. props.SetServices(costDatum.Services)
  530. props.SetContainer(costDatum.Name)
  531. agg.Properties = props
  532. }
  533. aggregations[key] = agg
  534. }
  535. mergeVectors(cp, costDatum, aggregations[key], rate, discount, customDiscount, idleCoefficient)
  536. }
  537. func mergeVectors(cp cloud.Provider, costDatum *CostData, aggregation *Aggregation, rate string, discount float64, customDiscount float64, idleCoefficient float64) {
  538. aggregation.CPUAllocationVectors = addVectors(costDatum.CPUAllocation, aggregation.CPUAllocationVectors)
  539. aggregation.CPURequestedVectors = addVectors(costDatum.CPUReq, aggregation.CPURequestedVectors)
  540. aggregation.CPUUsedVectors = addVectors(costDatum.CPUUsed, aggregation.CPUUsedVectors)
  541. aggregation.RAMAllocationVectors = addVectors(costDatum.RAMAllocation, aggregation.RAMAllocationVectors)
  542. aggregation.RAMRequestedVectors = addVectors(costDatum.RAMReq, aggregation.RAMRequestedVectors)
  543. aggregation.RAMUsedVectors = addVectors(costDatum.RAMUsed, aggregation.RAMUsedVectors)
  544. aggregation.GPUAllocationVectors = addVectors(costDatum.GPUReq, aggregation.GPUAllocationVectors)
  545. for _, pvcd := range costDatum.PVCData {
  546. aggregation.PVAllocationVectors = addVectors(pvcd.Values, aggregation.PVAllocationVectors)
  547. }
  548. cpuv, ramv, gpuv, pvvs, netv := getPriceVectors(cp, costDatum, rate, discount, customDiscount, idleCoefficient)
  549. aggregation.CPUCostVector = addVectors(cpuv, aggregation.CPUCostVector)
  550. aggregation.RAMCostVector = addVectors(ramv, aggregation.RAMCostVector)
  551. aggregation.GPUCostVector = addVectors(gpuv, aggregation.GPUCostVector)
  552. aggregation.NetworkCostVector = addVectors(netv, aggregation.NetworkCostVector)
  553. for _, vectorList := range pvvs {
  554. aggregation.PVCostVector = addVectors(aggregation.PVCostVector, vectorList)
  555. }
  556. }
  557. // Returns the blended discounts applied to the node as a result of global discounts and reserved instance
  558. // discounts
  559. func getDiscounts(costDatum *CostData, cpuCost float64, ramCost float64, discount float64) (float64, float64) {
  560. if costDatum.NodeData == nil {
  561. return discount, discount
  562. }
  563. if costDatum.NodeData.IsSpot() {
  564. return 0, 0
  565. }
  566. reserved := costDatum.NodeData.Reserved
  567. // blended discounts
  568. blendedCPUDiscount := discount
  569. blendedRAMDiscount := discount
  570. if reserved != nil && reserved.CPUCost > 0 && reserved.RAMCost > 0 {
  571. reservedCPUDiscount := 0.0
  572. if cpuCost == 0 {
  573. klog.V(1).Infof("[Warning] No cpu cost found for cluster '%s' node '%s'", costDatum.ClusterID, costDatum.NodeName)
  574. } else {
  575. reservedCPUDiscount = 1.0 - (reserved.CPUCost / cpuCost)
  576. }
  577. reservedRAMDiscount := 0.0
  578. if ramCost == 0 {
  579. klog.V(1).Infof("[Warning] No ram cost found for cluster '%s' node '%s'", costDatum.ClusterID, costDatum.NodeName)
  580. } else {
  581. reservedRAMDiscount = 1.0 - (reserved.RAMCost / ramCost)
  582. }
  583. // AWS passes the # of reserved CPU and RAM as -1 to represent "All"
  584. if reserved.ReservedCPU < 0 && reserved.ReservedRAM < 0 {
  585. blendedCPUDiscount = reservedCPUDiscount
  586. blendedRAMDiscount = reservedRAMDiscount
  587. } else {
  588. nodeCPU, ierr := strconv.ParseInt(costDatum.NodeData.VCPU, 10, 64)
  589. nodeRAM, ferr := strconv.ParseFloat(costDatum.NodeData.RAMBytes, 64)
  590. if ierr == nil && ferr == nil {
  591. nodeRAMGB := nodeRAM / 1024 / 1024 / 1024
  592. reservedRAMGB := float64(reserved.ReservedRAM) / 1024 / 1024 / 1024
  593. nonReservedCPU := nodeCPU - reserved.ReservedCPU
  594. nonReservedRAM := nodeRAMGB - reservedRAMGB
  595. if nonReservedCPU == 0 {
  596. blendedCPUDiscount = reservedCPUDiscount
  597. } else {
  598. if nodeCPU == 0 {
  599. klog.V(1).Infof("[Warning] No ram found for cluster '%s' node '%s'", costDatum.ClusterID, costDatum.NodeName)
  600. } else {
  601. blendedCPUDiscount = (float64(reserved.ReservedCPU) * reservedCPUDiscount) + (float64(nonReservedCPU)*discount)/float64(nodeCPU)
  602. }
  603. }
  604. if nonReservedRAM == 0 {
  605. blendedRAMDiscount = reservedRAMDiscount
  606. } else {
  607. if nodeRAMGB == 0 {
  608. klog.V(1).Infof("[Warning] No ram found for cluster '%s' node '%s'", costDatum.ClusterID, costDatum.NodeName)
  609. } else {
  610. blendedRAMDiscount = (reservedRAMGB * reservedRAMDiscount) + (nonReservedRAM*discount)/nodeRAMGB
  611. }
  612. }
  613. }
  614. }
  615. }
  616. return blendedCPUDiscount, blendedRAMDiscount
  617. }
  618. func parseVectorPricing(cfg *cloud.CustomPricing, costDatum *CostData, cpuCostStr, ramCostStr, gpuCostStr, pvCostStr string) (float64, float64, float64, float64, bool) {
  619. usesCustom := false
  620. cpuCost, err := strconv.ParseFloat(cpuCostStr, 64)
  621. if err != nil || math.IsNaN(cpuCost) || math.IsInf(cpuCost, 0) || cpuCost == 0 {
  622. cpuCost, err = strconv.ParseFloat(cfg.CPU, 64)
  623. usesCustom = true
  624. if err != nil || math.IsNaN(cpuCost) || math.IsInf(cpuCost, 0) {
  625. cpuCost = 0
  626. }
  627. }
  628. ramCost, err := strconv.ParseFloat(ramCostStr, 64)
  629. if err != nil || math.IsNaN(ramCost) || math.IsInf(ramCost, 0) || ramCost == 0 {
  630. ramCost, err = strconv.ParseFloat(cfg.RAM, 64)
  631. usesCustom = true
  632. if err != nil || math.IsNaN(ramCost) || math.IsInf(ramCost, 0) {
  633. ramCost = 0
  634. }
  635. }
  636. gpuCost, err := strconv.ParseFloat(gpuCostStr, 64)
  637. if err != nil || math.IsNaN(gpuCost) || math.IsInf(gpuCost, 0) {
  638. gpuCost, err = strconv.ParseFloat(cfg.GPU, 64)
  639. if err != nil || math.IsNaN(gpuCost) || math.IsInf(gpuCost, 0) {
  640. gpuCost = 0
  641. }
  642. }
  643. pvCost, err := strconv.ParseFloat(pvCostStr, 64)
  644. if err != nil || math.IsNaN(cpuCost) || math.IsInf(cpuCost, 0) {
  645. pvCost, err = strconv.ParseFloat(cfg.Storage, 64)
  646. if err != nil || math.IsNaN(pvCost) || math.IsInf(pvCost, 0) {
  647. pvCost = 0
  648. }
  649. }
  650. return cpuCost, ramCost, gpuCost, pvCost, usesCustom
  651. }
// getPriceVectors converts a cost datum's resource allocation series into
// per-timestamp cost series for CPU, RAM, GPU, PV, and network. Discounts and
// the idle coefficient are applied per value, and timestamps are rounded to
// the nearest 10 seconds so that vectors can be matched downstream (see
// addVectors).
func getPriceVectors(cp cloud.Provider, costDatum *CostData, rate string, discount float64, customDiscount float64, idleCoefficient float64) ([]*util.Vector, []*util.Vector, []*util.Vector, [][]*util.Vector, []*util.Vector) {
	var cpuCost float64
	var ramCost float64
	var gpuCost float64
	var pvCost float64
	var usesCustom bool
	// If custom pricing is enabled and can be retrieved, replace
	// default cost values with custom values
	customPricing, err := cp.GetConfig()
	if err != nil {
		klog.Errorf("failed to load custom pricing: %s", err)
	}
	if cloud.CustomPricesEnabled(cp) && err == nil {
		var cpuCostStr string
		var ramCostStr string
		var gpuCostStr string
		var pvCostStr string
		// NOTE(review): costDatum.NodeData is dereferenced here without a nil
		// check (the nil case is only handled in the branch below) — confirm
		// NodeData is always set when custom prices are enabled.
		if costDatum.NodeData.IsSpot() {
			cpuCostStr = customPricing.SpotCPU
			ramCostStr = customPricing.SpotRAM
			gpuCostStr = customPricing.SpotGPU
		} else {
			cpuCostStr = customPricing.CPU
			ramCostStr = customPricing.RAM
			gpuCostStr = customPricing.GPU
		}
		pvCostStr = customPricing.Storage
		cpuCost, ramCost, gpuCost, pvCost, usesCustom = parseVectorPricing(customPricing, costDatum, cpuCostStr, ramCostStr, gpuCostStr, pvCostStr)
	} else if costDatum.NodeData == nil && err == nil {
		// no node data observed: fall back to the configured default prices
		cpuCostStr := customPricing.CPU
		ramCostStr := customPricing.RAM
		gpuCostStr := customPricing.GPU
		pvCostStr := customPricing.Storage
		cpuCost, ramCost, gpuCost, pvCost, usesCustom = parseVectorPricing(customPricing, costDatum, cpuCostStr, ramCostStr, gpuCostStr, pvCostStr)
	} else {
		// use the node-specific observed prices
		cpuCostStr := costDatum.NodeData.VCPUCost
		ramCostStr := costDatum.NodeData.RAMCost
		gpuCostStr := costDatum.NodeData.GPUCost
		pvCostStr := costDatum.NodeData.StorageCost
		cpuCost, ramCost, gpuCost, pvCost, usesCustom = parseVectorPricing(customPricing, costDatum, cpuCostStr, ramCostStr, gpuCostStr, pvCostStr)
	}
	if usesCustom {
		log.DedupedWarningf(5, "No pricing data found for node `%s` , using custom pricing", costDatum.NodeName)
	}
	// blend the global discount with any reserved-instance discount on the node
	cpuDiscount, ramDiscount := getDiscounts(costDatum, cpuCost, ramCost, discount)
	klog.V(4).Infof("Node Name: %s", costDatum.NodeName)
	klog.V(4).Infof("Blended CPU Discount: %f", cpuDiscount)
	klog.V(4).Infof("Blended RAM Discount: %f", ramDiscount)
	// TODO should we try to apply the rate coefficient here or leave it as a totals-only metric?
	// NOTE(review): the `rate` parameter is currently unused in this function
	// and rateCoeff is always 1.0 (see TODO above).
	rateCoeff := 1.0
	// an idleCoefficient of 0 means "none supplied"; treat as 1.0 (no scaling)
	if idleCoefficient == 0 {
		idleCoefficient = 1.0
	}
	cpuv := make([]*util.Vector, 0, len(costDatum.CPUAllocation))
	for _, val := range costDatum.CPUAllocation {
		cpuv = append(cpuv, &util.Vector{
			Timestamp: math.Round(val.Timestamp/10) * 10,
			Value:     (val.Value * cpuCost * (1 - cpuDiscount) * (1 - customDiscount) / idleCoefficient) * rateCoeff,
		})
	}
	// RAM allocation values are divided by 1024^3, i.e. bytes priced per GiB
	ramv := make([]*util.Vector, 0, len(costDatum.RAMAllocation))
	for _, val := range costDatum.RAMAllocation {
		ramv = append(ramv, &util.Vector{
			Timestamp: math.Round(val.Timestamp/10) * 10,
			Value:     ((val.Value / 1024 / 1024 / 1024) * ramCost * (1 - ramDiscount) * (1 - customDiscount) / idleCoefficient) * rateCoeff,
		})
	}
	// GPU uses the raw global discount, not the blended one
	gpuv := make([]*util.Vector, 0, len(costDatum.GPUReq))
	for _, val := range costDatum.GPUReq {
		gpuv = append(gpuv, &util.Vector{
			Timestamp: math.Round(val.Timestamp/10) * 10,
			Value:     (val.Value * gpuCost * (1 - discount) * (1 - customDiscount) / idleCoefficient) * rateCoeff,
		})
	}
	// one cost series per PVC; PVCs without volume data are skipped entirely
	pvvs := make([][]*util.Vector, 0, len(costDatum.PVCData))
	for _, pvcData := range costDatum.PVCData {
		pvv := make([]*util.Vector, 0, len(pvcData.Values))
		if pvcData.Volume != nil {
			// parse error deliberately ignored: cost stays 0 on failure
			cost, _ := strconv.ParseFloat(pvcData.Volume.Cost, 64)
			// override with custom pricing if enabled
			if cloud.CustomPricesEnabled(cp) {
				cost = pvCost
			}
			for _, val := range pvcData.Values {
				pvv = append(pvv, &util.Vector{
					Timestamp: math.Round(val.Timestamp/10) * 10,
					Value:     ((val.Value / 1024 / 1024 / 1024) * cost * (1 - customDiscount) / idleCoefficient) * rateCoeff,
				})
			}
			pvvs = append(pvvs, pvv)
		}
	}
	// network values are used directly as costs; no discount or idle scaling
	netv := make([]*util.Vector, 0, len(costDatum.NetworkData))
	for _, val := range costDatum.NetworkData {
		netv = append(netv, &util.Vector{
			Timestamp: math.Round(val.Timestamp/10) * 10,
			Value:     val.Value,
		})
	}
	return cpuv, ramv, gpuv, pvvs, netv
}
  753. func averageVectors(vectors []*util.Vector) float64 {
  754. if len(vectors) == 0 {
  755. return 0.0
  756. }
  757. return totalVectors(vectors) / float64(len(vectors))
  758. }
  759. func totalVectors(vectors []*util.Vector) float64 {
  760. total := 0.0
  761. for _, vector := range vectors {
  762. total += vector.Value
  763. }
  764. return total
  765. }
  766. // addVectors adds two slices of Vectors. Vector timestamps are rounded to the
  767. // nearest ten seconds to allow matching of Vectors within a delta allowance.
  768. // Matching Vectors are summed, while unmatched Vectors are passed through.
  769. // e.g. [(t=1, 1), (t=2, 2)] + [(t=2, 2), (t=3, 3)] = [(t=1, 1), (t=2, 4), (t=3, 3)]
  770. func addVectors(xvs []*util.Vector, yvs []*util.Vector) []*util.Vector {
  771. sumOp := func(result *util.Vector, x *float64, y *float64) bool {
  772. if x != nil && y != nil {
  773. result.Value = *x + *y
  774. } else if y != nil {
  775. result.Value = *y
  776. } else if x != nil {
  777. result.Value = *x
  778. }
  779. return true
  780. }
  781. return util.ApplyVectorOp(xvs, yvs, sumOp)
  782. }
// minCostDataLength sets the minimum number of time series data points
// required to cache both raw and aggregated cost data (shorter series are
// computed but not cached).
const minCostDataLength = 2
// EmptyDataError describes an error caused by empty cost data for some
// defined interval
type EmptyDataError struct {
	err    error           // optional underlying cause; may be nil
	window kubecost.Window // the interval for which no data was found
}
  792. // Error implements the error interface
  793. func (ede *EmptyDataError) Error() string {
  794. err := fmt.Sprintf("empty data for range: %s", ede.window)
  795. if ede.err != nil {
  796. err += fmt.Sprintf(": %s", ede.err)
  797. }
  798. return err
  799. }
  800. func costDataTimeSeriesLength(costData map[string]*CostData) int {
  801. l := 0
  802. for _, cd := range costData {
  803. if l < len(cd.RAMAllocation) {
  804. l = len(cd.RAMAllocation)
  805. }
  806. if l < len(cd.CPUAllocation) {
  807. l = len(cd.CPUAllocation)
  808. }
  809. }
  810. return l
  811. }
  812. // ScaleHourlyCostData converts per-hour cost data to per-resolution data. If the target resolution is higher (i.e. < 1.0h)
  813. // then we can do simple multiplication by the fraction-of-an-hour and retain accuracy. If the target resolution is
  814. // lower (i.e. > 1.0h) then we sum groups of hourly data by resolution to maintain fidelity.
  815. // e.g. (100 hours of per-hour hourly data, resolutionHours=10) => 10 data points, grouped and summed by 10-hour window
  816. // e.g. (20 minutes of per-minute hourly data, resolutionHours=1/60) => 20 data points, scaled down by a factor of 60
  817. func ScaleHourlyCostData(data map[string]*CostData, resolutionHours float64) map[string]*CostData {
  818. scaled := map[string]*CostData{}
  819. for key, datum := range data {
  820. datum.RAMReq = scaleVectorSeries(datum.RAMReq, resolutionHours)
  821. datum.RAMUsed = scaleVectorSeries(datum.RAMUsed, resolutionHours)
  822. datum.RAMAllocation = scaleVectorSeries(datum.RAMAllocation, resolutionHours)
  823. datum.CPUReq = scaleVectorSeries(datum.CPUReq, resolutionHours)
  824. datum.CPUUsed = scaleVectorSeries(datum.CPUUsed, resolutionHours)
  825. datum.CPUAllocation = scaleVectorSeries(datum.CPUAllocation, resolutionHours)
  826. datum.GPUReq = scaleVectorSeries(datum.GPUReq, resolutionHours)
  827. datum.NetworkData = scaleVectorSeries(datum.NetworkData, resolutionHours)
  828. for _, pvcDatum := range datum.PVCData {
  829. pvcDatum.Values = scaleVectorSeries(pvcDatum.Values, resolutionHours)
  830. }
  831. scaled[key] = datum
  832. }
  833. return scaled
  834. }
  835. func scaleVectorSeries(vs []*util.Vector, resolutionHours float64) []*util.Vector {
  836. // if scaling to a lower resolution, compress the hourly data for maximum accuracy
  837. if resolutionHours > 1.0 {
  838. return compressVectorSeries(vs, resolutionHours)
  839. }
  840. // if scaling to a higher resolution, simply scale each value down by the fraction of an hour
  841. for _, v := range vs {
  842. v.Value *= resolutionHours
  843. }
  844. return vs
  845. }
  846. func compressVectorSeries(vs []*util.Vector, resolutionHours float64) []*util.Vector {
  847. if len(vs) == 0 {
  848. return vs
  849. }
  850. compressed := []*util.Vector{}
  851. threshold := float64(60 * 60 * resolutionHours)
  852. var acc *util.Vector
  853. for i, v := range vs {
  854. if acc == nil {
  855. // start a new accumulation from current datum
  856. acc = &util.Vector{
  857. Value: vs[i].Value,
  858. Timestamp: vs[i].Timestamp,
  859. }
  860. continue
  861. }
  862. if v.Timestamp-acc.Timestamp < threshold {
  863. // v should be accumulated in current datum
  864. acc.Value += v.Value
  865. } else {
  866. // v falls outside current datum's threshold; append and start a new one
  867. compressed = append(compressed, acc)
  868. acc = &util.Vector{
  869. Value: vs[i].Value,
  870. Timestamp: vs[i].Timestamp,
  871. }
  872. }
  873. }
  874. // append any remaining, incomplete accumulation
  875. if acc != nil {
  876. compressed = append(compressed, acc)
  877. }
  878. return compressed
  879. }
// AggregateQueryOpts carries the options for ComputeAggregateCostModel.
type AggregateQueryOpts struct {
	Rate                  string            // rate mode selector; empty string means totals (not a rate)
	Filters               map[string]string // cost data filters, e.g. "namespace", "node", "cluster", "labels", "podprefix"
	SharedResources       *SharedResourceInfo
	ShareSplit            string // how shared costs are split across aggregates, e.g. SplitTypeWeighted
	AllocateIdle          bool   // distribute idle cluster costs across aggregates
	IncludeTimeSeries     bool
	IncludeEfficiency     bool
	DisableCache          bool // skip reading cached results (still writes)
	ClearCache            bool // flush aggregate and cost-data caches before computing
	NoCache               bool // skip reading and writing the caches
	NoExpireCache         bool // cache results without expiration
	RemoteEnabled         bool
	DisableSharedOverhead bool // skip configured monthly shared overhead costs
	UseETLAdapter         bool
}
  896. func DefaultAggregateQueryOpts() *AggregateQueryOpts {
  897. return &AggregateQueryOpts{
  898. Rate: "",
  899. Filters: map[string]string{},
  900. SharedResources: nil,
  901. ShareSplit: SplitTypeWeighted,
  902. AllocateIdle: false,
  903. IncludeTimeSeries: true,
  904. IncludeEfficiency: true,
  905. DisableCache: false,
  906. ClearCache: false,
  907. NoCache: false,
  908. NoExpireCache: false,
  909. RemoteEnabled: env.IsRemoteEnabled(),
  910. DisableSharedOverhead: false,
  911. UseETLAdapter: false,
  912. }
  913. }
  914. // ComputeAggregateCostModel computes cost data for the given window, then aggregates it by the given fields.
  915. // Data is cached on two levels: the aggregation is cached as well as the underlying cost data.
  916. func (a *Accesses) ComputeAggregateCostModel(promClient prometheusClient.Client, window kubecost.Window, field string, subfields []string, opts *AggregateQueryOpts) (map[string]*Aggregation, string, error) {
  917. // Window is the range of the query, i.e. (start, end)
  918. // It must be closed, i.e. neither start nor end can be nil
  919. if window.IsOpen() {
  920. return nil, "", fmt.Errorf("illegal window: %s", window)
  921. }
  922. // Resolution is the duration of each datum in the cost model range query,
  923. // which corresponds to both the step size given to Prometheus query_range
  924. // and to the window passed to the range queries.
  925. // i.e. by default, we support 1h resolution for queries of windows defined
  926. // in terms of days or integer multiples of hours (e.g. 1d, 12h)
  927. resolution := time.Hour
  928. // Determine resolution by size of duration and divisibility of window.
  929. // By default, resolution is 1hr. If the window is smaller than 1hr, then
  930. // resolution goes down to 1m. If the window is not a multiple of 1hr, then
  931. // resolution goes down to 1m. If the window is greater than 1d, then
  932. // resolution gets scaled up to improve performance by reducing the amount
  933. // of data being computed.
  934. durMins := int64(math.Trunc(window.Minutes()))
  935. if durMins < 24*60 { // less than 1d
  936. if durMins%60 != 0 { // not divisible by 1h
  937. resolution = time.Minute
  938. }
  939. } else { // greater than 1d
  940. if durMins >= 7*24*60 { // greater than (or equal to) 7 days
  941. resolution = 24.0 * time.Hour
  942. } else if durMins >= 2*24*60 { // greater than (or equal to) 2 days
  943. resolution = 3.0 * time.Hour
  944. }
  945. }
  946. // Parse options
  947. if opts == nil {
  948. opts = DefaultAggregateQueryOpts()
  949. }
  950. rate := opts.Rate
  951. filters := opts.Filters
  952. sri := opts.SharedResources
  953. shared := opts.ShareSplit
  954. allocateIdle := opts.AllocateIdle
  955. includeTimeSeries := opts.IncludeTimeSeries
  956. includeEfficiency := opts.IncludeEfficiency
  957. disableCache := opts.DisableCache
  958. clearCache := opts.ClearCache
  959. noCache := opts.NoCache
  960. noExpireCache := opts.NoExpireCache
  961. remoteEnabled := opts.RemoteEnabled
  962. disableSharedOverhead := opts.DisableSharedOverhead
  963. // retainFuncs override filterFuncs. Make sure shared resources do not
  964. // get filtered out.
  965. retainFuncs := []FilterFunc{}
  966. retainFuncs = append(retainFuncs, func(cd *CostData) (bool, string) {
  967. if sri != nil {
  968. return sri.IsSharedResource(cd), ""
  969. }
  970. return false, ""
  971. })
  972. // Parse cost data filters into FilterFuncs
  973. filterFuncs := []FilterFunc{}
  974. aggregateEnvironment := func(costDatum *CostData) string {
  975. if field == "cluster" {
  976. return costDatum.ClusterID
  977. } else if field == "node" {
  978. return costDatum.NodeName
  979. } else if field == "namespace" {
  980. return costDatum.Namespace
  981. } else if field == "service" {
  982. if len(costDatum.Services) > 0 {
  983. return costDatum.Namespace + "/" + costDatum.Services[0]
  984. }
  985. } else if field == "deployment" {
  986. if len(costDatum.Deployments) > 0 {
  987. return costDatum.Namespace + "/" + costDatum.Deployments[0]
  988. }
  989. } else if field == "daemonset" {
  990. if len(costDatum.Daemonsets) > 0 {
  991. return costDatum.Namespace + "/" + costDatum.Daemonsets[0]
  992. }
  993. } else if field == "statefulset" {
  994. if len(costDatum.Statefulsets) > 0 {
  995. return costDatum.Namespace + "/" + costDatum.Statefulsets[0]
  996. }
  997. } else if field == "label" {
  998. if costDatum.Labels != nil {
  999. for _, sf := range subfields {
  1000. if subfieldName, ok := costDatum.Labels[sf]; ok {
  1001. return fmt.Sprintf("%s=%s", sf, subfieldName)
  1002. }
  1003. }
  1004. }
  1005. } else if field == "pod" {
  1006. return costDatum.Namespace + "/" + costDatum.PodName
  1007. } else if field == "container" {
  1008. return costDatum.Namespace + "/" + costDatum.PodName + "/" + costDatum.Name
  1009. }
  1010. return ""
  1011. }
  1012. if filters["podprefix"] != "" {
  1013. pps := []string{}
  1014. for _, fp := range strings.Split(filters["podprefix"], ",") {
  1015. if fp != "" {
  1016. cleanedFilter := strings.TrimSpace(fp)
  1017. pps = append(pps, cleanedFilter)
  1018. }
  1019. }
  1020. filterFuncs = append(filterFuncs, func(cd *CostData) (bool, string) {
  1021. aggEnv := aggregateEnvironment(cd)
  1022. for _, pp := range pps {
  1023. cleanedFilter := strings.TrimSpace(pp)
  1024. if strings.HasPrefix(cd.PodName, cleanedFilter) {
  1025. return true, aggEnv
  1026. }
  1027. }
  1028. return false, aggEnv
  1029. })
  1030. }
  1031. if filters["namespace"] != "" {
  1032. // namespaces may be comma-separated, e.g. kubecost,default
  1033. // multiple namespaces are evaluated as an OR relationship
  1034. nss := strings.Split(filters["namespace"], ",")
  1035. filterFuncs = append(filterFuncs, func(cd *CostData) (bool, string) {
  1036. aggEnv := aggregateEnvironment(cd)
  1037. for _, ns := range nss {
  1038. nsTrim := strings.TrimSpace(ns)
  1039. if cd.Namespace == nsTrim {
  1040. return true, aggEnv
  1041. } else if strings.HasSuffix(nsTrim, "*") { // trigger wildcard prefix filtering
  1042. nsTrimAsterisk := strings.TrimSuffix(nsTrim, "*")
  1043. if strings.HasPrefix(cd.Namespace, nsTrimAsterisk) {
  1044. return true, aggEnv
  1045. }
  1046. }
  1047. }
  1048. return false, aggEnv
  1049. })
  1050. }
  1051. if filters["node"] != "" {
  1052. // nodes may be comma-separated, e.g. aws-node-1,aws-node-2
  1053. // multiple nodes are evaluated as an OR relationship
  1054. nodes := strings.Split(filters["node"], ",")
  1055. filterFuncs = append(filterFuncs, func(cd *CostData) (bool, string) {
  1056. aggEnv := aggregateEnvironment(cd)
  1057. for _, node := range nodes {
  1058. nodeTrim := strings.TrimSpace(node)
  1059. if cd.NodeName == nodeTrim {
  1060. return true, aggEnv
  1061. } else if strings.HasSuffix(nodeTrim, "*") { // trigger wildcard prefix filtering
  1062. nodeTrimAsterisk := strings.TrimSuffix(nodeTrim, "*")
  1063. if strings.HasPrefix(cd.NodeName, nodeTrimAsterisk) {
  1064. return true, aggEnv
  1065. }
  1066. }
  1067. }
  1068. return false, aggEnv
  1069. })
  1070. }
  1071. if filters["cluster"] != "" {
  1072. // clusters may be comma-separated, e.g. cluster-one,cluster-two
  1073. // multiple clusters are evaluated as an OR relationship
  1074. cs := strings.Split(filters["cluster"], ",")
  1075. filterFuncs = append(filterFuncs, func(cd *CostData) (bool, string) {
  1076. aggEnv := aggregateEnvironment(cd)
  1077. for _, c := range cs {
  1078. cTrim := strings.TrimSpace(c)
  1079. id, name := cd.ClusterID, cd.ClusterName
  1080. if id == cTrim || name == cTrim {
  1081. return true, aggEnv
  1082. } else if strings.HasSuffix(cTrim, "*") { // trigger wildcard prefix filtering
  1083. cTrimAsterisk := strings.TrimSuffix(cTrim, "*")
  1084. if strings.HasPrefix(id, cTrimAsterisk) || strings.HasPrefix(name, cTrimAsterisk) {
  1085. return true, aggEnv
  1086. }
  1087. }
  1088. }
  1089. return false, aggEnv
  1090. })
  1091. }
  1092. if filters["labels"] != "" {
  1093. // labels are expected to be comma-separated and to take the form key=value
  1094. // e.g. app=cost-analyzer,app.kubernetes.io/instance=kubecost
  1095. // each different label will be applied as an AND
  1096. // multiple values for a single label will be evaluated as an OR
  1097. labelValues := map[string][]string{}
  1098. ls := strings.Split(filters["labels"], ",")
  1099. for _, l := range ls {
  1100. lTrim := strings.TrimSpace(l)
  1101. label := strings.Split(lTrim, "=")
  1102. if len(label) == 2 {
  1103. ln := prom.SanitizeLabelName(strings.TrimSpace(label[0]))
  1104. lv := strings.TrimSpace(label[1])
  1105. labelValues[ln] = append(labelValues[ln], lv)
  1106. } else {
  1107. // label is not of the form name=value, so log it and move on
  1108. log.Warningf("ComputeAggregateCostModel: skipping illegal label filter: %s", l)
  1109. }
  1110. }
  1111. // Generate FilterFunc for each set of label filters by invoking a function instead of accessing
  1112. // values by closure to prevent reference-type looping bug.
  1113. // (see https://github.com/golang/go/wiki/CommonMistakes#using-reference-to-loop-iterator-variable)
  1114. for label, values := range labelValues {
  1115. ff := (func(l string, vs []string) FilterFunc {
  1116. return func(cd *CostData) (bool, string) {
  1117. ae := aggregateEnvironment(cd)
  1118. for _, v := range vs {
  1119. if v == "__unallocated__" { // Special case. __unallocated__ means return all pods without the attached label
  1120. if _, ok := cd.Labels[label]; !ok {
  1121. return true, ae
  1122. }
  1123. }
  1124. if cd.Labels[label] == v {
  1125. return true, ae
  1126. } else if strings.HasSuffix(v, "*") { // trigger wildcard prefix filtering
  1127. vTrim := strings.TrimSuffix(v, "*")
  1128. if strings.HasPrefix(cd.Labels[label], vTrim) {
  1129. return true, ae
  1130. }
  1131. }
  1132. }
  1133. return false, ae
  1134. }
  1135. })(label, values)
  1136. filterFuncs = append(filterFuncs, ff)
  1137. }
  1138. }
  1139. // clear cache prior to checking the cache so that a clearCache=true
  1140. // request always returns a freshly computed value
  1141. if clearCache {
  1142. a.AggregateCache.Flush()
  1143. a.CostDataCache.Flush()
  1144. }
  1145. cacheExpiry := a.GetCacheExpiration(window.Duration())
  1146. if noExpireCache {
  1147. cacheExpiry = cache.NoExpiration
  1148. }
  1149. // parametrize cache key by all request parameters
  1150. aggKey := GenerateAggKey(window, field, subfields, opts)
  1151. thanosOffset := time.Now().Add(-thanos.OffsetDuration())
  1152. if a.ThanosClient != nil && window.End().After(thanosOffset) {
  1153. log.Infof("ComputeAggregateCostModel: setting end time backwards to first present data")
  1154. // Apply offsets to both end and start times to maintain correct time range
  1155. deltaDuration := window.End().Sub(thanosOffset)
  1156. s := window.Start().Add(-1 * deltaDuration)
  1157. e := time.Now().Add(-thanos.OffsetDuration())
  1158. window.Set(&s, &e)
  1159. }
  1160. dur, off := window.ToDurationOffset()
  1161. key := fmt.Sprintf(`%s:%s:%fh:%t`, dur, off, resolution.Hours(), remoteEnabled)
  1162. // report message about which of the two caches hit. by default report a miss
  1163. cacheMessage := fmt.Sprintf("ComputeAggregateCostModel: L1 cache miss: %s L2 cache miss: %s", aggKey, key)
  1164. // check the cache for aggregated response; if cache is hit and not disabled, return response
  1165. if value, found := a.AggregateCache.Get(aggKey); found && !disableCache && !noCache {
  1166. result, ok := value.(map[string]*Aggregation)
  1167. if !ok {
  1168. // disable cache and recompute if type cast fails
  1169. log.Errorf("ComputeAggregateCostModel: caching error: failed to cast aggregate data to struct: %s", aggKey)
  1170. return a.ComputeAggregateCostModel(promClient, window, field, subfields, opts)
  1171. }
  1172. return result, fmt.Sprintf("aggregate cache hit: %s", aggKey), nil
  1173. }
  1174. if window.Hours() >= 1.0 {
  1175. // exclude the last window of the time frame to match Prometheus definitions of range, offset, and resolution
  1176. start := window.Start().Add(resolution)
  1177. window.Set(&start, window.End())
  1178. } else {
  1179. // don't cache requests for durations of less than one hour
  1180. disableCache = true
  1181. }
  1182. // attempt to retrieve cost data from cache
  1183. var costData map[string]*CostData
  1184. var err error
  1185. cacheData, found := a.CostDataCache.Get(key)
  1186. if found && !disableCache && !noCache {
  1187. ok := false
  1188. costData, ok = cacheData.(map[string]*CostData)
  1189. cacheMessage = fmt.Sprintf("ComputeAggregateCostModel: L1 cache miss: %s, L2 cost data cache hit: %s", aggKey, key)
  1190. if !ok {
  1191. log.Errorf("ComputeAggregateCostModel: caching error: failed to cast cost data to struct: %s", key)
  1192. }
  1193. } else {
  1194. log.Infof("ComputeAggregateCostModel: missed cache: %s (found %t, disableCache %t, noCache %t)", key, found, disableCache, noCache)
  1195. costData, err = a.Model.ComputeCostDataRange(promClient, a.CloudProvider, window, resolution, "", "")
  1196. if err != nil {
  1197. if prom.IsErrorCollection(err) {
  1198. return nil, "", err
  1199. }
  1200. if pce, ok := err.(prom.CommError); ok {
  1201. return nil, "", pce
  1202. }
  1203. if strings.Contains(err.Error(), "data is empty") {
  1204. return nil, "", &EmptyDataError{err: err, window: window}
  1205. }
  1206. return nil, "", err
  1207. }
  1208. // compute length of the time series in the cost data and only compute
  1209. // aggregates and cache if the length is sufficiently high
  1210. costDataLen := costDataTimeSeriesLength(costData)
  1211. if window.Hours() < 1.0 {
  1212. // scale hourly cost data down to fractional hour
  1213. costData = ScaleHourlyCostData(costData, resolution.Hours())
  1214. }
  1215. if costDataLen == 0 {
  1216. return nil, "", &EmptyDataError{window: window}
  1217. }
  1218. if costDataLen >= minCostDataLength && !noCache {
  1219. log.Infof("ComputeAggregateCostModel: setting L2 cache: %s", key)
  1220. a.CostDataCache.Set(key, costData, cacheExpiry)
  1221. }
  1222. }
  1223. c, err := a.CloudProvider.GetConfig()
  1224. if err != nil {
  1225. return nil, "", err
  1226. }
  1227. discount, err := ParsePercentString(c.Discount)
  1228. if err != nil {
  1229. return nil, "", err
  1230. }
  1231. customDiscount, err := ParsePercentString(c.NegotiatedDiscount)
  1232. if err != nil {
  1233. return nil, "", err
  1234. }
  1235. sc := make(map[string]*SharedCostInfo)
  1236. if !disableSharedOverhead {
  1237. for key, val := range c.SharedCosts {
  1238. cost, err := strconv.ParseFloat(val, 64)
  1239. durationCoefficient := window.Hours() / util.HoursPerMonth
  1240. if err != nil {
  1241. return nil, "", fmt.Errorf("unable to parse shared cost %s: %s", val, err)
  1242. }
  1243. sc[key] = &SharedCostInfo{
  1244. Name: key,
  1245. Cost: cost * durationCoefficient,
  1246. }
  1247. }
  1248. }
  1249. idleCoefficients := make(map[string]float64)
  1250. if allocateIdle {
  1251. duration, offset := window.ToDurationOffset()
  1252. idleDurationCalcHours := window.Hours()
  1253. if window.Hours() < 1 {
  1254. idleDurationCalcHours = 1
  1255. }
  1256. duration = fmt.Sprintf("%dh", int(idleDurationCalcHours))
  1257. if a.ThanosClient != nil {
  1258. offset = thanos.Offset()
  1259. log.Infof("ComputeAggregateCostModel: setting offset to %s", offset)
  1260. }
  1261. idleCoefficients, err = a.ComputeIdleCoefficient(costData, promClient, a.CloudProvider, discount, customDiscount, duration, offset)
  1262. if err != nil {
  1263. log.Errorf("ComputeAggregateCostModel: error computing idle coefficient: duration=%s, offset=%s, err=%s", duration, offset, err)
  1264. return nil, "", err
  1265. }
  1266. }
  1267. totalContainerCost := 0.0
  1268. if shared == SplitTypeWeighted {
  1269. totalContainerCost = GetTotalContainerCost(costData, rate, a.CloudProvider, discount, customDiscount, idleCoefficients)
  1270. }
  1271. // filter cost data by namespace and cluster after caching for maximal cache hits
  1272. costData, filteredContainerCount, filteredEnvironments := FilterCostData(costData, retainFuncs, filterFuncs)
  1273. // aggregate cost model data by given fields and cache the result for the default expiration
  1274. aggOpts := &AggregationOptions{
  1275. Discount: discount,
  1276. CustomDiscount: customDiscount,
  1277. IdleCoefficients: idleCoefficients,
  1278. IncludeEfficiency: includeEfficiency,
  1279. IncludeTimeSeries: includeTimeSeries,
  1280. Rate: rate,
  1281. ResolutionHours: resolution.Hours(),
  1282. SharedResourceInfo: sri,
  1283. SharedCosts: sc,
  1284. FilteredContainerCount: filteredContainerCount,
  1285. FilteredEnvironments: filteredEnvironments,
  1286. TotalContainerCost: totalContainerCost,
  1287. SharedSplit: shared,
  1288. }
  1289. result := AggregateCostData(costData, field, subfields, a.CloudProvider, aggOpts)
  1290. // If sending time series data back, switch scale back to hourly data. At this point,
  1291. // resolutionHours may have converted our hourly data to more- or less-than hourly data.
  1292. if includeTimeSeries {
  1293. for _, aggs := range result {
  1294. ScaleAggregationTimeSeries(aggs, resolution.Hours())
  1295. }
  1296. }
  1297. // compute length of the time series in the cost data and only cache
  1298. // aggregation results if the length is sufficiently high
  1299. costDataLen := costDataTimeSeriesLength(costData)
  1300. if costDataLen >= minCostDataLength && window.Hours() > 1.0 && !noCache {
  1301. // Set the result map (rather than a pointer to it) because map is a reference type
  1302. log.Infof("ComputeAggregateCostModel: setting aggregate cache: %s", aggKey)
  1303. a.AggregateCache.Set(aggKey, result, cacheExpiry)
  1304. } else {
  1305. log.Infof("ComputeAggregateCostModel: not setting aggregate cache: %s (not enough data: %t; duration less than 1h: %t; noCache: %t)", key, costDataLen < minCostDataLength, window.Hours() < 1, noCache)
  1306. }
  1307. return result, cacheMessage, nil
  1308. }
  1309. // ScaleAggregationTimeSeries reverses the scaling done by ScaleHourlyCostData, returning
  1310. // the aggregation's time series to hourly data.
  1311. func ScaleAggregationTimeSeries(aggregation *Aggregation, resolutionHours float64) {
  1312. for _, v := range aggregation.CPUCostVector {
  1313. v.Value /= resolutionHours
  1314. }
  1315. for _, v := range aggregation.GPUCostVector {
  1316. v.Value /= resolutionHours
  1317. }
  1318. for _, v := range aggregation.RAMCostVector {
  1319. v.Value /= resolutionHours
  1320. }
  1321. for _, v := range aggregation.PVCostVector {
  1322. v.Value /= resolutionHours
  1323. }
  1324. for _, v := range aggregation.NetworkCostVector {
  1325. v.Value /= resolutionHours
  1326. }
  1327. for _, v := range aggregation.TotalCostVector {
  1328. v.Value /= resolutionHours
  1329. }
  1330. return
  1331. }
  1332. // String returns a string representation of the encapsulated shared resources, which
  1333. // can be used to uniquely identify a set of shared resources. Sorting sets of shared
  1334. // resources ensures that strings representing permutations of the same combination match.
  1335. func (s *SharedResourceInfo) String() string {
  1336. if s == nil {
  1337. return ""
  1338. }
  1339. nss := []string{}
  1340. for ns := range s.SharedNamespace {
  1341. nss = append(nss, ns)
  1342. }
  1343. sort.Strings(nss)
  1344. nsStr := strings.Join(nss, ",")
  1345. labels := []string{}
  1346. for lbl, vals := range s.LabelSelectors {
  1347. for val := range vals {
  1348. if lbl != "" && val != "" {
  1349. labels = append(labels, fmt.Sprintf("%s=%s", lbl, val))
  1350. }
  1351. }
  1352. }
  1353. sort.Strings(labels)
  1354. labelStr := strings.Join(labels, ",")
  1355. return fmt.Sprintf("%s:%s", nsStr, labelStr)
  1356. }
// aggKeyParams collects the parameters that uniquely identify an aggregation
// request, mirroring the values serialized into a cache key by GenerateAggKey.
// NOTE(review): not referenced within this portion of the file — confirm usage elsewhere.
type aggKeyParams struct {
	duration   string              // query window duration (from window.ToDurationOffset)
	offset     string              // query window offset (from window.ToDurationOffset)
	filters    map[string]string   // cost data filters: namespace, cluster, node, labels, podprefix
	field      string              // aggregation field, e.g. "namespace" or "label"
	subfields  []string            // aggregation subfields (label names when field == "label")
	rate       string              // rate option: "", "hourly", "daily", or "monthly"
	sri        *SharedResourceInfo // shared resource configuration, if any
	shareType  string              // shared cost split type, e.g. SplitTypeWeighted
	idle       bool                // whether idle costs are allocated
	timeSeries bool                // whether time series data is retained in the result
	efficiency bool                // whether efficiency data is included
}
  1370. // GenerateAggKey generates a parameter-unique key for caching the aggregate cost model
  1371. func GenerateAggKey(window kubecost.Window, field string, subfields []string, opts *AggregateQueryOpts) string {
  1372. if opts == nil {
  1373. opts = DefaultAggregateQueryOpts()
  1374. }
  1375. // Covert to duration, offset so that cache hits occur, even when timestamps have
  1376. // shifted slightly.
  1377. duration, offset := window.ToDurationOffset()
  1378. // parse, trim, and sort podprefix filters
  1379. podPrefixFilters := []string{}
  1380. if ppfs, ok := opts.Filters["podprefix"]; ok && ppfs != "" {
  1381. for _, psf := range strings.Split(ppfs, ",") {
  1382. podPrefixFilters = append(podPrefixFilters, strings.TrimSpace(psf))
  1383. }
  1384. }
  1385. sort.Strings(podPrefixFilters)
  1386. podPrefixFiltersStr := strings.Join(podPrefixFilters, ",")
  1387. // parse, trim, and sort namespace filters
  1388. nsFilters := []string{}
  1389. if nsfs, ok := opts.Filters["namespace"]; ok && nsfs != "" {
  1390. for _, nsf := range strings.Split(nsfs, ",") {
  1391. nsFilters = append(nsFilters, strings.TrimSpace(nsf))
  1392. }
  1393. }
  1394. sort.Strings(nsFilters)
  1395. nsFilterStr := strings.Join(nsFilters, ",")
  1396. // parse, trim, and sort node filters
  1397. nodeFilters := []string{}
  1398. if nodefs, ok := opts.Filters["node"]; ok && nodefs != "" {
  1399. for _, nodef := range strings.Split(nodefs, ",") {
  1400. nodeFilters = append(nodeFilters, strings.TrimSpace(nodef))
  1401. }
  1402. }
  1403. sort.Strings(nodeFilters)
  1404. nodeFilterStr := strings.Join(nodeFilters, ",")
  1405. // parse, trim, and sort cluster filters
  1406. cFilters := []string{}
  1407. if cfs, ok := opts.Filters["cluster"]; ok && cfs != "" {
  1408. for _, cf := range strings.Split(cfs, ",") {
  1409. cFilters = append(cFilters, strings.TrimSpace(cf))
  1410. }
  1411. }
  1412. sort.Strings(cFilters)
  1413. cFilterStr := strings.Join(cFilters, ",")
  1414. // parse, trim, and sort label filters
  1415. lFilters := []string{}
  1416. if lfs, ok := opts.Filters["labels"]; ok && lfs != "" {
  1417. for _, lf := range strings.Split(lfs, ",") {
  1418. // trim whitespace from the label name and the label value
  1419. // of each label name/value pair, then reconstruct
  1420. // e.g. "tier = frontend, app = kubecost" == "app=kubecost,tier=frontend"
  1421. lfa := strings.Split(lf, "=")
  1422. if len(lfa) == 2 {
  1423. lfn := strings.TrimSpace(lfa[0])
  1424. lfv := strings.TrimSpace(lfa[1])
  1425. lFilters = append(lFilters, fmt.Sprintf("%s=%s", lfn, lfv))
  1426. } else {
  1427. // label is not of the form name=value, so log it and move on
  1428. klog.V(2).Infof("[Warning] GenerateAggKey: skipping illegal label filter: %s", lf)
  1429. }
  1430. }
  1431. }
  1432. sort.Strings(lFilters)
  1433. lFilterStr := strings.Join(lFilters, ",")
  1434. filterStr := fmt.Sprintf("%s:%s:%s:%s:%s", nsFilterStr, nodeFilterStr, cFilterStr, lFilterStr, podPrefixFiltersStr)
  1435. sort.Strings(subfields)
  1436. fieldStr := fmt.Sprintf("%s:%s", field, strings.Join(subfields, ","))
  1437. return fmt.Sprintf("%s:%s:%s:%s:%s:%s:%s:%t:%t:%t", duration, offset, filterStr, fieldStr, opts.Rate,
  1438. opts.SharedResources, opts.ShareSplit, opts.AllocateIdle, opts.IncludeTimeSeries,
  1439. opts.IncludeEfficiency)
  1440. }
// Aggregator is capable of computing the aggregated cost model. This is
// a brutal interface, which should be cleaned up, but it's necessary for
// being able to swap in an ETL-backed implementation.
type Aggregator interface {
	// ComputeAggregateCostModel computes the aggregate cost model over the
	// given window, grouping by field (and subfields, e.g. label names),
	// returning the aggregations keyed by aggregate name, a cache-status
	// message, and any error.
	ComputeAggregateCostModel(promClient prometheusClient.Client, window kubecost.Window, field string, subfields []string, opts *AggregateQueryOpts) (map[string]*Aggregation, string, error)
}
// warmAggregateCostModelCache continuously pre-computes and caches the
// aggregate cost model and cluster costs for the default query windows
// (1d, 2d, and — when ETL is disabled — 7d and 30d) so that common
// requests hit a warm cache.
func (a *Accesses) warmAggregateCostModelCache() {
	// Only allow one concurrent cache-warming operation
	sem := util.NewSemaphore(1)

	// Set default values, pulling them from application settings where applicable, and warm the cache
	// for the given duration. Cache is intentionally set to expire (i.e. noExpireCache=false) so that
	// if the default parameters change, the old cached defaults with eventually expire. Thus, the
	// timing of the cache expiry/refresh is the only mechanism ensuring 100% cache warmth.
	//
	// NOTE(review): warmFunc returns (aggregate error, cluster-costs error);
	// error-first ordering is unconventional for Go — confirm before reordering.
	warmFunc := func(duration, durationHrs, offset string, cacheEfficiencyData bool) (error, error) {
		promClient := a.GetPrometheusClient(true)

		windowStr := fmt.Sprintf("%s offset %s", duration, offset)
		window, err := kubecost.ParseWindowUTC(windowStr)
		if err != nil {
			return nil, fmt.Errorf("invalid window from window string: %s", windowStr)
		}

		// Warm with the default aggregation request shape: by namespace,
		// no filters, no time series, with efficiency.
		field := "namespace"
		subfields := []string{}

		// DisableCache forces a recompute; the fresh result is written back
		// into the cache by ComputeAggregateCostModel itself.
		aggOpts := DefaultAggregateQueryOpts()
		aggOpts.Rate = ""
		aggOpts.Filters = map[string]string{}
		aggOpts.IncludeTimeSeries = false
		aggOpts.IncludeEfficiency = true
		aggOpts.DisableCache = true
		aggOpts.ClearCache = false
		aggOpts.NoCache = false
		aggOpts.NoExpireCache = false
		aggOpts.ShareSplit = SplitTypeWeighted
		aggOpts.RemoteEnabled = env.IsRemoteEnabled()
		aggOpts.AllocateIdle = cloud.AllocateIdleByDefault(a.CloudProvider)

		// Apply provider-configured shared namespaces/labels, if any.
		sharedNamespaces := cloud.SharedNamespaces(a.CloudProvider)
		sharedLabelNames, sharedLabelValues := cloud.SharedLabels(a.CloudProvider)

		if len(sharedNamespaces) > 0 || len(sharedLabelNames) > 0 {
			aggOpts.SharedResources = NewSharedResourceInfo(true, sharedNamespaces, sharedLabelNames, sharedLabelValues)
		}

		aggKey := GenerateAggKey(window, field, subfields, aggOpts)
		log.Infof("aggregation: cache warming defaults: %s", aggKey)
		// key identifies the cluster-costs cache entry for this window.
		key := fmt.Sprintf("%s:%s", durationHrs, offset)

		_, _, aggErr := a.ComputeAggregateCostModel(promClient, window, field, subfields, aggOpts)
		if aggErr != nil {
			log.Infof("Error building cache %s: %s", window, aggErr)
		}

		// Thanos queries use a fixed offset to account for ingestion delay.
		if a.ThanosClient != nil {
			offset = thanos.Offset()
			log.Infof("Setting offset to %s", offset)
		}

		totals, err := a.ComputeClusterCosts(promClient, a.CloudProvider, durationHrs, offset, cacheEfficiencyData)
		if err != nil {
			log.Infof("Error building cluster costs cache %s", key)
		}

		// Only cache cluster costs when at least one cluster has more than
		// clusterCostsCacheMinutes of data.
		maxMinutesWithData := 0.0
		for _, cluster := range totals {
			if cluster.DataMinutes > maxMinutesWithData {
				maxMinutesWithData = cluster.DataMinutes
			}
		}

		if len(totals) > 0 && maxMinutesWithData > clusterCostsCacheMinutes {
			a.ClusterCostsCache.Set(key, totals, a.GetCacheExpiration(window.Duration()))
			log.Infof("caching %s cluster costs for %s", duration, a.GetCacheExpiration(window.Duration()))
		} else {
			log.Warningf("not caching %s cluster costs: no data or less than %f minutes data ", duration, clusterCostsCacheMinutes)
		}

		return aggErr, err
	}

	// 1 day
	go func(sem *util.Semaphore) {
		defer errors.HandlePanic()

		duration := "1d"
		offset := "1m"
		durHrs := "24h"
		dur := 24 * time.Hour

		for {
			sem.Acquire()
			// errors are logged inside warmFunc; this loop always sleeps a
			// full refresh interval regardless of outcome
			warmFunc(duration, durHrs, offset, true)
			sem.Return()

			log.Infof("aggregation: warm cache: %s", duration)
			time.Sleep(a.GetCacheRefresh(dur))
		}
	}(sem)

	// 2 day
	go func(sem *util.Semaphore) {
		defer errors.HandlePanic()

		duration := "2d"
		offset := "1m"
		durHrs := "48h"
		dur := 2 * 24 * time.Hour

		for {
			sem.Acquire()
			warmFunc(duration, durHrs, offset, false)
			sem.Return()

			log.Infof("aggregation: warm cache: %s", duration)
			time.Sleep(a.GetCacheRefresh(dur))
		}
	}(sem)

	// The 7d and 30d windows are warmed only when ETL is disabled (ETL
	// serves long windows itself).
	if !env.IsETLEnabled() {
		// 7 day
		go func(sem *util.Semaphore) {
			defer errors.HandlePanic()

			duration := "7d"
			offset := "1m"
			durHrs := "168h"
			dur := 7 * 24 * time.Hour

			for {
				sem.Acquire()
				aggErr, err := warmFunc(duration, durHrs, offset, false)
				sem.Return()

				log.Infof("aggregation: warm cache: %s", duration)
				// On failure, retry sooner than the normal refresh interval.
				if aggErr == nil && err == nil {
					time.Sleep(a.GetCacheRefresh(dur))
				} else {
					time.Sleep(5 * time.Minute)
				}
			}
		}(sem)

		// 30 day
		go func(sem *util.Semaphore) {
			defer errors.HandlePanic()

			for {
				duration := "30d"
				offset := "1m"
				durHrs := "720h"
				dur := 30 * 24 * time.Hour

				sem.Acquire()
				aggErr, err := warmFunc(duration, durHrs, offset, false)
				sem.Return()

				// On failure, retry sooner than the normal refresh interval.
				if aggErr == nil && err == nil {
					time.Sleep(a.GetCacheRefresh(dur))
				} else {
					time.Sleep(5 * time.Minute)
				}
			}
		}(sem)
	}
}
  1579. // AggregateCostModelHandler handles requests to the aggregated cost model API. See
  1580. // ComputeAggregateCostModel for details.
  1581. func (a *Accesses) AggregateCostModelHandler(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  1582. w.Header().Set("Content-Type", "application/json")
  1583. windowStr := r.URL.Query().Get("window")
  1584. // Convert UTC-RFC3339 pairs to configured UTC offset
  1585. // e.g. with UTC offset of -0600, 2020-07-01T00:00:00Z becomes
  1586. // 2020-07-01T06:00:00Z == 2020-07-01T00:00:00-0600
  1587. // TODO niko/etl fix the frontend because this is confusing if you're
  1588. // actually asking for UTC time (...Z) and we swap that "Z" out for the
  1589. // configured UTC offset without asking
  1590. rfc3339 := `\d\d\d\d-\d\d-\d\dT\d\d:\d\d:\d\dZ`
  1591. regex := regexp.MustCompile(fmt.Sprintf(`(%s),(%s)`, rfc3339, rfc3339))
  1592. match := regex.FindStringSubmatch(windowStr)
  1593. if match != nil {
  1594. start, _ := time.Parse(time.RFC3339, match[1])
  1595. start = start.Add(-env.GetParsedUTCOffset()).In(time.UTC)
  1596. end, _ := time.Parse(time.RFC3339, match[2])
  1597. end = end.Add(-env.GetParsedUTCOffset()).In(time.UTC)
  1598. windowStr = fmt.Sprintf("%sZ,%sZ", start.Format("2006-01-02T15:04:05"), end.Format("2006-01-02T15:04:05Z"))
  1599. }
  1600. // determine duration and offset from query parameters
  1601. window, err := kubecost.ParseWindowWithOffset(windowStr, env.GetParsedUTCOffset())
  1602. if err != nil || window.Start() == nil {
  1603. http.Error(w, fmt.Sprintf("invalid window: %s", err), http.StatusBadRequest)
  1604. return
  1605. }
  1606. durRegex := regexp.MustCompile(`^(\d+)(m|h|d|s)$`)
  1607. isDurationStr := durRegex.MatchString(windowStr)
  1608. // legacy offset option should override window offset
  1609. if r.URL.Query().Get("offset") != "" {
  1610. offset := r.URL.Query().Get("offset")
  1611. // Shift window by offset, but only when manually set with separate
  1612. // parameter and window was provided as a duration string. Otherwise,
  1613. // do not alter the (duration, offset) from ParseWindowWithOffset.
  1614. if offset != "1m" && isDurationStr {
  1615. match := durRegex.FindStringSubmatch(offset)
  1616. if match != nil && len(match) == 3 {
  1617. dur := time.Minute
  1618. if match[2] == "h" {
  1619. dur = time.Hour
  1620. }
  1621. if match[2] == "d" {
  1622. dur = 24 * time.Hour
  1623. }
  1624. if match[2] == "s" {
  1625. dur = time.Second
  1626. }
  1627. num, _ := strconv.ParseInt(match[1], 10, 64)
  1628. window = window.Shift(-time.Duration(num) * dur)
  1629. }
  1630. }
  1631. }
  1632. opts := DefaultAggregateQueryOpts()
  1633. // parse remaining query parameters
  1634. namespace := r.URL.Query().Get("namespace")
  1635. cluster := r.URL.Query().Get("cluster")
  1636. labels := r.URL.Query().Get("labels")
  1637. podprefix := r.URL.Query().Get("podprefix")
  1638. labelArray := strings.Split(labels, "=")
  1639. labelArray[0] = strings.ReplaceAll(labelArray[0], "-", "_")
  1640. labels = strings.Join(labelArray, "=")
  1641. field := r.URL.Query().Get("aggregation")
  1642. sharedNamespaces := r.URL.Query().Get("sharedNamespaces")
  1643. sharedLabelNames := r.URL.Query().Get("sharedLabelNames")
  1644. sharedLabelValues := r.URL.Query().Get("sharedLabelValues")
  1645. remote := r.URL.Query().Get("remote") != "false"
  1646. subfieldStr := r.URL.Query().Get("aggregationSubfield")
  1647. subfields := []string{}
  1648. if len(subfieldStr) > 0 {
  1649. s := strings.Split(r.URL.Query().Get("aggregationSubfield"), ",")
  1650. for _, rawLabel := range s {
  1651. subfields = append(subfields, prom.SanitizeLabelName(rawLabel))
  1652. }
  1653. }
  1654. idleFlag := r.URL.Query().Get("allocateIdle")
  1655. if idleFlag == "default" {
  1656. c, _ := a.CloudProvider.GetConfig()
  1657. opts.AllocateIdle = (c.DefaultIdle == "true")
  1658. } else {
  1659. opts.AllocateIdle = (idleFlag == "true")
  1660. }
  1661. opts.Rate = r.URL.Query().Get("rate")
  1662. opts.ShareSplit = r.URL.Query().Get("sharedSplit")
  1663. // timeSeries == true maintains the time series dimension of the data,
  1664. // which by default gets summed over the entire interval
  1665. opts.IncludeTimeSeries = r.URL.Query().Get("timeSeries") == "true"
  1666. // efficiency has been deprecated in favor of a default to always send efficiency
  1667. opts.IncludeEfficiency = true
  1668. // TODO niko/caching rename "recomputeCache"
  1669. // disableCache, if set to "true", tells this function to recompute and
  1670. // cache the requested data
  1671. opts.DisableCache = r.URL.Query().Get("disableCache") == "true"
  1672. // clearCache, if set to "true", tells this function to flush the cache,
  1673. // then recompute and cache the requested data
  1674. opts.ClearCache = r.URL.Query().Get("clearCache") == "true"
  1675. // noCache avoids the cache altogether, both reading from and writing to
  1676. opts.NoCache = r.URL.Query().Get("noCache") == "true"
  1677. // noExpireCache should only be used by cache warming to set non-expiring caches
  1678. opts.NoExpireCache = false
  1679. // etl triggers ETL adapter
  1680. opts.UseETLAdapter = r.URL.Query().Get("etl") == "true"
  1681. // aggregation field is required
  1682. if field == "" {
  1683. http.Error(w, "Missing aggregation field parameter", http.StatusBadRequest)
  1684. return
  1685. }
  1686. // aggregation subfield is required when aggregation field is "label"
  1687. if field == "label" && len(subfields) == 0 {
  1688. http.Error(w, "Missing aggregation subfield parameter for aggregation by label", http.StatusBadRequest)
  1689. return
  1690. }
  1691. // enforce one of four available rate options
  1692. if opts.Rate != "" && opts.Rate != "hourly" && opts.Rate != "daily" && opts.Rate != "monthly" {
  1693. http.Error(w, "If set, rate parameter must be one of: 'hourly', 'daily', 'monthly'", http.StatusBadRequest)
  1694. return
  1695. }
  1696. // parse cost data filters
  1697. // namespace and cluster are exact-string-matches
  1698. // labels are expected to be comma-separated and to take the form key=value
  1699. // e.g. app=cost-analyzer,app.kubernetes.io/instance=kubecost
  1700. opts.Filters = map[string]string{
  1701. "namespace": namespace,
  1702. "cluster": cluster,
  1703. "labels": labels,
  1704. "podprefix": podprefix,
  1705. }
  1706. // parse shared resources
  1707. sn := []string{}
  1708. sln := []string{}
  1709. slv := []string{}
  1710. if sharedNamespaces != "" {
  1711. sn = strings.Split(sharedNamespaces, ",")
  1712. }
  1713. if sharedLabelNames != "" {
  1714. sln = strings.Split(sharedLabelNames, ",")
  1715. slv = strings.Split(sharedLabelValues, ",")
  1716. if len(sln) != len(slv) || slv[0] == "" {
  1717. http.Error(w, "Supply exacly one shared label value per shared label name", http.StatusBadRequest)
  1718. return
  1719. }
  1720. }
  1721. if len(sn) > 0 || len(sln) > 0 {
  1722. opts.SharedResources = NewSharedResourceInfo(true, sn, sln, slv)
  1723. }
  1724. // enable remote if it is available and not disabled
  1725. opts.RemoteEnabled = remote && env.IsRemoteEnabled()
  1726. promClient := a.GetPrometheusClient(remote)
  1727. var data map[string]*Aggregation
  1728. var message string
  1729. data, message, err = a.AggAPI.ComputeAggregateCostModel(promClient, window, field, subfields, opts)
  1730. // Find any warnings in http request context
  1731. warning, _ := util.GetWarning(r)
  1732. if err != nil {
  1733. if emptyErr, ok := err.(*EmptyDataError); ok {
  1734. if warning == "" {
  1735. w.Write(WrapData(map[string]interface{}{}, emptyErr))
  1736. } else {
  1737. w.Write(WrapDataWithWarning(map[string]interface{}{}, emptyErr, warning))
  1738. }
  1739. return
  1740. }
  1741. if boundaryErr, ok := err.(*kubecost.BoundaryError); ok {
  1742. if window.Start() != nil && window.Start().After(time.Now().Add(-90*24*time.Hour)) {
  1743. // Asking for data within a 90 day period: it will be available
  1744. // after the pipeline builds
  1745. msg := "Data will be available after ETL is built"
  1746. rex := regexp.MustCompile(`(\d+\.*\d*)%`)
  1747. match := rex.FindStringSubmatch(boundaryErr.Message)
  1748. if len(match) > 1 {
  1749. completionPct, err := strconv.ParseFloat(match[1], 64)
  1750. if err == nil {
  1751. msg = fmt.Sprintf("%s (%.1f%% complete)", msg, completionPct)
  1752. }
  1753. }
  1754. http.Error(w, msg, http.StatusInternalServerError)
  1755. } else {
  1756. // Boundary error outside of 90 day period; may not be available
  1757. http.Error(w, boundaryErr.Error(), http.StatusInternalServerError)
  1758. }
  1759. return
  1760. }
  1761. errStr := fmt.Sprintf("error computing aggregate cost model: %s", err)
  1762. http.Error(w, errStr, http.StatusInternalServerError)
  1763. return
  1764. }
  1765. if warning == "" {
  1766. w.Write(WrapDataWithMessage(data, nil, message))
  1767. } else {
  1768. w.Write(WrapDataWithMessageAndWarning(data, nil, message, warning))
  1769. }
  1770. }