aggregation.go 88 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177
71778177917801781178217831784178517861787178817891790179117921793179417951796179717981799180018011802180318041805180618071808180918101811181218131814181518161817181818191820182118221823182418251826182718281829183018311832183318341835183618371838183918401841184218431844184518461847184818491850185118521853185418551856185718581859186018611862186318641865186618671868186918701871187218731874187518761877187818791880188118821883188418851886188718881889189018911892189318941895189618971898189919001901190219031904190519061907190819091910191119121913191419151916191719181919192019211922192319241925192619271928192919301931193219331934193519361937193819391940194119421943194419451946194719481949195019511952195319541955195619571958195919601961196219631964196519661967196819691970197119721973197419751976197719781979198019811982198319841985198619871988198919901991199219931994199519961997199819992000200120022003200420052006200720082009201020112012201320142015201620172018201920202021202220232024202520262027202820292030203120322033203420352036203720382039204020412042204320442045204620472048204920502051205220532054205520562057205820592060206120622063206420652066206720682069207020712072207320742075207620772078207920802081208220832084208520862087208820892090209120922093209420952096209720982099210021012102210321042105210621072108210921102111211221132114211521162117211821192120212121222123212421252126212721282129213021312132213321342135213621372138213921402141214221432144214521462147214821492150215121522153215421552156215721582159216021612162216321642165216621672168216921702171217221732174217521762177217821792180218121822183218421852186218721882189219021912192219321942195219621972198219922002201220222032204220522062207220822092210221122122213221422152216221722182219222022212222222322242225222622272228222922302231223222332234223522362237223822392240224122422243224422452246224722482249225022512252225322542255225622572258225922602261226222632264226522662267226822692270227122722273227422752276227
72278227922802281228222832284228522862287228822892290229122922293229422952296229722982299230023012302230323042305230623072308230923102311231223132314231523162317231823192320232123222323232423252326232723282329233023312332233323342335233623372338233923402341234223432344234523462347234823492350235123522353235423552356235723582359236023612362236323642365236623672368236923702371237223732374237523762377237823792380238123822383238423852386238723882389239023912392239323942395239623972398239924002401240224032404240524062407240824092410241124122413
  1. package costmodel
  2. import (
  3. "fmt"
  4. "math"
  5. "net/http"
  6. "regexp"
  7. "sort"
  8. "strconv"
  9. "strings"
  10. "time"
  11. "github.com/julienschmidt/httprouter"
  12. "github.com/opencost/opencost/pkg/cloud/provider"
  13. "github.com/patrickmn/go-cache"
  14. prometheusClient "github.com/prometheus/client_golang/api"
  15. "github.com/opencost/opencost/core/pkg/filter/allocation"
  16. "github.com/opencost/opencost/core/pkg/log"
  17. "github.com/opencost/opencost/core/pkg/opencost"
  18. "github.com/opencost/opencost/core/pkg/util"
  19. "github.com/opencost/opencost/core/pkg/util/httputil"
  20. "github.com/opencost/opencost/core/pkg/util/json"
  21. "github.com/opencost/opencost/core/pkg/util/promutil"
  22. "github.com/opencost/opencost/core/pkg/util/timeutil"
  23. "github.com/opencost/opencost/pkg/cloud/models"
  24. "github.com/opencost/opencost/pkg/env"
  25. "github.com/opencost/opencost/pkg/errors"
  26. "github.com/opencost/opencost/pkg/prom"
  27. "github.com/opencost/opencost/pkg/thanos"
  28. )
  29. const (
  30. // SplitTypeWeighted signals that shared costs should be shared
  31. // proportionally, rather than evenly
  32. SplitTypeWeighted = "weighted"
  33. // UnallocatedSubfield indicates an allocation datum that does not have the
  34. // chosen Aggregator; e.g. during aggregation by some label, there may be
  35. // cost data that do not have the given label.
  36. UnallocatedSubfield = "__unallocated__"
  37. clusterCostsCacheMinutes = 5.0
  38. )
// Aggregation describes aggregated cost data, containing cumulative cost and
// allocation data per resource, vectors of rate data per resource, efficiency
// data, and metadata describing the type of aggregation operation.
//
// Fields tagged `json:"-"` are working data and are never serialized. Field
// order determines the order of keys in the JSON encoding, so keep it stable.
type Aggregation struct {
	// Identifying metadata. AggregateCostData's sanity-check warnings print
	// these as "Cluster: Aggregator/Environment".
	Aggregator  string                         `json:"aggregation"`
	Subfields   []string                       `json:"subfields,omitempty"`
	Environment string                         `json:"environment"`
	Cluster     string                         `json:"cluster,omitempty"`
	Properties  *opencost.AllocationProperties `json:"-"`
	Start       time.Time                      `json:"-"`
	End         time.Time                      `json:"-"`

	// CPU. In AggregateCostData, CPUCost is the total of CPUCostVector
	// (multiplied by RateCoefficient when a rate is requested),
	// CPUAllocationHourlyAverage is the total of CPUAllocationVectors divided
	// by TotalHours, and CPUEfficiency is 1 - (avg requested - avg used) /
	// CPUAllocationHourlyAverage, computed only when efficiency is requested.
	CPUAllocationHourlyAverage float64        `json:"cpuAllocationAverage"`
	CPUAllocationVectors       []*util.Vector `json:"-"`
	CPUAllocationTotal         float64        `json:"-"`
	CPUCost                    float64        `json:"cpuCost"`
	CPUCostVector              []*util.Vector `json:"cpuCostVector,omitempty"`
	CPUEfficiency              float64        `json:"cpuEfficiency"`
	CPURequestedVectors        []*util.Vector `json:"-"`
	CPUUsedVectors             []*util.Vector `json:"-"`

	// Efficiency is the CPU and RAM efficiencies averaged with CPUCost and
	// RAMCost as weights (0 when both costs are 0).
	Efficiency float64 `json:"efficiency"`

	// GPU. Unlike CPU/RAM/PV, GPUAllocationHourlyAverage is the plain average
	// of GPUAllocationVectors rather than a total divided by TotalHours.
	GPUAllocationHourlyAverage float64        `json:"gpuAllocationAverage"`
	GPUAllocationVectors       []*util.Vector `json:"-"`
	GPUCost                    float64        `json:"gpuCost"`
	GPUCostVector              []*util.Vector `json:"gpuCostVector,omitempty"`
	GPUAllocationTotal         float64        `json:"-"`

	// RAM. Computed like the CPU fields; RAMAllocationHourlyAverage is then
	// converted from bytes to GiB before being returned.
	RAMAllocationHourlyAverage float64        `json:"ramAllocationAverage"`
	RAMAllocationVectors       []*util.Vector `json:"-"`
	RAMAllocationTotal         float64        `json:"-"`
	RAMCost                    float64        `json:"ramCost"`
	RAMCostVector              []*util.Vector `json:"ramCostVector,omitempty"`
	RAMEfficiency              float64        `json:"ramEfficiency"`
	RAMRequestedVectors        []*util.Vector `json:"-"`
	RAMUsedVectors             []*util.Vector `json:"-"`

	// Persistent volumes. PVAllocationHourlyAverage is converted from bytes
	// to GiB before being returned.
	PVAllocationHourlyAverage float64        `json:"pvAllocationAverage"`
	PVAllocationVectors       []*util.Vector `json:"-"`
	PVAllocationTotal         float64        `json:"-"`
	PVCost                    float64        `json:"pvCost"`
	PVCostVector              []*util.Vector `json:"pvCostVector,omitempty"`

	// Network.
	NetworkCost       float64        `json:"networkCost"`
	NetworkCostVector []*util.Vector `json:"networkCostVector,omitempty"`

	// SharedCost is this aggregation's portion of shared-resource cost plus
	// any AggregationOptions.SharedCosts, scaled by its shared coefficient.
	SharedCost float64 `json:"sharedCost"`

	// TotalCost is the sum of the CPU, RAM, GPU, PV, network, and shared costs.
	// TotalCostVector is the element-wise sum of the five resource cost
	// vectors. All *CostVector fields are cleared by AggregateCostData unless
	// time series data was requested.
	TotalCost       float64        `json:"totalCost"`
	TotalCostVector []*util.Vector `json:"totalCostVector,omitempty"`
}
  83. // TotalHours determines the amount of hours the Aggregation covers, as a
  84. // function of the cost vectors and the resolution of those vectors' data
  85. func (a *Aggregation) TotalHours(resolutionHours float64) float64 {
  86. length := 1
  87. if length < len(a.CPUCostVector) {
  88. length = len(a.CPUCostVector)
  89. }
  90. if length < len(a.RAMCostVector) {
  91. length = len(a.RAMCostVector)
  92. }
  93. if length < len(a.PVCostVector) {
  94. length = len(a.PVCostVector)
  95. }
  96. if length < len(a.GPUCostVector) {
  97. length = len(a.GPUCostVector)
  98. }
  99. if length < len(a.NetworkCostVector) {
  100. length = len(a.NetworkCostVector)
  101. }
  102. return float64(length) * resolutionHours
  103. }
  104. // RateCoefficient computes the coefficient by which the total cost needs to be
  105. // multiplied in order to convert totals costs into per-rate costs.
  106. func (a *Aggregation) RateCoefficient(rateStr string, resolutionHours float64) float64 {
  107. // monthly rate = (730.0)*(total cost)/(total hours)
  108. // daily rate = (24.0)*(total cost)/(total hours)
  109. // hourly rate = (1.0)*(total cost)/(total hours)
  110. // default to hourly rate
  111. coeff := 1.0
  112. switch rateStr {
  113. case "daily":
  114. coeff = timeutil.HoursPerDay
  115. case "monthly":
  116. coeff = timeutil.HoursPerMonth
  117. }
  118. return coeff / a.TotalHours(resolutionHours)
  119. }
// SharedResourceInfo selects which cost data are treated as shared resources
// (see IsSharedResource) rather than being aggregated on their own.
type SharedResourceInfo struct {
	// ShareResources enables the feature; AggregateCostData only consults
	// IsSharedResource when this is true.
	ShareResources bool
	// SharedNamespace holds shared namespace names as keys. IsSharedResource
	// checks key presence only; NewSharedResourceInfo stores true values.
	SharedNamespace map[string]bool
	// LabelSelectors maps a label name to the set of values (marked true)
	// that make a cost datum shared. Selectors are ORed together.
	LabelSelectors map[string]map[string]bool
}
// SharedCostInfo is an extra cost to be distributed across aggregations.
// AggregateCostData adds Cost, scaled by each aggregation's shared
// coefficient, to that aggregation's SharedCost.
type SharedCostInfo struct {
	// Name presumably identifies the shared cost (not read in the visible
	// code — TODO confirm).
	Name string
	// Cost is the amount to distribute.
	Cost float64
	// ShareType presumably names how the cost should be split; NOTE(review):
	// the visible AggregateCostData code splits using
	// AggregationOptions.SharedSplit, not this field.
	ShareType string
}
  130. func (s *SharedResourceInfo) IsSharedResource(costDatum *CostData) bool {
  131. // exists in a shared namespace
  132. if _, ok := s.SharedNamespace[costDatum.Namespace]; ok {
  133. return true
  134. }
  135. // has at least one shared label (OR, not AND in the case of multiple labels)
  136. for labelName, labelValues := range s.LabelSelectors {
  137. if val, ok := costDatum.Labels[labelName]; ok && labelValues[val] {
  138. return true
  139. }
  140. }
  141. return false
  142. }
  143. func NewSharedResourceInfo(shareResources bool, sharedNamespaces []string, labelNames []string, labelValues []string) *SharedResourceInfo {
  144. sr := &SharedResourceInfo{
  145. ShareResources: shareResources,
  146. SharedNamespace: make(map[string]bool),
  147. LabelSelectors: make(map[string]map[string]bool),
  148. }
  149. for _, ns := range sharedNamespaces {
  150. sr.SharedNamespace[strings.Trim(ns, " ")] = true
  151. }
  152. // Creating a map of label name to label value, but only if
  153. // the cardinality matches
  154. if len(labelNames) == len(labelValues) {
  155. for i := range labelNames {
  156. cleanedLname := promutil.SanitizeLabelName(strings.Trim(labelNames[i], " "))
  157. if values, ok := sr.LabelSelectors[cleanedLname]; ok {
  158. values[strings.Trim(labelValues[i], " ")] = true
  159. } else {
  160. sr.LabelSelectors[cleanedLname] = map[string]bool{strings.Trim(labelValues[i], " "): true}
  161. }
  162. }
  163. }
  164. return sr
  165. }
  166. func GetTotalContainerCost(costData map[string]*CostData, rate string, cp models.Provider, discount float64, customDiscount float64, idleCoefficients map[string]float64) float64 {
  167. totalContainerCost := 0.0
  168. for _, costDatum := range costData {
  169. clusterID := costDatum.ClusterID
  170. cpuv, ramv, gpuv, pvvs, netv := getPriceVectors(cp, costDatum, rate, discount, customDiscount, idleCoefficients[clusterID])
  171. totalContainerCost += totalVectors(cpuv)
  172. totalContainerCost += totalVectors(ramv)
  173. totalContainerCost += totalVectors(gpuv)
  174. for _, pv := range pvvs {
  175. totalContainerCost += totalVectors(pv)
  176. }
  177. totalContainerCost += totalVectors(netv)
  178. }
  179. return totalContainerCost
  180. }
  181. func (a *Accesses) ComputeIdleCoefficient(costData map[string]*CostData, cli prometheusClient.Client, cp models.Provider, discount float64, customDiscount float64, window, offset time.Duration) (map[string]float64, error) {
  182. coefficients := make(map[string]float64)
  183. profileName := "ComputeIdleCoefficient: ComputeClusterCosts"
  184. profileStart := time.Now()
  185. var clusterCosts map[string]*ClusterCosts
  186. var err error
  187. fmtWindow, fmtOffset := timeutil.DurationOffsetStrings(window, offset)
  188. key := fmt.Sprintf("%s:%s", fmtWindow, fmtOffset)
  189. if data, valid := a.ClusterCostsCache.Get(key); valid {
  190. clusterCosts = data.(map[string]*ClusterCosts)
  191. } else {
  192. clusterCosts, err = a.ComputeClusterCosts(cli, cp, window, offset, false)
  193. if err != nil {
  194. return nil, err
  195. }
  196. }
  197. measureTime(profileStart, profileThreshold, profileName)
  198. for cid, costs := range clusterCosts {
  199. if costs.CPUCumulative == 0 && costs.RAMCumulative == 0 && costs.StorageCumulative == 0 {
  200. log.Warnf("No ClusterCosts data for cluster '%s'. Is it emitting data?", cid)
  201. coefficients[cid] = 1.0
  202. continue
  203. }
  204. if costs.TotalCumulative == 0 {
  205. return nil, fmt.Errorf("TotalCumulative cluster cost for cluster '%s' returned 0 over window '%s' offset '%s'", cid, fmtWindow, fmtOffset)
  206. }
  207. totalContainerCost := 0.0
  208. for _, costDatum := range costData {
  209. if costDatum.ClusterID == cid {
  210. cpuv, ramv, gpuv, pvvs, _ := getPriceVectors(cp, costDatum, "", discount, customDiscount, 1)
  211. totalContainerCost += totalVectors(cpuv)
  212. totalContainerCost += totalVectors(ramv)
  213. totalContainerCost += totalVectors(gpuv)
  214. for _, pv := range pvvs {
  215. totalContainerCost += totalVectors(pv)
  216. }
  217. }
  218. }
  219. coeff := totalContainerCost / costs.TotalCumulative
  220. coefficients[cid] = coeff
  221. }
  222. return coefficients, nil
  223. }
// AggregationOptions provides optional parameters to AggregateCostData, allowing callers to perform more complex operations
type AggregationOptions struct {
	Discount          float64            // percent by which to discount CPU, RAM, and GPU cost
	CustomDiscount    float64            // additional custom discount applied to all prices
	IdleCoefficients  map[string]float64 // scales costs by amount of idle resources on a per-cluster basis; clusters without an entry use 1.0
	IncludeEfficiency bool               // set to true to receive efficiency/usage data
	IncludeTimeSeries bool               // set to true to receive time series data
	Rate              string             // set to "hourly", "daily", or "monthly" to receive cost rate, rather than cumulative cost
	// ResolutionHours is the number of hours each vector sample covers (see
	// Aggregation.TotalHours). Values <= 0 are treated as 1.0.
	ResolutionHours float64
	// SharedResourceInfo, when non-nil and enabled, selects cost data whose
	// cost is pooled into shared cost instead of being aggregated directly.
	SharedResourceInfo *SharedResourceInfo
	// SharedCosts are additional costs whose Cost is distributed to every
	// aggregation's SharedCost using the shared coefficient.
	SharedCosts map[string]*SharedCostInfo
	// FilteredContainerCount is not read in the reviewed portion of
	// AggregateCostData — TODO document.
	FilteredContainerCount int
	// FilteredEnvironments: for an even split, its length is added to the
	// number of aggregations when computing each aggregation's share.
	FilteredEnvironments map[string]int
	// SharedSplit selects how shared cost is divided: SplitTypeWeighted
	// splits in proportion to each aggregation's cost; any other value
	// splits evenly.
	SharedSplit string
	// TotalContainerCost is used by the weighted split. Each aggregation's
	// share is its cost divided by (TotalContainerCost - shared resource cost).
	TotalContainerCost float64
}
  240. // Helper method to test request/usgae values against allocation averages for efficiency scores. Generate a warning log if
  241. // clamp is required
  242. func clampAverage(requestsAvg float64, usedAverage float64, allocationAvg float64, resource string) (float64, float64) {
  243. rAvg := requestsAvg
  244. if rAvg > allocationAvg {
  245. log.Debugf("Average %s Requested (%f) > Average %s Allocated (%f). Clamping.", resource, rAvg, resource, allocationAvg)
  246. rAvg = allocationAvg
  247. }
  248. uAvg := usedAverage
  249. if uAvg > allocationAvg {
  250. log.Debugf(" Average %s Used (%f) > Average %s Allocated (%f). Clamping.", resource, uAvg, resource, allocationAvg)
  251. uAvg = allocationAvg
  252. }
  253. return rAvg, uAvg
  254. }
  255. // AggregateCostData aggregates raw cost data by field; e.g. namespace, cluster, service, or label. In the case of label, callers
  256. // must pass a slice of subfields indicating the labels by which to group. Provider is used to define custom resource pricing.
  257. // See AggregationOptions for optional parameters.
  258. func AggregateCostData(costData map[string]*CostData, field string, subfields []string, cp models.Provider, opts *AggregationOptions) map[string]*Aggregation {
  259. discount := opts.Discount
  260. customDiscount := opts.CustomDiscount
  261. idleCoefficients := opts.IdleCoefficients
  262. includeTimeSeries := opts.IncludeTimeSeries
  263. includeEfficiency := opts.IncludeEfficiency
  264. rate := opts.Rate
  265. sr := opts.SharedResourceInfo
  266. resolutionHours := 1.0
  267. if opts.ResolutionHours > 0.0 {
  268. resolutionHours = opts.ResolutionHours
  269. }
  270. if idleCoefficients == nil {
  271. idleCoefficients = make(map[string]float64)
  272. }
  273. // aggregations collects key-value pairs of resource group-to-aggregated data
  274. // e.g. namespace-to-data or label-value-to-data
  275. aggregations := make(map[string]*Aggregation)
  276. // sharedResourceCost is the running total cost of resources that should be reported
  277. // as shared across all other resources, rather than reported as a stand-alone category
  278. sharedResourceCost := 0.0
  279. for _, costDatum := range costData {
  280. idleCoefficient, ok := idleCoefficients[costDatum.ClusterID]
  281. if !ok {
  282. idleCoefficient = 1.0
  283. }
  284. if sr != nil && sr.ShareResources && sr.IsSharedResource(costDatum) {
  285. cpuv, ramv, gpuv, pvvs, netv := getPriceVectors(cp, costDatum, rate, discount, customDiscount, idleCoefficient)
  286. sharedResourceCost += totalVectors(cpuv)
  287. sharedResourceCost += totalVectors(ramv)
  288. sharedResourceCost += totalVectors(gpuv)
  289. sharedResourceCost += totalVectors(netv)
  290. for _, pv := range pvvs {
  291. sharedResourceCost += totalVectors(pv)
  292. }
  293. } else {
  294. if field == "cluster" {
  295. aggregateDatum(cp, aggregations, costDatum, field, subfields, rate, costDatum.ClusterID, discount, customDiscount, idleCoefficient, false)
  296. } else if field == "node" {
  297. aggregateDatum(cp, aggregations, costDatum, field, subfields, rate, costDatum.NodeName, discount, customDiscount, idleCoefficient, false)
  298. } else if field == "namespace" {
  299. aggregateDatum(cp, aggregations, costDatum, field, subfields, rate, costDatum.Namespace, discount, customDiscount, idleCoefficient, false)
  300. } else if field == "service" {
  301. if len(costDatum.Services) > 0 {
  302. aggregateDatum(cp, aggregations, costDatum, field, subfields, rate, costDatum.Namespace+"/"+costDatum.Services[0], discount, customDiscount, idleCoefficient, false)
  303. } else {
  304. aggregateDatum(cp, aggregations, costDatum, field, subfields, rate, UnallocatedSubfield, discount, customDiscount, idleCoefficient, false)
  305. }
  306. } else if field == "deployment" {
  307. if len(costDatum.Deployments) > 0 {
  308. aggregateDatum(cp, aggregations, costDatum, field, subfields, rate, costDatum.Namespace+"/"+costDatum.Deployments[0], discount, customDiscount, idleCoefficient, false)
  309. } else {
  310. aggregateDatum(cp, aggregations, costDatum, field, subfields, rate, UnallocatedSubfield, discount, customDiscount, idleCoefficient, false)
  311. }
  312. } else if field == "statefulset" {
  313. if len(costDatum.Statefulsets) > 0 {
  314. aggregateDatum(cp, aggregations, costDatum, field, subfields, rate, costDatum.Namespace+"/"+costDatum.Statefulsets[0], discount, customDiscount, idleCoefficient, false)
  315. } else {
  316. aggregateDatum(cp, aggregations, costDatum, field, subfields, rate, UnallocatedSubfield, discount, customDiscount, idleCoefficient, false)
  317. }
  318. } else if field == "daemonset" {
  319. if len(costDatum.Daemonsets) > 0 {
  320. aggregateDatum(cp, aggregations, costDatum, field, subfields, rate, costDatum.Namespace+"/"+costDatum.Daemonsets[0], discount, customDiscount, idleCoefficient, false)
  321. } else {
  322. aggregateDatum(cp, aggregations, costDatum, field, subfields, rate, UnallocatedSubfield, discount, customDiscount, idleCoefficient, false)
  323. }
  324. } else if field == "controller" {
  325. if controller, kind, hasController := costDatum.GetController(); hasController {
  326. key := fmt.Sprintf("%s/%s:%s", costDatum.Namespace, kind, controller)
  327. aggregateDatum(cp, aggregations, costDatum, field, subfields, rate, key, discount, customDiscount, idleCoefficient, false)
  328. } else {
  329. aggregateDatum(cp, aggregations, costDatum, field, subfields, rate, UnallocatedSubfield, discount, customDiscount, idleCoefficient, false)
  330. }
  331. } else if field == "label" {
  332. found := false
  333. if costDatum.Labels != nil {
  334. for _, sf := range subfields {
  335. if subfieldName, ok := costDatum.Labels[sf]; ok {
  336. aggregateDatum(cp, aggregations, costDatum, field, subfields, rate, subfieldName, discount, customDiscount, idleCoefficient, false)
  337. found = true
  338. break
  339. }
  340. }
  341. }
  342. if !found {
  343. aggregateDatum(cp, aggregations, costDatum, field, subfields, rate, UnallocatedSubfield, discount, customDiscount, idleCoefficient, false)
  344. }
  345. } else if field == "annotation" {
  346. found := false
  347. if costDatum.Annotations != nil {
  348. for _, sf := range subfields {
  349. if subfieldName, ok := costDatum.Annotations[sf]; ok {
  350. aggregateDatum(cp, aggregations, costDatum, field, subfields, rate, subfieldName, discount, customDiscount, idleCoefficient, false)
  351. found = true
  352. break
  353. }
  354. }
  355. }
  356. if !found {
  357. aggregateDatum(cp, aggregations, costDatum, field, subfields, rate, UnallocatedSubfield, discount, customDiscount, idleCoefficient, false)
  358. }
  359. } else if field == "pod" {
  360. aggregateDatum(cp, aggregations, costDatum, field, subfields, rate, costDatum.Namespace+"/"+costDatum.PodName, discount, customDiscount, idleCoefficient, false)
  361. } else if field == "container" {
  362. key := fmt.Sprintf("%s/%s/%s/%s", costDatum.ClusterID, costDatum.Namespace, costDatum.PodName, costDatum.Name)
  363. aggregateDatum(cp, aggregations, costDatum, field, subfields, rate, key, discount, customDiscount, idleCoefficient, true)
  364. }
  365. }
  366. }
  367. for key, agg := range aggregations {
  368. sharedCoefficient := 1 / float64(len(opts.FilteredEnvironments)+len(aggregations))
  369. agg.CPUCost = totalVectors(agg.CPUCostVector)
  370. agg.RAMCost = totalVectors(agg.RAMCostVector)
  371. agg.GPUCost = totalVectors(agg.GPUCostVector)
  372. agg.PVCost = totalVectors(agg.PVCostVector)
  373. agg.NetworkCost = totalVectors(agg.NetworkCostVector)
  374. if opts.SharedSplit == SplitTypeWeighted {
  375. d := opts.TotalContainerCost - sharedResourceCost
  376. if d == 0 {
  377. log.Warnf("Total container cost '%f' and shared resource cost '%f are the same'. Setting sharedCoefficient to 1", opts.TotalContainerCost, sharedResourceCost)
  378. sharedCoefficient = 1.0
  379. } else {
  380. sharedCoefficient = (agg.CPUCost + agg.RAMCost + agg.GPUCost + agg.PVCost + agg.NetworkCost) / d
  381. }
  382. }
  383. agg.SharedCost = sharedResourceCost * sharedCoefficient
  384. for _, v := range opts.SharedCosts {
  385. agg.SharedCost += v.Cost * sharedCoefficient
  386. }
  387. if rate != "" {
  388. rateCoeff := agg.RateCoefficient(rate, resolutionHours)
  389. agg.CPUCost *= rateCoeff
  390. agg.RAMCost *= rateCoeff
  391. agg.GPUCost *= rateCoeff
  392. agg.PVCost *= rateCoeff
  393. agg.NetworkCost *= rateCoeff
  394. agg.SharedCost *= rateCoeff
  395. }
  396. agg.TotalCost = agg.CPUCost + agg.RAMCost + agg.GPUCost + agg.PVCost + agg.NetworkCost + agg.SharedCost
  397. // Evicted and Completed Pods can still show up here, but have 0 cost.
  398. // Filter these by default. Any reason to keep them?
  399. if agg.TotalCost == 0 {
  400. delete(aggregations, key)
  401. continue
  402. }
  403. // CPU, RAM, and PV allocation are cumulative per-datum, whereas GPU is rate per-datum
  404. agg.CPUAllocationHourlyAverage = totalVectors(agg.CPUAllocationVectors) / agg.TotalHours(resolutionHours)
  405. agg.RAMAllocationHourlyAverage = totalVectors(agg.RAMAllocationVectors) / agg.TotalHours(resolutionHours)
  406. agg.GPUAllocationHourlyAverage = averageVectors(agg.GPUAllocationVectors)
  407. agg.PVAllocationHourlyAverage = totalVectors(agg.PVAllocationVectors) / agg.TotalHours(resolutionHours)
  408. // TODO niko/etl does this check out for GPU data? Do we need to rewrite GPU queries to be
  409. // cumulative?
  410. agg.CPUAllocationTotal = totalVectors(agg.CPUAllocationVectors)
  411. agg.GPUAllocationTotal = totalVectors(agg.GPUAllocationVectors)
  412. agg.PVAllocationTotal = totalVectors(agg.PVAllocationVectors)
  413. agg.RAMAllocationTotal = totalVectors(agg.RAMAllocationVectors)
  414. if includeEfficiency {
  415. // Default both RAM and CPU to 0% efficiency so that a 0-requested, 0-allocated, 0-used situation
  416. // returns 0% efficiency, which should be a red-flag.
  417. //
  418. // If non-zero numbers are available, then efficiency is defined as:
  419. // idlePercentage = (requested - used) / allocated
  420. // efficiency = (1.0 - idlePercentage)
  421. //
  422. // It is possible to score > 100% efficiency, which is meant to be interpreted as a red flag.
  423. // It is not possible to score < 0% efficiency.
  424. agg.CPUEfficiency = 0.0
  425. CPUIdle := 0.0
  426. if agg.CPUAllocationHourlyAverage > 0.0 {
  427. avgCPURequested := averageVectors(agg.CPURequestedVectors)
  428. avgCPUUsed := averageVectors(agg.CPUUsedVectors)
  429. // Clamp averages, log range violations
  430. avgCPURequested, avgCPUUsed = clampAverage(avgCPURequested, avgCPUUsed, agg.CPUAllocationHourlyAverage, "CPU")
  431. CPUIdle = ((avgCPURequested - avgCPUUsed) / agg.CPUAllocationHourlyAverage)
  432. agg.CPUEfficiency = 1.0 - CPUIdle
  433. }
  434. agg.RAMEfficiency = 0.0
  435. RAMIdle := 0.0
  436. if agg.RAMAllocationHourlyAverage > 0.0 {
  437. avgRAMRequested := averageVectors(agg.RAMRequestedVectors)
  438. avgRAMUsed := averageVectors(agg.RAMUsedVectors)
  439. // Clamp averages, log range violations
  440. avgRAMRequested, avgRAMUsed = clampAverage(avgRAMRequested, avgRAMUsed, agg.RAMAllocationHourlyAverage, "RAM")
  441. RAMIdle = ((avgRAMRequested - avgRAMUsed) / agg.RAMAllocationHourlyAverage)
  442. agg.RAMEfficiency = 1.0 - RAMIdle
  443. }
  444. // Score total efficiency by the sum of CPU and RAM efficiency, weighted by their
  445. // respective total costs.
  446. agg.Efficiency = 0.0
  447. if (agg.CPUCost + agg.RAMCost) > 0 {
  448. agg.Efficiency = ((agg.CPUCost * agg.CPUEfficiency) + (agg.RAMCost * agg.RAMEfficiency)) / (agg.CPUCost + agg.RAMCost)
  449. }
  450. }
  451. // convert RAM from bytes to GiB
  452. agg.RAMAllocationHourlyAverage = agg.RAMAllocationHourlyAverage / 1024 / 1024 / 1024
  453. // convert storage from bytes to GiB
  454. agg.PVAllocationHourlyAverage = agg.PVAllocationHourlyAverage / 1024 / 1024 / 1024
  455. // remove time series data if it is not explicitly requested
  456. if !includeTimeSeries {
  457. agg.CPUCostVector = nil
  458. agg.RAMCostVector = nil
  459. agg.GPUCostVector = nil
  460. agg.PVCostVector = nil
  461. agg.NetworkCostVector = nil
  462. agg.TotalCostVector = nil
  463. } else { // otherwise compute a totalcostvector
  464. v1 := addVectors(agg.CPUCostVector, agg.RAMCostVector)
  465. v2 := addVectors(v1, agg.GPUCostVector)
  466. v3 := addVectors(v2, agg.PVCostVector)
  467. v4 := addVectors(v3, agg.NetworkCostVector)
  468. agg.TotalCostVector = v4
  469. }
  470. // Typesafety checks
  471. if math.IsNaN(agg.CPUAllocationHourlyAverage) || math.IsInf(agg.CPUAllocationHourlyAverage, 0) {
  472. log.Warnf("CPUAllocationHourlyAverage is %f for '%s: %s/%s'", agg.CPUAllocationHourlyAverage, agg.Cluster, agg.Aggregator, agg.Environment)
  473. agg.CPUAllocationHourlyAverage = 0
  474. }
  475. if math.IsNaN(agg.CPUCost) || math.IsInf(agg.CPUCost, 0) {
  476. log.Warnf("CPUCost is %f for '%s: %s/%s'", agg.CPUCost, agg.Cluster, agg.Aggregator, agg.Environment)
  477. agg.CPUCost = 0
  478. }
  479. if math.IsNaN(agg.CPUEfficiency) || math.IsInf(agg.CPUEfficiency, 0) {
  480. log.Warnf("CPUEfficiency is %f for '%s: %s/%s'", agg.CPUEfficiency, agg.Cluster, agg.Aggregator, agg.Environment)
  481. agg.CPUEfficiency = 0
  482. }
  483. if math.IsNaN(agg.Efficiency) || math.IsInf(agg.Efficiency, 0) {
  484. log.Warnf("Efficiency is %f for '%s: %s/%s'", agg.Efficiency, agg.Cluster, agg.Aggregator, agg.Environment)
  485. agg.Efficiency = 0
  486. }
  487. if math.IsNaN(agg.GPUAllocationHourlyAverage) || math.IsInf(agg.GPUAllocationHourlyAverage, 0) {
  488. log.Warnf("GPUAllocationHourlyAverage is %f for '%s: %s/%s'", agg.GPUAllocationHourlyAverage, agg.Cluster, agg.Aggregator, agg.Environment)
  489. agg.GPUAllocationHourlyAverage = 0
  490. }
  491. if math.IsNaN(agg.GPUCost) || math.IsInf(agg.GPUCost, 0) {
  492. log.Warnf("GPUCost is %f for '%s: %s/%s'", agg.GPUCost, agg.Cluster, agg.Aggregator, agg.Environment)
  493. agg.GPUCost = 0
  494. }
  495. if math.IsNaN(agg.RAMAllocationHourlyAverage) || math.IsInf(agg.RAMAllocationHourlyAverage, 0) {
  496. log.Warnf("RAMAllocationHourlyAverage is %f for '%s: %s/%s'", agg.RAMAllocationHourlyAverage, agg.Cluster, agg.Aggregator, agg.Environment)
  497. agg.RAMAllocationHourlyAverage = 0
  498. }
  499. if math.IsNaN(agg.RAMCost) || math.IsInf(agg.RAMCost, 0) {
  500. log.Warnf("RAMCost is %f for '%s: %s/%s'", agg.RAMCost, agg.Cluster, agg.Aggregator, agg.Environment)
  501. agg.RAMCost = 0
  502. }
  503. if math.IsNaN(agg.RAMEfficiency) || math.IsInf(agg.RAMEfficiency, 0) {
  504. log.Warnf("RAMEfficiency is %f for '%s: %s/%s'", agg.RAMEfficiency, agg.Cluster, agg.Aggregator, agg.Environment)
  505. agg.RAMEfficiency = 0
  506. }
  507. if math.IsNaN(agg.PVAllocationHourlyAverage) || math.IsInf(agg.PVAllocationHourlyAverage, 0) {
  508. log.Warnf("PVAllocationHourlyAverage is %f for '%s: %s/%s'", agg.PVAllocationHourlyAverage, agg.Cluster, agg.Aggregator, agg.Environment)
  509. agg.PVAllocationHourlyAverage = 0
  510. }
  511. if math.IsNaN(agg.PVCost) || math.IsInf(agg.PVCost, 0) {
  512. log.Warnf("PVCost is %f for '%s: %s/%s'", agg.PVCost, agg.Cluster, agg.Aggregator, agg.Environment)
  513. agg.PVCost = 0
  514. }
  515. if math.IsNaN(agg.NetworkCost) || math.IsInf(agg.NetworkCost, 0) {
  516. log.Warnf("NetworkCost is %f for '%s: %s/%s'", agg.NetworkCost, agg.Cluster, agg.Aggregator, agg.Environment)
  517. agg.NetworkCost = 0
  518. }
  519. if math.IsNaN(agg.SharedCost) || math.IsInf(agg.SharedCost, 0) {
  520. log.Warnf("SharedCost is %f for '%s: %s/%s'", agg.SharedCost, agg.Cluster, agg.Aggregator, agg.Environment)
  521. agg.SharedCost = 0
  522. }
  523. if math.IsNaN(agg.TotalCost) || math.IsInf(agg.TotalCost, 0) {
  524. log.Warnf("TotalCost is %f for '%s: %s/%s'", agg.TotalCost, agg.Cluster, agg.Aggregator, agg.Environment)
  525. agg.TotalCost = 0
  526. }
  527. }
  528. return aggregations
  529. }
  530. func aggregateDatum(cp models.Provider, aggregations map[string]*Aggregation, costDatum *CostData, field string, subfields []string, rate string, key string, discount float64, customDiscount float64, idleCoefficient float64, includeProperties bool) {
  531. // add new entry to aggregation results if a new key is encountered
  532. if _, ok := aggregations[key]; !ok {
  533. agg := &Aggregation{
  534. Aggregator: field,
  535. Environment: key,
  536. }
  537. if len(subfields) > 0 {
  538. agg.Subfields = subfields
  539. }
  540. if includeProperties {
  541. props := &opencost.AllocationProperties{}
  542. props.Cluster = costDatum.ClusterID
  543. props.Node = costDatum.NodeName
  544. if controller, kind, hasController := costDatum.GetController(); hasController {
  545. props.Controller = controller
  546. props.ControllerKind = kind
  547. }
  548. props.Labels = costDatum.Labels
  549. props.Annotations = costDatum.Annotations
  550. props.Namespace = costDatum.Namespace
  551. props.Pod = costDatum.PodName
  552. props.Services = costDatum.Services
  553. props.Container = costDatum.Name
  554. agg.Properties = props
  555. }
  556. aggregations[key] = agg
  557. }
  558. mergeVectors(cp, costDatum, aggregations[key], rate, discount, customDiscount, idleCoefficient)
  559. }
  560. func mergeVectors(cp models.Provider, costDatum *CostData, aggregation *Aggregation, rate string, discount float64, customDiscount float64, idleCoefficient float64) {
  561. aggregation.CPUAllocationVectors = addVectors(costDatum.CPUAllocation, aggregation.CPUAllocationVectors)
  562. aggregation.CPURequestedVectors = addVectors(costDatum.CPUReq, aggregation.CPURequestedVectors)
  563. aggregation.CPUUsedVectors = addVectors(costDatum.CPUUsed, aggregation.CPUUsedVectors)
  564. aggregation.RAMAllocationVectors = addVectors(costDatum.RAMAllocation, aggregation.RAMAllocationVectors)
  565. aggregation.RAMRequestedVectors = addVectors(costDatum.RAMReq, aggregation.RAMRequestedVectors)
  566. aggregation.RAMUsedVectors = addVectors(costDatum.RAMUsed, aggregation.RAMUsedVectors)
  567. aggregation.GPUAllocationVectors = addVectors(costDatum.GPUReq, aggregation.GPUAllocationVectors)
  568. for _, pvcd := range costDatum.PVCData {
  569. aggregation.PVAllocationVectors = addVectors(pvcd.Values, aggregation.PVAllocationVectors)
  570. }
  571. cpuv, ramv, gpuv, pvvs, netv := getPriceVectors(cp, costDatum, rate, discount, customDiscount, idleCoefficient)
  572. aggregation.CPUCostVector = addVectors(cpuv, aggregation.CPUCostVector)
  573. aggregation.RAMCostVector = addVectors(ramv, aggregation.RAMCostVector)
  574. aggregation.GPUCostVector = addVectors(gpuv, aggregation.GPUCostVector)
  575. aggregation.NetworkCostVector = addVectors(netv, aggregation.NetworkCostVector)
  576. for _, vectorList := range pvvs {
  577. aggregation.PVCostVector = addVectors(aggregation.PVCostVector, vectorList)
  578. }
  579. }
  580. // Returns the blended discounts applied to the node as a result of global discounts and reserved instance
  581. // discounts
  582. func getDiscounts(costDatum *CostData, cpuCost float64, ramCost float64, discount float64) (float64, float64) {
  583. if costDatum.NodeData == nil {
  584. return discount, discount
  585. }
  586. if costDatum.NodeData.IsSpot() {
  587. return 0, 0
  588. }
  589. reserved := costDatum.NodeData.Reserved
  590. // blended discounts
  591. blendedCPUDiscount := discount
  592. blendedRAMDiscount := discount
  593. if reserved != nil && reserved.CPUCost > 0 && reserved.RAMCost > 0 {
  594. reservedCPUDiscount := 0.0
  595. if cpuCost == 0 {
  596. log.Warnf("No cpu cost found for cluster '%s' node '%s'", costDatum.ClusterID, costDatum.NodeName)
  597. } else {
  598. reservedCPUDiscount = 1.0 - (reserved.CPUCost / cpuCost)
  599. }
  600. reservedRAMDiscount := 0.0
  601. if ramCost == 0 {
  602. log.Warnf("No ram cost found for cluster '%s' node '%s'", costDatum.ClusterID, costDatum.NodeName)
  603. } else {
  604. reservedRAMDiscount = 1.0 - (reserved.RAMCost / ramCost)
  605. }
  606. // AWS passes the # of reserved CPU and RAM as -1 to represent "All"
  607. if reserved.ReservedCPU < 0 && reserved.ReservedRAM < 0 {
  608. blendedCPUDiscount = reservedCPUDiscount
  609. blendedRAMDiscount = reservedRAMDiscount
  610. } else {
  611. nodeCPU, ierr := strconv.ParseInt(costDatum.NodeData.VCPU, 10, 64)
  612. nodeRAM, ferr := strconv.ParseFloat(costDatum.NodeData.RAMBytes, 64)
  613. if ierr == nil && ferr == nil {
  614. nodeRAMGB := nodeRAM / 1024 / 1024 / 1024
  615. reservedRAMGB := float64(reserved.ReservedRAM) / 1024 / 1024 / 1024
  616. nonReservedCPU := nodeCPU - reserved.ReservedCPU
  617. nonReservedRAM := nodeRAMGB - reservedRAMGB
  618. if nonReservedCPU == 0 {
  619. blendedCPUDiscount = reservedCPUDiscount
  620. } else {
  621. if nodeCPU == 0 {
  622. log.Warnf("No ram found for cluster '%s' node '%s'", costDatum.ClusterID, costDatum.NodeName)
  623. } else {
  624. blendedCPUDiscount = (float64(reserved.ReservedCPU) * reservedCPUDiscount) + (float64(nonReservedCPU)*discount)/float64(nodeCPU)
  625. }
  626. }
  627. if nonReservedRAM == 0 {
  628. blendedRAMDiscount = reservedRAMDiscount
  629. } else {
  630. if nodeRAMGB == 0 {
  631. log.Warnf("No ram found for cluster '%s' node '%s'", costDatum.ClusterID, costDatum.NodeName)
  632. } else {
  633. blendedRAMDiscount = (reservedRAMGB * reservedRAMDiscount) + (nonReservedRAM*discount)/nodeRAMGB
  634. }
  635. }
  636. }
  637. }
  638. }
  639. return blendedCPUDiscount, blendedRAMDiscount
  640. }
  641. func parseVectorPricing(cfg *models.CustomPricing, costDatum *CostData, cpuCostStr, ramCostStr, gpuCostStr, pvCostStr string) (float64, float64, float64, float64, bool) {
  642. usesCustom := false
  643. cpuCost, err := strconv.ParseFloat(cpuCostStr, 64)
  644. if err != nil || math.IsNaN(cpuCost) || math.IsInf(cpuCost, 0) || cpuCost == 0 {
  645. cpuCost, err = strconv.ParseFloat(cfg.CPU, 64)
  646. usesCustom = true
  647. if err != nil || math.IsNaN(cpuCost) || math.IsInf(cpuCost, 0) {
  648. cpuCost = 0
  649. }
  650. }
  651. ramCost, err := strconv.ParseFloat(ramCostStr, 64)
  652. if err != nil || math.IsNaN(ramCost) || math.IsInf(ramCost, 0) || ramCost == 0 {
  653. ramCost, err = strconv.ParseFloat(cfg.RAM, 64)
  654. usesCustom = true
  655. if err != nil || math.IsNaN(ramCost) || math.IsInf(ramCost, 0) {
  656. ramCost = 0
  657. }
  658. }
  659. gpuCost, err := strconv.ParseFloat(gpuCostStr, 64)
  660. if err != nil || math.IsNaN(gpuCost) || math.IsInf(gpuCost, 0) {
  661. gpuCost, err = strconv.ParseFloat(cfg.GPU, 64)
  662. if err != nil || math.IsNaN(gpuCost) || math.IsInf(gpuCost, 0) {
  663. gpuCost = 0
  664. }
  665. }
  666. pvCost, err := strconv.ParseFloat(pvCostStr, 64)
  667. if err != nil || math.IsNaN(cpuCost) || math.IsInf(cpuCost, 0) {
  668. pvCost, err = strconv.ParseFloat(cfg.Storage, 64)
  669. if err != nil || math.IsNaN(pvCost) || math.IsInf(pvCost, 0) {
  670. pvCost = 0
  671. }
  672. }
  673. return cpuCost, ramCost, gpuCost, pvCost, usesCustom
  674. }
  675. func getPriceVectors(cp models.Provider, costDatum *CostData, rate string, discount float64, customDiscount float64, idleCoefficient float64) ([]*util.Vector, []*util.Vector, []*util.Vector, [][]*util.Vector, []*util.Vector) {
  676. var cpuCost float64
  677. var ramCost float64
  678. var gpuCost float64
  679. var pvCost float64
  680. var usesCustom bool
  681. // If custom pricing is enabled and can be retrieved, replace
  682. // default cost values with custom values
  683. customPricing, err := cp.GetConfig()
  684. if err != nil {
  685. log.Errorf("failed to load custom pricing: %s", err)
  686. }
  687. if provider.CustomPricesEnabled(cp) && err == nil {
  688. var cpuCostStr string
  689. var ramCostStr string
  690. var gpuCostStr string
  691. var pvCostStr string
  692. if costDatum.NodeData.IsSpot() {
  693. cpuCostStr = customPricing.SpotCPU
  694. ramCostStr = customPricing.SpotRAM
  695. gpuCostStr = customPricing.SpotGPU
  696. } else {
  697. cpuCostStr = customPricing.CPU
  698. ramCostStr = customPricing.RAM
  699. gpuCostStr = customPricing.GPU
  700. }
  701. pvCostStr = customPricing.Storage
  702. cpuCost, ramCost, gpuCost, pvCost, usesCustom = parseVectorPricing(customPricing, costDatum, cpuCostStr, ramCostStr, gpuCostStr, pvCostStr)
  703. } else if costDatum.NodeData == nil && err == nil {
  704. cpuCostStr := customPricing.CPU
  705. ramCostStr := customPricing.RAM
  706. gpuCostStr := customPricing.GPU
  707. pvCostStr := customPricing.Storage
  708. cpuCost, ramCost, gpuCost, pvCost, usesCustom = parseVectorPricing(customPricing, costDatum, cpuCostStr, ramCostStr, gpuCostStr, pvCostStr)
  709. } else {
  710. cpuCostStr := costDatum.NodeData.VCPUCost
  711. ramCostStr := costDatum.NodeData.RAMCost
  712. gpuCostStr := costDatum.NodeData.GPUCost
  713. pvCostStr := costDatum.NodeData.StorageCost
  714. cpuCost, ramCost, gpuCost, pvCost, usesCustom = parseVectorPricing(customPricing, costDatum, cpuCostStr, ramCostStr, gpuCostStr, pvCostStr)
  715. }
  716. if usesCustom {
  717. log.DedupedWarningf(5, "No pricing data found for node `%s` , using custom pricing", costDatum.NodeName)
  718. }
  719. cpuDiscount, ramDiscount := getDiscounts(costDatum, cpuCost, ramCost, discount)
  720. log.Debugf("Node Name: %s", costDatum.NodeName)
  721. log.Debugf("Blended CPU Discount: %f", cpuDiscount)
  722. log.Debugf("Blended RAM Discount: %f", ramDiscount)
  723. // TODO should we try to apply the rate coefficient here or leave it as a totals-only metric?
  724. rateCoeff := 1.0
  725. if idleCoefficient == 0 {
  726. idleCoefficient = 1.0
  727. }
  728. cpuv := make([]*util.Vector, 0, len(costDatum.CPUAllocation))
  729. for _, val := range costDatum.CPUAllocation {
  730. cpuv = append(cpuv, &util.Vector{
  731. Timestamp: math.Round(val.Timestamp/10) * 10,
  732. Value: (val.Value * cpuCost * (1 - cpuDiscount) * (1 - customDiscount) / idleCoefficient) * rateCoeff,
  733. })
  734. }
  735. ramv := make([]*util.Vector, 0, len(costDatum.RAMAllocation))
  736. for _, val := range costDatum.RAMAllocation {
  737. ramv = append(ramv, &util.Vector{
  738. Timestamp: math.Round(val.Timestamp/10) * 10,
  739. Value: ((val.Value / 1024 / 1024 / 1024) * ramCost * (1 - ramDiscount) * (1 - customDiscount) / idleCoefficient) * rateCoeff,
  740. })
  741. }
  742. gpuv := make([]*util.Vector, 0, len(costDatum.GPUReq))
  743. for _, val := range costDatum.GPUReq {
  744. gpuv = append(gpuv, &util.Vector{
  745. Timestamp: math.Round(val.Timestamp/10) * 10,
  746. Value: (val.Value * gpuCost * (1 - discount) * (1 - customDiscount) / idleCoefficient) * rateCoeff,
  747. })
  748. }
  749. pvvs := make([][]*util.Vector, 0, len(costDatum.PVCData))
  750. for _, pvcData := range costDatum.PVCData {
  751. pvv := make([]*util.Vector, 0, len(pvcData.Values))
  752. if pvcData.Volume != nil {
  753. cost, _ := strconv.ParseFloat(pvcData.Volume.Cost, 64)
  754. // override with custom pricing if enabled
  755. if provider.CustomPricesEnabled(cp) {
  756. cost = pvCost
  757. }
  758. for _, val := range pvcData.Values {
  759. pvv = append(pvv, &util.Vector{
  760. Timestamp: math.Round(val.Timestamp/10) * 10,
  761. Value: ((val.Value / 1024 / 1024 / 1024) * cost * (1 - customDiscount) / idleCoefficient) * rateCoeff,
  762. })
  763. }
  764. pvvs = append(pvvs, pvv)
  765. }
  766. }
  767. netv := make([]*util.Vector, 0, len(costDatum.NetworkData))
  768. for _, val := range costDatum.NetworkData {
  769. netv = append(netv, &util.Vector{
  770. Timestamp: math.Round(val.Timestamp/10) * 10,
  771. Value: val.Value,
  772. })
  773. }
  774. return cpuv, ramv, gpuv, pvvs, netv
  775. }
  776. func averageVectors(vectors []*util.Vector) float64 {
  777. if len(vectors) == 0 {
  778. return 0.0
  779. }
  780. return totalVectors(vectors) / float64(len(vectors))
  781. }
  782. func totalVectors(vectors []*util.Vector) float64 {
  783. total := 0.0
  784. for _, vector := range vectors {
  785. total += vector.Value
  786. }
  787. return total
  788. }
  789. // addVectors adds two slices of Vectors. Vector timestamps are rounded to the
  790. // nearest ten seconds to allow matching of Vectors within a delta allowance.
  791. // Matching Vectors are summed, while unmatched Vectors are passed through.
  792. // e.g. [(t=1, 1), (t=2, 2)] + [(t=2, 2), (t=3, 3)] = [(t=1, 1), (t=2, 4), (t=3, 3)]
  793. func addVectors(xvs []*util.Vector, yvs []*util.Vector) []*util.Vector {
  794. sumOp := func(result *util.Vector, x *float64, y *float64) bool {
  795. if x != nil && y != nil {
  796. result.Value = *x + *y
  797. } else if y != nil {
  798. result.Value = *y
  799. } else if x != nil {
  800. result.Value = *x
  801. }
  802. return true
  803. }
  804. return util.ApplyVectorOp(xvs, yvs, sumOp)
  805. }
  806. // minCostDataLength sets the minimum number of time series data required to
  807. // cache both raw and aggregated cost data
  808. const minCostDataLength = 2
  809. // EmptyDataError describes an error caused by empty cost data for some
  810. // defined interval
  811. type EmptyDataError struct {
  812. err error
  813. window opencost.Window
  814. }
  815. // Error implements the error interface
  816. func (ede *EmptyDataError) Error() string {
  817. err := fmt.Sprintf("empty data for range: %s", ede.window)
  818. if ede.err != nil {
  819. err += fmt.Sprintf(": %s", ede.err)
  820. }
  821. return err
  822. }
  823. func costDataTimeSeriesLength(costData map[string]*CostData) int {
  824. l := 0
  825. for _, cd := range costData {
  826. if l < len(cd.RAMAllocation) {
  827. l = len(cd.RAMAllocation)
  828. }
  829. if l < len(cd.CPUAllocation) {
  830. l = len(cd.CPUAllocation)
  831. }
  832. }
  833. return l
  834. }
  835. // ScaleHourlyCostData converts per-hour cost data to per-resolution data. If the target resolution is higher (i.e. < 1.0h)
  836. // then we can do simple multiplication by the fraction-of-an-hour and retain accuracy. If the target resolution is
  837. // lower (i.e. > 1.0h) then we sum groups of hourly data by resolution to maintain fidelity.
  838. // e.g. (100 hours of per-hour hourly data, resolutionHours=10) => 10 data points, grouped and summed by 10-hour window
  839. // e.g. (20 minutes of per-minute hourly data, resolutionHours=1/60) => 20 data points, scaled down by a factor of 60
  840. func ScaleHourlyCostData(data map[string]*CostData, resolutionHours float64) map[string]*CostData {
  841. scaled := map[string]*CostData{}
  842. for key, datum := range data {
  843. datum.RAMReq = scaleVectorSeries(datum.RAMReq, resolutionHours)
  844. datum.RAMUsed = scaleVectorSeries(datum.RAMUsed, resolutionHours)
  845. datum.RAMAllocation = scaleVectorSeries(datum.RAMAllocation, resolutionHours)
  846. datum.CPUReq = scaleVectorSeries(datum.CPUReq, resolutionHours)
  847. datum.CPUUsed = scaleVectorSeries(datum.CPUUsed, resolutionHours)
  848. datum.CPUAllocation = scaleVectorSeries(datum.CPUAllocation, resolutionHours)
  849. datum.GPUReq = scaleVectorSeries(datum.GPUReq, resolutionHours)
  850. datum.NetworkData = scaleVectorSeries(datum.NetworkData, resolutionHours)
  851. for _, pvcDatum := range datum.PVCData {
  852. pvcDatum.Values = scaleVectorSeries(pvcDatum.Values, resolutionHours)
  853. }
  854. scaled[key] = datum
  855. }
  856. return scaled
  857. }
  858. func scaleVectorSeries(vs []*util.Vector, resolutionHours float64) []*util.Vector {
  859. // if scaling to a lower resolution, compress the hourly data for maximum accuracy
  860. if resolutionHours > 1.0 {
  861. return compressVectorSeries(vs, resolutionHours)
  862. }
  863. // if scaling to a higher resolution, simply scale each value down by the fraction of an hour
  864. for _, v := range vs {
  865. v.Value *= resolutionHours
  866. }
  867. return vs
  868. }
  869. func compressVectorSeries(vs []*util.Vector, resolutionHours float64) []*util.Vector {
  870. if len(vs) == 0 {
  871. return vs
  872. }
  873. compressed := []*util.Vector{}
  874. threshold := float64(60 * 60 * resolutionHours)
  875. var acc *util.Vector
  876. for i, v := range vs {
  877. if acc == nil {
  878. // start a new accumulation from current datum
  879. acc = &util.Vector{
  880. Value: vs[i].Value,
  881. Timestamp: vs[i].Timestamp,
  882. }
  883. continue
  884. }
  885. if v.Timestamp-acc.Timestamp < threshold {
  886. // v should be accumulated in current datum
  887. acc.Value += v.Value
  888. } else {
  889. // v falls outside current datum's threshold; append and start a new one
  890. compressed = append(compressed, acc)
  891. acc = &util.Vector{
  892. Value: vs[i].Value,
  893. Timestamp: vs[i].Timestamp,
  894. }
  895. }
  896. }
  897. // append any remaining, incomplete accumulation
  898. if acc != nil {
  899. compressed = append(compressed, acc)
  900. }
  901. return compressed
  902. }
  903. type AggregateQueryOpts struct {
  904. Rate string
  905. Filters map[string]string
  906. SharedResources *SharedResourceInfo
  907. ShareSplit string
  908. AllocateIdle bool
  909. IncludeTimeSeries bool
  910. IncludeEfficiency bool
  911. DisableAggregateCostModelCache bool
  912. ClearCache bool
  913. NoCache bool
  914. NoExpireCache bool
  915. RemoteEnabled bool
  916. DisableSharedOverhead bool
  917. UseETLAdapter bool
  918. }
  919. func DefaultAggregateQueryOpts() *AggregateQueryOpts {
  920. return &AggregateQueryOpts{
  921. Rate: "",
  922. Filters: map[string]string{},
  923. SharedResources: nil,
  924. ShareSplit: SplitTypeWeighted,
  925. AllocateIdle: false,
  926. IncludeTimeSeries: true,
  927. IncludeEfficiency: true,
  928. DisableAggregateCostModelCache: env.IsAggregateCostModelCacheDisabled(),
  929. ClearCache: false,
  930. NoCache: false,
  931. NoExpireCache: false,
  932. RemoteEnabled: env.IsRemoteEnabled(),
  933. DisableSharedOverhead: false,
  934. UseETLAdapter: false,
  935. }
  936. }
  937. // ComputeAggregateCostModel computes cost data for the given window, then aggregates it by the given fields.
  938. // Data is cached on two levels: the aggregation is cached as well as the underlying cost data.
  939. func (a *Accesses) ComputeAggregateCostModel(promClient prometheusClient.Client, window opencost.Window, field string, subfields []string, opts *AggregateQueryOpts) (map[string]*Aggregation, string, error) {
  940. // Window is the range of the query, i.e. (start, end)
  941. // It must be closed, i.e. neither start nor end can be nil
  942. if window.IsOpen() {
  943. return nil, "", fmt.Errorf("illegal window: %s", window)
  944. }
  945. // Resolution is the duration of each datum in the cost model range query,
  946. // which corresponds to both the step size given to Prometheus query_range
  947. // and to the window passed to the range queries.
  948. // i.e. by default, we support 1h resolution for queries of windows defined
  949. // in terms of days or integer multiples of hours (e.g. 1d, 12h)
  950. resolution := time.Hour
  951. // Determine resolution by size of duration and divisibility of window.
  952. // By default, resolution is 1hr. If the window is smaller than 1hr, then
  953. // resolution goes down to 1m. If the window is not a multiple of 1hr, then
  954. // resolution goes down to 1m. If the window is greater than 1d, then
  955. // resolution gets scaled up to improve performance by reducing the amount
  956. // of data being computed.
  957. durMins := int64(math.Trunc(window.Minutes()))
  958. if durMins < 24*60 { // less than 1d
  959. // TODO should we have additional options for going by
  960. // e.g. 30m? 10m? 5m?
  961. if durMins%60 != 0 || durMins < 3*60 { // not divisible by 1h or less than 3h
  962. resolution = time.Minute
  963. }
  964. } else { // greater than 1d
  965. if durMins >= 7*24*60 { // greater than (or equal to) 7 days
  966. resolution = 24.0 * time.Hour
  967. } else if durMins >= 2*24*60 { // greater than (or equal to) 2 days
  968. resolution = 2.0 * time.Hour
  969. }
  970. }
  971. // Parse options
  972. if opts == nil {
  973. opts = DefaultAggregateQueryOpts()
  974. }
  975. rate := opts.Rate
  976. filters := opts.Filters
  977. sri := opts.SharedResources
  978. shared := opts.ShareSplit
  979. allocateIdle := opts.AllocateIdle
  980. includeTimeSeries := opts.IncludeTimeSeries
  981. includeEfficiency := opts.IncludeEfficiency
  982. disableAggregateCostModelCache := opts.DisableAggregateCostModelCache
  983. clearCache := opts.ClearCache
  984. noCache := opts.NoCache
  985. noExpireCache := opts.NoExpireCache
  986. remoteEnabled := opts.RemoteEnabled
  987. disableSharedOverhead := opts.DisableSharedOverhead
  988. // retainFuncs override filterFuncs. Make sure shared resources do not
  989. // get filtered out.
  990. retainFuncs := []FilterFunc{}
  991. retainFuncs = append(retainFuncs, func(cd *CostData) (bool, string) {
  992. if sri != nil {
  993. return sri.IsSharedResource(cd), ""
  994. }
  995. return false, ""
  996. })
  997. // Parse cost data filters into FilterFuncs
  998. filterFuncs := []FilterFunc{}
  999. aggregateEnvironment := func(costDatum *CostData) string {
  1000. if field == "cluster" {
  1001. return costDatum.ClusterID
  1002. } else if field == "node" {
  1003. return costDatum.NodeName
  1004. } else if field == "namespace" {
  1005. return costDatum.Namespace
  1006. } else if field == "service" {
  1007. if len(costDatum.Services) > 0 {
  1008. return costDatum.Namespace + "/" + costDatum.Services[0]
  1009. }
  1010. } else if field == "deployment" {
  1011. if len(costDatum.Deployments) > 0 {
  1012. return costDatum.Namespace + "/" + costDatum.Deployments[0]
  1013. }
  1014. } else if field == "daemonset" {
  1015. if len(costDatum.Daemonsets) > 0 {
  1016. return costDatum.Namespace + "/" + costDatum.Daemonsets[0]
  1017. }
  1018. } else if field == "statefulset" {
  1019. if len(costDatum.Statefulsets) > 0 {
  1020. return costDatum.Namespace + "/" + costDatum.Statefulsets[0]
  1021. }
  1022. } else if field == "label" {
  1023. if costDatum.Labels != nil {
  1024. for _, sf := range subfields {
  1025. if subfieldName, ok := costDatum.Labels[sf]; ok {
  1026. return fmt.Sprintf("%s=%s", sf, subfieldName)
  1027. }
  1028. }
  1029. }
  1030. } else if field == "annotation" {
  1031. if costDatum.Annotations != nil {
  1032. for _, sf := range subfields {
  1033. if subfieldName, ok := costDatum.Annotations[sf]; ok {
  1034. return fmt.Sprintf("%s=%s", sf, subfieldName)
  1035. }
  1036. }
  1037. }
  1038. } else if field == "pod" {
  1039. return costDatum.Namespace + "/" + costDatum.PodName
  1040. } else if field == "container" {
  1041. return costDatum.Namespace + "/" + costDatum.PodName + "/" + costDatum.Name
  1042. }
  1043. return ""
  1044. }
  1045. if filters["podprefix"] != "" {
  1046. pps := []string{}
  1047. for _, fp := range strings.Split(filters["podprefix"], ",") {
  1048. if fp != "" {
  1049. cleanedFilter := strings.TrimSpace(fp)
  1050. pps = append(pps, cleanedFilter)
  1051. }
  1052. }
  1053. filterFuncs = append(filterFuncs, func(cd *CostData) (bool, string) {
  1054. aggEnv := aggregateEnvironment(cd)
  1055. for _, pp := range pps {
  1056. cleanedFilter := strings.TrimSpace(pp)
  1057. if strings.HasPrefix(cd.PodName, cleanedFilter) {
  1058. return true, aggEnv
  1059. }
  1060. }
  1061. return false, aggEnv
  1062. })
  1063. }
  1064. if filters["namespace"] != "" {
  1065. // namespaces may be comma-separated, e.g. kubecost,default
  1066. // multiple namespaces are evaluated as an OR relationship
  1067. nss := strings.Split(filters["namespace"], ",")
  1068. filterFuncs = append(filterFuncs, func(cd *CostData) (bool, string) {
  1069. aggEnv := aggregateEnvironment(cd)
  1070. for _, ns := range nss {
  1071. nsTrim := strings.TrimSpace(ns)
  1072. if cd.Namespace == nsTrim {
  1073. return true, aggEnv
  1074. } else if strings.HasSuffix(nsTrim, "*") { // trigger wildcard prefix filtering
  1075. nsTrimAsterisk := strings.TrimSuffix(nsTrim, "*")
  1076. if strings.HasPrefix(cd.Namespace, nsTrimAsterisk) {
  1077. return true, aggEnv
  1078. }
  1079. }
  1080. }
  1081. return false, aggEnv
  1082. })
  1083. }
  1084. if filters["node"] != "" {
  1085. // nodes may be comma-separated, e.g. aws-node-1,aws-node-2
  1086. // multiple nodes are evaluated as an OR relationship
  1087. nodes := strings.Split(filters["node"], ",")
  1088. filterFuncs = append(filterFuncs, func(cd *CostData) (bool, string) {
  1089. aggEnv := aggregateEnvironment(cd)
  1090. for _, node := range nodes {
  1091. nodeTrim := strings.TrimSpace(node)
  1092. if cd.NodeName == nodeTrim {
  1093. return true, aggEnv
  1094. } else if strings.HasSuffix(nodeTrim, "*") { // trigger wildcard prefix filtering
  1095. nodeTrimAsterisk := strings.TrimSuffix(nodeTrim, "*")
  1096. if strings.HasPrefix(cd.NodeName, nodeTrimAsterisk) {
  1097. return true, aggEnv
  1098. }
  1099. }
  1100. }
  1101. return false, aggEnv
  1102. })
  1103. }
  1104. if filters["cluster"] != "" {
  1105. // clusters may be comma-separated, e.g. cluster-one,cluster-two
  1106. // multiple clusters are evaluated as an OR relationship
  1107. cs := strings.Split(filters["cluster"], ",")
  1108. filterFuncs = append(filterFuncs, func(cd *CostData) (bool, string) {
  1109. aggEnv := aggregateEnvironment(cd)
  1110. for _, c := range cs {
  1111. cTrim := strings.TrimSpace(c)
  1112. id, name := cd.ClusterID, cd.ClusterName
  1113. if id == cTrim || name == cTrim {
  1114. return true, aggEnv
  1115. } else if strings.HasSuffix(cTrim, "*") { // trigger wildcard prefix filtering
  1116. cTrimAsterisk := strings.TrimSuffix(cTrim, "*")
  1117. if strings.HasPrefix(id, cTrimAsterisk) || strings.HasPrefix(name, cTrimAsterisk) {
  1118. return true, aggEnv
  1119. }
  1120. }
  1121. }
  1122. return false, aggEnv
  1123. })
  1124. }
  1125. if filters["labels"] != "" {
  1126. // labels are expected to be comma-separated and to take the form key=value
  1127. // e.g. app=cost-analyzer,app.kubernetes.io/instance=kubecost
  1128. // each different label will be applied as an AND
  1129. // multiple values for a single label will be evaluated as an OR
  1130. labelValues := map[string][]string{}
  1131. ls := strings.Split(filters["labels"], ",")
  1132. for _, l := range ls {
  1133. lTrim := strings.TrimSpace(l)
  1134. label := strings.Split(lTrim, "=")
  1135. if len(label) == 2 {
  1136. ln := promutil.SanitizeLabelName(strings.TrimSpace(label[0]))
  1137. lv := strings.TrimSpace(label[1])
  1138. labelValues[ln] = append(labelValues[ln], lv)
  1139. } else {
  1140. // label is not of the form name=value, so log it and move on
  1141. log.Warnf("ComputeAggregateCostModel: skipping illegal label filter: %s", l)
  1142. }
  1143. }
  1144. // Generate FilterFunc for each set of label filters by invoking a function instead of accessing
  1145. // values by closure to prevent reference-type looping bug.
  1146. // (see https://github.com/golang/go/wiki/CommonMistakes#using-reference-to-loop-iterator-variable)
  1147. for label, values := range labelValues {
  1148. ff := (func(l string, vs []string) FilterFunc {
  1149. return func(cd *CostData) (bool, string) {
  1150. ae := aggregateEnvironment(cd)
  1151. for _, v := range vs {
  1152. if v == "__unallocated__" { // Special case. __unallocated__ means return all pods without the attached label
  1153. if _, ok := cd.Labels[l]; !ok {
  1154. return true, ae
  1155. }
  1156. }
  1157. if cd.Labels[l] == v {
  1158. return true, ae
  1159. } else if strings.HasSuffix(v, "*") { // trigger wildcard prefix filtering
  1160. vTrim := strings.TrimSuffix(v, "*")
  1161. if strings.HasPrefix(cd.Labels[l], vTrim) {
  1162. return true, ae
  1163. }
  1164. }
  1165. }
  1166. return false, ae
  1167. }
  1168. })(label, values)
  1169. filterFuncs = append(filterFuncs, ff)
  1170. }
  1171. }
  1172. if filters["annotations"] != "" {
  1173. // annotations are expected to be comma-separated and to take the form key=value
  1174. // e.g. app=cost-analyzer,app.kubernetes.io/instance=kubecost
  1175. // each different annotation will be applied as an AND
  1176. // multiple values for a single annotation will be evaluated as an OR
  1177. annotationValues := map[string][]string{}
  1178. as := strings.Split(filters["annotations"], ",")
  1179. for _, annot := range as {
  1180. aTrim := strings.TrimSpace(annot)
  1181. annotation := strings.Split(aTrim, "=")
  1182. if len(annotation) == 2 {
  1183. an := promutil.SanitizeLabelName(strings.TrimSpace(annotation[0]))
  1184. av := strings.TrimSpace(annotation[1])
  1185. annotationValues[an] = append(annotationValues[an], av)
  1186. } else {
  1187. // annotation is not of the form name=value, so log it and move on
  1188. log.Warnf("ComputeAggregateCostModel: skipping illegal annotation filter: %s", annot)
  1189. }
  1190. }
  1191. // Generate FilterFunc for each set of annotation filters by invoking a function instead of accessing
  1192. // values by closure to prevent reference-type looping bug.
  1193. // (see https://github.com/golang/go/wiki/CommonMistakes#using-reference-to-loop-iterator-variable)
  1194. for annotation, values := range annotationValues {
  1195. ff := (func(l string, vs []string) FilterFunc {
  1196. return func(cd *CostData) (bool, string) {
  1197. ae := aggregateEnvironment(cd)
  1198. for _, v := range vs {
  1199. if v == "__unallocated__" { // Special case. __unallocated__ means return all pods without the attached label
  1200. if _, ok := cd.Annotations[l]; !ok {
  1201. return true, ae
  1202. }
  1203. }
  1204. if cd.Annotations[l] == v {
  1205. return true, ae
  1206. } else if strings.HasSuffix(v, "*") { // trigger wildcard prefix filtering
  1207. vTrim := strings.TrimSuffix(v, "*")
  1208. if strings.HasPrefix(cd.Annotations[l], vTrim) {
  1209. return true, ae
  1210. }
  1211. }
  1212. }
  1213. return false, ae
  1214. }
  1215. })(annotation, values)
  1216. filterFuncs = append(filterFuncs, ff)
  1217. }
  1218. }
  1219. // clear cache prior to checking the cache so that a clearCache=true
  1220. // request always returns a freshly computed value
  1221. if clearCache {
  1222. a.AggregateCache.Flush()
  1223. a.CostDataCache.Flush()
  1224. }
  1225. cacheExpiry := a.GetCacheExpiration(window.Duration())
  1226. if noExpireCache {
  1227. cacheExpiry = cache.NoExpiration
  1228. }
  1229. // parametrize cache key by all request parameters
  1230. aggKey := GenerateAggKey(window, field, subfields, opts)
  1231. thanosOffset := time.Now().Add(-thanos.OffsetDuration())
  1232. if a.ThanosClient != nil && window.End().After(thanosOffset) {
  1233. log.Infof("ComputeAggregateCostModel: setting end time backwards to first present data")
  1234. // Apply offsets to both end and start times to maintain correct time range
  1235. deltaDuration := window.End().Sub(thanosOffset)
  1236. s := window.Start().Add(-1 * deltaDuration)
  1237. e := time.Now().Add(-thanos.OffsetDuration())
  1238. window.Set(&s, &e)
  1239. }
  1240. dur, off := window.DurationOffsetStrings()
  1241. key := fmt.Sprintf(`%s:%s:%fh:%t`, dur, off, resolution.Hours(), remoteEnabled)
  1242. // report message about which of the two caches hit. by default report a miss
  1243. cacheMessage := fmt.Sprintf("ComputeAggregateCostModel: L1 cache miss: %s L2 cache miss: %s", aggKey, key)
  1244. // check the cache for aggregated response; if cache is hit and not disabled, return response
  1245. if value, found := a.AggregateCache.Get(aggKey); found && !disableAggregateCostModelCache && !noCache {
  1246. result, ok := value.(map[string]*Aggregation)
  1247. if !ok {
  1248. // disable cache and recompute if type cast fails
  1249. log.Errorf("ComputeAggregateCostModel: caching error: failed to cast aggregate data to struct: %s", aggKey)
  1250. return a.ComputeAggregateCostModel(promClient, window, field, subfields, opts)
  1251. }
  1252. return result, fmt.Sprintf("aggregate cache hit: %s", aggKey), nil
  1253. }
  1254. if window.Hours() >= 1.0 {
  1255. // exclude the last window of the time frame to match Prometheus definitions of range, offset, and resolution
  1256. start := window.Start().Add(resolution)
  1257. window.Set(&start, window.End())
  1258. } else {
  1259. // don't cache requests for durations of less than one hour
  1260. disableAggregateCostModelCache = true
  1261. }
  1262. // attempt to retrieve cost data from cache
  1263. var costData map[string]*CostData
  1264. var err error
  1265. cacheData, found := a.CostDataCache.Get(key)
  1266. if found && !disableAggregateCostModelCache && !noCache {
  1267. ok := false
  1268. costData, ok = cacheData.(map[string]*CostData)
  1269. cacheMessage = fmt.Sprintf("ComputeAggregateCostModel: L1 cache miss: %s, L2 cost data cache hit: %s", aggKey, key)
  1270. if !ok {
  1271. log.Errorf("ComputeAggregateCostModel: caching error: failed to cast cost data to struct: %s", key)
  1272. }
  1273. } else {
  1274. log.Infof("ComputeAggregateCostModel: missed cache: %s (found %t, disableAggregateCostModelCache %t, noCache %t)", key, found, disableAggregateCostModelCache, noCache)
  1275. costData, err = a.Model.ComputeCostDataRange(promClient, a.CloudProvider, window, resolution, "", "", remoteEnabled)
  1276. if err != nil {
  1277. if prom.IsErrorCollection(err) {
  1278. return nil, "", err
  1279. }
  1280. if pce, ok := err.(prom.CommError); ok {
  1281. return nil, "", pce
  1282. }
  1283. if strings.Contains(err.Error(), "data is empty") {
  1284. return nil, "", &EmptyDataError{err: err, window: window}
  1285. }
  1286. return nil, "", err
  1287. }
  1288. // compute length of the time series in the cost data and only compute
  1289. // aggregates and cache if the length is sufficiently high
  1290. costDataLen := costDataTimeSeriesLength(costData)
  1291. if costDataLen == 0 {
  1292. return nil, "", &EmptyDataError{window: window}
  1293. }
  1294. if costDataLen >= minCostDataLength && !noCache {
  1295. log.Infof("ComputeAggregateCostModel: setting L2 cache: %s", key)
  1296. a.CostDataCache.Set(key, costData, cacheExpiry)
  1297. }
  1298. }
  1299. c, err := a.CloudProvider.GetConfig()
  1300. if err != nil {
  1301. return nil, "", err
  1302. }
  1303. discount, err := ParsePercentString(c.Discount)
  1304. if err != nil {
  1305. return nil, "", err
  1306. }
  1307. customDiscount, err := ParsePercentString(c.NegotiatedDiscount)
  1308. if err != nil {
  1309. return nil, "", err
  1310. }
  1311. sc := make(map[string]*SharedCostInfo)
  1312. if !disableSharedOverhead {
  1313. costPerMonth := c.GetSharedOverheadCostPerMonth()
  1314. durationCoefficient := window.Hours() / timeutil.HoursPerMonth
  1315. sc["total"] = &SharedCostInfo{
  1316. Name: "total",
  1317. Cost: costPerMonth * durationCoefficient,
  1318. }
  1319. }
  1320. idleCoefficients := make(map[string]float64)
  1321. if allocateIdle {
  1322. dur, off, err := window.DurationOffset()
  1323. if err != nil {
  1324. log.Errorf("ComputeAggregateCostModel: error computing idle coefficient: illegal window: %s (%s)", window, err)
  1325. return nil, "", err
  1326. }
  1327. if a.ThanosClient != nil && off < thanos.OffsetDuration() {
  1328. // Determine difference between the Thanos offset and the requested
  1329. // offset; e.g. off=1h, thanosOffsetDuration=3h => diff=2h
  1330. diff := thanos.OffsetDuration() - off
  1331. // Reduce duration by difference and increase offset by difference
  1332. // e.g. 24h offset 0h => 21h offset 3h
  1333. dur = dur - diff
  1334. off = thanos.OffsetDuration()
  1335. log.Infof("ComputeAggregateCostModel: setting duration, offset to %s, %s due to Thanos", dur, off)
  1336. // Idle computation cannot be fulfilled for some windows, specifically
  1337. // those with sum(duration, offset) < Thanos offset, because there is
  1338. // no data within that window.
  1339. if dur <= 0 {
  1340. return nil, "", fmt.Errorf("requested idle coefficients from Thanos for illegal duration, offset: %s, %s (original window %s)", dur, off, window)
  1341. }
  1342. }
  1343. idleCoefficients, err = a.ComputeIdleCoefficient(costData, promClient, a.CloudProvider, discount, customDiscount, dur, off)
  1344. if err != nil {
  1345. durStr, offStr := timeutil.DurationOffsetStrings(dur, off)
  1346. log.Errorf("ComputeAggregateCostModel: error computing idle coefficient: duration=%s, offset=%s, err=%s", durStr, offStr, err)
  1347. return nil, "", err
  1348. }
  1349. }
  1350. totalContainerCost := 0.0
  1351. if shared == SplitTypeWeighted {
  1352. totalContainerCost = GetTotalContainerCost(costData, rate, a.CloudProvider, discount, customDiscount, idleCoefficients)
  1353. }
  1354. // filter cost data by namespace and cluster after caching for maximal cache hits
  1355. costData, filteredContainerCount, filteredEnvironments := FilterCostData(costData, retainFuncs, filterFuncs)
  1356. // aggregate cost model data by given fields and cache the result for the default expiration
  1357. aggOpts := &AggregationOptions{
  1358. Discount: discount,
  1359. CustomDiscount: customDiscount,
  1360. IdleCoefficients: idleCoefficients,
  1361. IncludeEfficiency: includeEfficiency,
  1362. IncludeTimeSeries: includeTimeSeries,
  1363. Rate: rate,
  1364. ResolutionHours: resolution.Hours(),
  1365. SharedResourceInfo: sri,
  1366. SharedCosts: sc,
  1367. FilteredContainerCount: filteredContainerCount,
  1368. FilteredEnvironments: filteredEnvironments,
  1369. TotalContainerCost: totalContainerCost,
  1370. SharedSplit: shared,
  1371. }
  1372. result := AggregateCostData(costData, field, subfields, a.CloudProvider, aggOpts)
  1373. // If sending time series data back, switch scale back to hourly data. At this point,
  1374. // resolutionHours may have converted our hourly data to more- or less-than hourly data.
  1375. if includeTimeSeries {
  1376. for _, aggs := range result {
  1377. ScaleAggregationTimeSeries(aggs, resolution.Hours())
  1378. }
  1379. }
  1380. // compute length of the time series in the cost data and only cache
  1381. // aggregation results if the length is sufficiently high
  1382. costDataLen := costDataTimeSeriesLength(costData)
  1383. if costDataLen >= minCostDataLength && window.Hours() > 1.0 && !noCache {
  1384. // Set the result map (rather than a pointer to it) because map is a reference type
  1385. log.Infof("ComputeAggregateCostModel: setting aggregate cache: %s", aggKey)
  1386. a.AggregateCache.Set(aggKey, result, cacheExpiry)
  1387. } else {
  1388. log.Infof("ComputeAggregateCostModel: not setting aggregate cache: %s (not enough data: %t; duration less than 1h: %t; noCache: %t)", key, costDataLen < minCostDataLength, window.Hours() < 1, noCache)
  1389. }
  1390. return result, cacheMessage, nil
  1391. }
  1392. // ScaleAggregationTimeSeries reverses the scaling done by ScaleHourlyCostData, returning
  1393. // the aggregation's time series to hourly data.
  1394. func ScaleAggregationTimeSeries(aggregation *Aggregation, resolutionHours float64) {
  1395. for _, v := range aggregation.CPUCostVector {
  1396. v.Value /= resolutionHours
  1397. }
  1398. for _, v := range aggregation.GPUCostVector {
  1399. v.Value /= resolutionHours
  1400. }
  1401. for _, v := range aggregation.RAMCostVector {
  1402. v.Value /= resolutionHours
  1403. }
  1404. for _, v := range aggregation.PVCostVector {
  1405. v.Value /= resolutionHours
  1406. }
  1407. for _, v := range aggregation.NetworkCostVector {
  1408. v.Value /= resolutionHours
  1409. }
  1410. for _, v := range aggregation.TotalCostVector {
  1411. v.Value /= resolutionHours
  1412. }
  1413. return
  1414. }
  1415. // String returns a string representation of the encapsulated shared resources, which
  1416. // can be used to uniquely identify a set of shared resources. Sorting sets of shared
  1417. // resources ensures that strings representing permutations of the same combination match.
  1418. func (s *SharedResourceInfo) String() string {
  1419. if s == nil {
  1420. return ""
  1421. }
  1422. nss := []string{}
  1423. for ns := range s.SharedNamespace {
  1424. nss = append(nss, ns)
  1425. }
  1426. sort.Strings(nss)
  1427. nsStr := strings.Join(nss, ",")
  1428. labels := []string{}
  1429. for lbl, vals := range s.LabelSelectors {
  1430. for val := range vals {
  1431. if lbl != "" && val != "" {
  1432. labels = append(labels, fmt.Sprintf("%s=%s", lbl, val))
  1433. }
  1434. }
  1435. }
  1436. sort.Strings(labels)
  1437. labelStr := strings.Join(labels, ",")
  1438. return fmt.Sprintf("%s:%s", nsStr, labelStr)
  1439. }
  1440. type aggKeyParams struct {
  1441. duration string
  1442. offset string
  1443. filters map[string]string
  1444. field string
  1445. subfields []string
  1446. rate string
  1447. sri *SharedResourceInfo
  1448. shareType string
  1449. idle bool
  1450. timeSeries bool
  1451. efficiency bool
  1452. }
  1453. // GenerateAggKey generates a parameter-unique key for caching the aggregate cost model
  1454. func GenerateAggKey(window opencost.Window, field string, subfields []string, opts *AggregateQueryOpts) string {
  1455. if opts == nil {
  1456. opts = DefaultAggregateQueryOpts()
  1457. }
  1458. // Covert to duration, offset so that cache hits occur, even when timestamps have
  1459. // shifted slightly.
  1460. duration, offset := window.DurationOffsetStrings()
  1461. // parse, trim, and sort podprefix filters
  1462. podPrefixFilters := []string{}
  1463. if ppfs, ok := opts.Filters["podprefix"]; ok && ppfs != "" {
  1464. for _, psf := range strings.Split(ppfs, ",") {
  1465. podPrefixFilters = append(podPrefixFilters, strings.TrimSpace(psf))
  1466. }
  1467. }
  1468. sort.Strings(podPrefixFilters)
  1469. podPrefixFiltersStr := strings.Join(podPrefixFilters, ",")
  1470. // parse, trim, and sort namespace filters
  1471. nsFilters := []string{}
  1472. if nsfs, ok := opts.Filters["namespace"]; ok && nsfs != "" {
  1473. for _, nsf := range strings.Split(nsfs, ",") {
  1474. nsFilters = append(nsFilters, strings.TrimSpace(nsf))
  1475. }
  1476. }
  1477. sort.Strings(nsFilters)
  1478. nsFilterStr := strings.Join(nsFilters, ",")
  1479. // parse, trim, and sort node filters
  1480. nodeFilters := []string{}
  1481. if nodefs, ok := opts.Filters["node"]; ok && nodefs != "" {
  1482. for _, nodef := range strings.Split(nodefs, ",") {
  1483. nodeFilters = append(nodeFilters, strings.TrimSpace(nodef))
  1484. }
  1485. }
  1486. sort.Strings(nodeFilters)
  1487. nodeFilterStr := strings.Join(nodeFilters, ",")
  1488. // parse, trim, and sort cluster filters
  1489. cFilters := []string{}
  1490. if cfs, ok := opts.Filters["cluster"]; ok && cfs != "" {
  1491. for _, cf := range strings.Split(cfs, ",") {
  1492. cFilters = append(cFilters, strings.TrimSpace(cf))
  1493. }
  1494. }
  1495. sort.Strings(cFilters)
  1496. cFilterStr := strings.Join(cFilters, ",")
  1497. // parse, trim, and sort label filters
  1498. lFilters := []string{}
  1499. if lfs, ok := opts.Filters["labels"]; ok && lfs != "" {
  1500. for _, lf := range strings.Split(lfs, ",") {
  1501. // trim whitespace from the label name and the label value
  1502. // of each label name/value pair, then reconstruct
  1503. // e.g. "tier = frontend, app = kubecost" == "app=kubecost,tier=frontend"
  1504. lfa := strings.Split(lf, "=")
  1505. if len(lfa) == 2 {
  1506. lfn := strings.TrimSpace(lfa[0])
  1507. lfv := strings.TrimSpace(lfa[1])
  1508. lFilters = append(lFilters, fmt.Sprintf("%s=%s", lfn, lfv))
  1509. } else {
  1510. // label is not of the form name=value, so log it and move on
  1511. log.Warnf("GenerateAggKey: skipping illegal label filter: %s", lf)
  1512. }
  1513. }
  1514. }
  1515. sort.Strings(lFilters)
  1516. lFilterStr := strings.Join(lFilters, ",")
  1517. // parse, trim, and sort annotation filters
  1518. aFilters := []string{}
  1519. if afs, ok := opts.Filters["annotations"]; ok && afs != "" {
  1520. for _, af := range strings.Split(afs, ",") {
  1521. // trim whitespace from the annotation name and the annotation value
  1522. // of each annotation name/value pair, then reconstruct
  1523. // e.g. "tier = frontend, app = kubecost" == "app=kubecost,tier=frontend"
  1524. afa := strings.Split(af, "=")
  1525. if len(afa) == 2 {
  1526. afn := strings.TrimSpace(afa[0])
  1527. afv := strings.TrimSpace(afa[1])
  1528. aFilters = append(aFilters, fmt.Sprintf("%s=%s", afn, afv))
  1529. } else {
  1530. // annotation is not of the form name=value, so log it and move on
  1531. log.Warnf("GenerateAggKey: skipping illegal annotation filter: %s", af)
  1532. }
  1533. }
  1534. }
  1535. sort.Strings(aFilters)
  1536. aFilterStr := strings.Join(aFilters, ",")
  1537. filterStr := fmt.Sprintf("%s:%s:%s:%s:%s:%s", nsFilterStr, nodeFilterStr, cFilterStr, lFilterStr, aFilterStr, podPrefixFiltersStr)
  1538. sort.Strings(subfields)
  1539. fieldStr := fmt.Sprintf("%s:%s", field, strings.Join(subfields, ","))
  1540. if offset == "1m" {
  1541. offset = ""
  1542. }
  1543. return fmt.Sprintf("%s:%s:%s:%s:%s:%s:%s:%t:%t:%t", duration, offset, filterStr, fieldStr, opts.Rate,
  1544. opts.SharedResources, opts.ShareSplit, opts.AllocateIdle, opts.IncludeTimeSeries,
  1545. opts.IncludeEfficiency)
  1546. }
  1547. // Aggregator is capable of computing the aggregated cost model. This is
  1548. // a brutal interface, which should be cleaned up, but it's necessary for
  1549. // being able to swap in an ETL-backed implementation.
  1550. type Aggregator interface {
  1551. ComputeAggregateCostModel(promClient prometheusClient.Client, window opencost.Window, field string, subfields []string, opts *AggregateQueryOpts) (map[string]*Aggregation, string, error)
  1552. }
  1553. func (a *Accesses) warmAggregateCostModelCache() {
  1554. // Only allow one concurrent cache-warming operation
  1555. sem := util.NewSemaphore(1)
  1556. // Set default values, pulling them from application settings where applicable, and warm the cache
  1557. // for the given duration. Cache is intentionally set to expire (i.e. noExpireCache=false) so that
  1558. // if the default parameters change, the old cached defaults with eventually expire. Thus, the
  1559. // timing of the cache expiry/refresh is the only mechanism ensuring 100% cache warmth.
  1560. warmFunc := func(duration, offset time.Duration, cacheEfficiencyData bool) (error, error) {
  1561. if a.ThanosClient != nil {
  1562. duration = thanos.OffsetDuration()
  1563. log.Infof("Setting Offset to %s", duration)
  1564. }
  1565. fmtDuration, fmtOffset := timeutil.DurationOffsetStrings(duration, offset)
  1566. durationHrs, err := timeutil.FormatDurationStringDaysToHours(fmtDuration)
  1567. promClient := a.GetPrometheusClient(true)
  1568. windowStr := fmt.Sprintf("%s offset %s", fmtDuration, fmtOffset)
  1569. window, err := opencost.ParseWindowUTC(windowStr)
  1570. if err != nil {
  1571. return nil, fmt.Errorf("invalid window from window string: %s", windowStr)
  1572. }
  1573. field := "namespace"
  1574. subfields := []string{}
  1575. aggOpts := DefaultAggregateQueryOpts()
  1576. aggOpts.Rate = ""
  1577. aggOpts.Filters = map[string]string{}
  1578. aggOpts.IncludeTimeSeries = false
  1579. aggOpts.IncludeEfficiency = true
  1580. aggOpts.DisableAggregateCostModelCache = true
  1581. aggOpts.ClearCache = false
  1582. aggOpts.NoCache = false
  1583. aggOpts.NoExpireCache = false
  1584. aggOpts.ShareSplit = SplitTypeWeighted
  1585. aggOpts.RemoteEnabled = env.IsRemoteEnabled()
  1586. aggOpts.AllocateIdle = provider.AllocateIdleByDefault(a.CloudProvider)
  1587. sharedNamespaces := provider.SharedNamespaces(a.CloudProvider)
  1588. sharedLabelNames, sharedLabelValues := provider.SharedLabels(a.CloudProvider)
  1589. if len(sharedNamespaces) > 0 || len(sharedLabelNames) > 0 {
  1590. aggOpts.SharedResources = NewSharedResourceInfo(true, sharedNamespaces, sharedLabelNames, sharedLabelValues)
  1591. }
  1592. aggKey := GenerateAggKey(window, field, subfields, aggOpts)
  1593. log.Infof("aggregation: cache warming defaults: %s", aggKey)
  1594. key := fmt.Sprintf("%s:%s", durationHrs, fmtOffset)
  1595. _, _, aggErr := a.ComputeAggregateCostModel(promClient, window, field, subfields, aggOpts)
  1596. if aggErr != nil {
  1597. log.Infof("Error building cache %s: %s", window, aggErr)
  1598. }
  1599. totals, err := a.ComputeClusterCosts(promClient, a.CloudProvider, duration, offset, cacheEfficiencyData)
  1600. if err != nil {
  1601. log.Infof("Error building cluster costs cache %s", key)
  1602. }
  1603. maxMinutesWithData := 0.0
  1604. for _, cluster := range totals {
  1605. if cluster.DataMinutes > maxMinutesWithData {
  1606. maxMinutesWithData = cluster.DataMinutes
  1607. }
  1608. }
  1609. if len(totals) > 0 && maxMinutesWithData > clusterCostsCacheMinutes {
  1610. a.ClusterCostsCache.Set(key, totals, a.GetCacheExpiration(window.Duration()))
  1611. log.Infof("caching %s cluster costs for %s", fmtDuration, a.GetCacheExpiration(window.Duration()))
  1612. } else {
  1613. log.Warnf("not caching %s cluster costs: no data or less than %f minutes data ", fmtDuration, clusterCostsCacheMinutes)
  1614. }
  1615. return aggErr, err
  1616. }
  1617. // 1 day
  1618. go func(sem *util.Semaphore) {
  1619. defer errors.HandlePanic()
  1620. offset := time.Minute
  1621. duration := 24 * time.Hour
  1622. for {
  1623. sem.Acquire()
  1624. warmFunc(duration, offset, true)
  1625. sem.Return()
  1626. log.Infof("aggregation: warm cache: %s", timeutil.DurationString(duration))
  1627. time.Sleep(a.GetCacheRefresh(duration))
  1628. }
  1629. }(sem)
  1630. if !env.IsETLEnabled() {
  1631. // 2 day
  1632. go func(sem *util.Semaphore) {
  1633. defer errors.HandlePanic()
  1634. offset := time.Minute
  1635. duration := 2 * 24 * time.Hour
  1636. for {
  1637. sem.Acquire()
  1638. warmFunc(duration, offset, false)
  1639. sem.Return()
  1640. log.Infof("aggregation: warm cache: %s", timeutil.DurationString(duration))
  1641. time.Sleep(a.GetCacheRefresh(duration))
  1642. }
  1643. }(sem)
  1644. // 7 day
  1645. go func(sem *util.Semaphore) {
  1646. defer errors.HandlePanic()
  1647. offset := time.Minute
  1648. duration := 7 * 24 * time.Hour
  1649. for {
  1650. sem.Acquire()
  1651. aggErr, err := warmFunc(duration, offset, false)
  1652. sem.Return()
  1653. log.Infof("aggregation: warm cache: %s", timeutil.DurationString(duration))
  1654. if aggErr == nil && err == nil {
  1655. time.Sleep(a.GetCacheRefresh(duration))
  1656. } else {
  1657. time.Sleep(5 * time.Minute)
  1658. }
  1659. }
  1660. }(sem)
  1661. // 30 day
  1662. go func(sem *util.Semaphore) {
  1663. defer errors.HandlePanic()
  1664. for {
  1665. offset := time.Minute
  1666. duration := 30 * 24 * time.Hour
  1667. sem.Acquire()
  1668. aggErr, err := warmFunc(duration, offset, false)
  1669. sem.Return()
  1670. if aggErr == nil && err == nil {
  1671. time.Sleep(a.GetCacheRefresh(duration))
  1672. } else {
  1673. time.Sleep(5 * time.Minute)
  1674. }
  1675. }
  1676. }(sem)
  1677. }
  1678. }
  // rfc3339 matches a UTC ("Z"-suffixed) RFC3339 timestamp with second precision.
  //
  // AggregateCostModelHandler uses it (via rfc3339Regex) to convert UTC-RFC3339
  // pairs to the configured UTC offset, e.g. with UTC offset of -0600,
  // 2020-07-01T00:00:00Z becomes 2020-07-01T06:00:00Z == 2020-07-01T00:00:00-0600.
  //
  // TODO niko/etl fix the frontend because this is confusing if you're
  // actually asking for UTC time (...Z) and we swap that "Z" out for the
  // configured UTC offset without asking
  var rfc3339 = `\d\d\d\d-\d\d-\d\dT\d\d:\d\d:\d\dZ`

  // Patterns compiled once at package initialization.
  var (
  	// rfc3339Regex captures the start (group 1) and end (group 2) of a
  	// comma-separated pair of rfc3339 timestamps.
  	rfc3339Regex = regexp.MustCompile(fmt.Sprintf(`(%s),(%s)`, rfc3339, rfc3339))

  	// durRegex matches a whole duration string such as "7d"; group 1 is the
  	// count and group 2 the unit (m, h, d or s).
  	durRegex = regexp.MustCompile(`^(\d+)(m|h|d|s)$`)

  	// percentRegex captures the numeric part of a percentage such as "42.5%".
  	percentRegex = regexp.MustCompile(`(\d+\.*\d*)%`)
  )
  1691. // AggregateCostModelHandler handles requests to the aggregated cost model API. See
  1692. // ComputeAggregateCostModel for details.
  1693. func (a *Accesses) AggregateCostModelHandler(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  1694. w.Header().Set("Content-Type", "application/json")
  1695. windowStr := r.URL.Query().Get("window")
  1696. match := rfc3339Regex.FindStringSubmatch(windowStr)
  1697. if match != nil {
  1698. start, _ := time.Parse(time.RFC3339, match[1])
  1699. start = start.Add(-env.GetParsedUTCOffset()).In(time.UTC)
  1700. end, _ := time.Parse(time.RFC3339, match[2])
  1701. end = end.Add(-env.GetParsedUTCOffset()).In(time.UTC)
  1702. windowStr = fmt.Sprintf("%sZ,%sZ", start.Format("2006-01-02T15:04:05"), end.Format("2006-01-02T15:04:05Z"))
  1703. }
  1704. // determine duration and offset from query parameters
  1705. window, err := opencost.ParseWindowWithOffset(windowStr, env.GetParsedUTCOffset())
  1706. if err != nil || window.Start() == nil {
  1707. WriteError(w, BadRequest(fmt.Sprintf("invalid window: %s", err)))
  1708. return
  1709. }
  1710. isDurationStr := durRegex.MatchString(windowStr)
  1711. // legacy offset option should override window offset
  1712. if r.URL.Query().Get("offset") != "" {
  1713. offset := r.URL.Query().Get("offset")
  1714. // Shift window by offset, but only when manually set with separate
  1715. // parameter and window was provided as a duration string. Otherwise,
  1716. // do not alter the (duration, offset) from ParseWindowWithOffset.
  1717. if offset != "1m" && isDurationStr {
  1718. match := durRegex.FindStringSubmatch(offset)
  1719. if match != nil && len(match) == 3 {
  1720. dur := time.Minute
  1721. if match[2] == "h" {
  1722. dur = time.Hour
  1723. }
  1724. if match[2] == "d" {
  1725. dur = 24 * time.Hour
  1726. }
  1727. if match[2] == "s" {
  1728. dur = time.Second
  1729. }
  1730. num, _ := strconv.ParseInt(match[1], 10, 64)
  1731. window = window.Shift(-time.Duration(num) * dur)
  1732. }
  1733. }
  1734. }
  1735. opts := DefaultAggregateQueryOpts()
  1736. // parse remaining query parameters
  1737. namespace := r.URL.Query().Get("namespace")
  1738. cluster := r.URL.Query().Get("cluster")
  1739. labels := r.URL.Query().Get("labels")
  1740. annotations := r.URL.Query().Get("annotations")
  1741. podprefix := r.URL.Query().Get("podprefix")
  1742. field := r.URL.Query().Get("aggregation")
  1743. sharedNamespaces := r.URL.Query().Get("sharedNamespaces")
  1744. sharedLabelNames := r.URL.Query().Get("sharedLabelNames")
  1745. sharedLabelValues := r.URL.Query().Get("sharedLabelValues")
  1746. remote := r.URL.Query().Get("remote") != "false"
  1747. subfieldStr := r.URL.Query().Get("aggregationSubfield")
  1748. subfields := []string{}
  1749. if len(subfieldStr) > 0 {
  1750. s := strings.Split(r.URL.Query().Get("aggregationSubfield"), ",")
  1751. for _, rawLabel := range s {
  1752. subfields = append(subfields, promutil.SanitizeLabelName(rawLabel))
  1753. }
  1754. }
  1755. idleFlag := r.URL.Query().Get("allocateIdle")
  1756. if idleFlag == "default" {
  1757. c, _ := a.CloudProvider.GetConfig()
  1758. opts.AllocateIdle = (c.DefaultIdle == "true")
  1759. } else {
  1760. opts.AllocateIdle = (idleFlag == "true")
  1761. }
  1762. opts.Rate = r.URL.Query().Get("rate")
  1763. opts.ShareSplit = r.URL.Query().Get("sharedSplit")
  1764. // timeSeries == true maintains the time series dimension of the data,
  1765. // which by default gets summed over the entire interval
  1766. opts.IncludeTimeSeries = r.URL.Query().Get("timeSeries") == "true"
  1767. // efficiency has been deprecated in favor of a default to always send efficiency
  1768. opts.IncludeEfficiency = true
  1769. // TODO niko/caching rename "recomputeCache"
  1770. // disableCache, if set to "true", tells this function to recompute and
  1771. // cache the requested data
  1772. opts.DisableAggregateCostModelCache = r.URL.Query().Get("disableCache") == "true"
  1773. // clearCache, if set to "true", tells this function to flush the cache,
  1774. // then recompute and cache the requested data
  1775. opts.ClearCache = r.URL.Query().Get("clearCache") == "true"
  1776. // noCache avoids the cache altogether, both reading from and writing to
  1777. opts.NoCache = r.URL.Query().Get("noCache") == "true"
  1778. // noExpireCache should only be used by cache warming to set non-expiring caches
  1779. opts.NoExpireCache = false
  1780. // etl triggers ETL adapter
  1781. opts.UseETLAdapter = r.URL.Query().Get("etl") == "true"
  1782. // aggregation field is required
  1783. if field == "" {
  1784. WriteError(w, BadRequest("Missing aggregation field parameter"))
  1785. return
  1786. }
  1787. // aggregation subfield is required when aggregation field is "label"
  1788. if (field == "label" || field == "annotation") && len(subfields) == 0 {
  1789. WriteError(w, BadRequest("Missing aggregation subfield parameter"))
  1790. return
  1791. }
  1792. // enforce one of the available rate options
  1793. if opts.Rate != "" && opts.Rate != "hourly" && opts.Rate != "daily" && opts.Rate != "monthly" {
  1794. WriteError(w, BadRequest("Rate parameter only supports: hourly, daily, monthly or empty"))
  1795. return
  1796. }
  1797. // parse cost data filters
  1798. // namespace and cluster are exact-string-matches
  1799. // labels are expected to be comma-separated and to take the form key=value
  1800. // e.g. app=cost-analyzer,app.kubernetes.io/instance=kubecost
  1801. opts.Filters = map[string]string{
  1802. "namespace": namespace,
  1803. "cluster": cluster,
  1804. "labels": labels,
  1805. "annotations": annotations,
  1806. "podprefix": podprefix,
  1807. }
  1808. // parse shared resources
  1809. sn := []string{}
  1810. sln := []string{}
  1811. slv := []string{}
  1812. if sharedNamespaces != "" {
  1813. sn = strings.Split(sharedNamespaces, ",")
  1814. }
  1815. if sharedLabelNames != "" {
  1816. sln = strings.Split(sharedLabelNames, ",")
  1817. slv = strings.Split(sharedLabelValues, ",")
  1818. if len(sln) != len(slv) || slv[0] == "" {
  1819. WriteError(w, BadRequest("Supply exactly one shared label value per shared label name"))
  1820. return
  1821. }
  1822. }
  1823. if len(sn) > 0 || len(sln) > 0 {
  1824. opts.SharedResources = NewSharedResourceInfo(true, sn, sln, slv)
  1825. }
  1826. // enable remote if it is available and not disabled
  1827. opts.RemoteEnabled = remote && env.IsRemoteEnabled()
  1828. promClient := a.GetPrometheusClient(remote)
  1829. var data map[string]*Aggregation
  1830. var message string
  1831. data, message, err = a.AggAPI.ComputeAggregateCostModel(promClient, window, field, subfields, opts)
  1832. // Find any warnings in http request context
  1833. warning, _ := httputil.GetWarning(r)
  1834. if err != nil {
  1835. if emptyErr, ok := err.(*EmptyDataError); ok {
  1836. if warning == "" {
  1837. w.Write(WrapData(map[string]interface{}{}, emptyErr))
  1838. } else {
  1839. w.Write(WrapDataWithWarning(map[string]interface{}{}, emptyErr, warning))
  1840. }
  1841. return
  1842. }
  1843. if boundaryErr, ok := err.(*opencost.BoundaryError); ok {
  1844. if window.Start() != nil && window.Start().After(time.Now().Add(-90*24*time.Hour)) {
  1845. // Asking for data within a 90 day period: it will be available
  1846. // after the pipeline builds
  1847. msg := "Data will be available after ETL is built"
  1848. match := percentRegex.FindStringSubmatch(boundaryErr.Message)
  1849. if len(match) > 1 {
  1850. completionPct, err := strconv.ParseFloat(match[1], 64)
  1851. if err == nil {
  1852. msg = fmt.Sprintf("%s (%.1f%% complete)", msg, completionPct)
  1853. }
  1854. }
  1855. WriteError(w, InternalServerError(msg))
  1856. } else {
  1857. // Boundary error outside of 90 day period; may not be available
  1858. WriteError(w, InternalServerError(boundaryErr.Error()))
  1859. }
  1860. return
  1861. }
  1862. errStr := fmt.Sprintf("error computing aggregate cost model: %s", err)
  1863. WriteError(w, InternalServerError(errStr))
  1864. return
  1865. }
  1866. if warning == "" {
  1867. w.Write(WrapDataWithMessage(data, nil, message))
  1868. } else {
  1869. w.Write(WrapDataWithMessageAndWarning(data, nil, message, warning))
  1870. }
  1871. }
  1872. // ParseAggregationProperties attempts to parse and return aggregation properties
  1873. // encoded under the given key. If none exist, or if parsing fails, an error
  1874. // is returned with empty AllocationProperties.
  1875. func ParseAggregationProperties(aggregations []string) ([]string, error) {
  1876. aggregateBy := []string{}
  1877. // In case of no aggregation option, aggregate to the container, with a key Cluster/Node/Namespace/Pod/Container
  1878. if len(aggregations) == 0 {
  1879. aggregateBy = []string{
  1880. opencost.AllocationClusterProp,
  1881. opencost.AllocationNodeProp,
  1882. opencost.AllocationNamespaceProp,
  1883. opencost.AllocationPodProp,
  1884. opencost.AllocationContainerProp,
  1885. }
  1886. } else if len(aggregations) == 1 && aggregations[0] == "all" {
  1887. aggregateBy = []string{}
  1888. } else {
  1889. for _, agg := range aggregations {
  1890. aggregate := strings.TrimSpace(agg)
  1891. if aggregate != "" {
  1892. if prop, err := opencost.ParseProperty(aggregate); err == nil {
  1893. aggregateBy = append(aggregateBy, string(prop))
  1894. } else if strings.HasPrefix(aggregate, "label:") {
  1895. aggregateBy = append(aggregateBy, aggregate)
  1896. } else if strings.HasPrefix(aggregate, "annotation:") {
  1897. aggregateBy = append(aggregateBy, aggregate)
  1898. }
  1899. }
  1900. }
  1901. }
  1902. return aggregateBy, nil
  1903. }
  1904. func (a *Accesses) ComputeAllocationHandlerSummary(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  1905. w.Header().Set("Content-Type", "application/json")
  1906. qp := httputil.NewQueryParams(r.URL.Query())
  1907. // Window is a required field describing the window of time over which to
  1908. // compute allocation data.
  1909. window, err := opencost.ParseWindowWithOffset(qp.Get("window", ""), env.GetParsedUTCOffset())
  1910. if err != nil {
  1911. http.Error(w, fmt.Sprintf("Invalid 'window' parameter: %s", err), http.StatusBadRequest)
  1912. }
  1913. // Step is an optional parameter that defines the duration per-set, i.e.
  1914. // the window for an AllocationSet, of the AllocationSetRange to be
  1915. // computed. Defaults to the window size, making one set.
  1916. step := qp.GetDuration("step", window.Duration())
  1917. // Resolution is an optional parameter, defaulting to the configured ETL
  1918. // resolution.
  1919. resolution := qp.GetDuration("resolution", env.GetETLResolution())
  1920. // Aggregation is a required comma-separated list of fields by which to
  1921. // aggregate results. Some fields allow a sub-field, which is distinguished
  1922. // with a colon; e.g. "label:app".
  1923. // Examples: "namespace", "namespace,label:app"
  1924. aggregations := qp.GetList("aggregate", ",")
  1925. aggregateBy, err := ParseAggregationProperties(aggregations)
  1926. if err != nil {
  1927. http.Error(w, fmt.Sprintf("Invalid 'aggregate' parameter: %s", err), http.StatusBadRequest)
  1928. }
  1929. // Accumulate is an optional parameter, defaulting to false, which if true
  1930. // sums each Set in the Range, producing one Set.
  1931. accumulate := qp.GetBool("accumulate", false)
  1932. // Get allocation filter if provided
  1933. allocationFilter := qp.Get("filter", "")
  1934. // Query for AllocationSets in increments of the given step duration,
  1935. // appending each to the AllocationSetRange.
  1936. asr := opencost.NewAllocationSetRange()
  1937. stepStart := *window.Start()
  1938. for window.End().After(stepStart) {
  1939. stepEnd := stepStart.Add(step)
  1940. stepWindow := opencost.NewWindow(&stepStart, &stepEnd)
  1941. as, err := a.Model.ComputeAllocation(*stepWindow.Start(), *stepWindow.End(), resolution)
  1942. if err != nil {
  1943. WriteError(w, InternalServerError(err.Error()))
  1944. return
  1945. }
  1946. asr.Append(as)
  1947. stepStart = stepEnd
  1948. }
  1949. // Apply allocation filter if provided
  1950. if allocationFilter != "" {
  1951. parser := allocation.NewAllocationFilterParser()
  1952. filterNode, err := parser.Parse(allocationFilter)
  1953. if err != nil {
  1954. WriteError(w, BadRequest(fmt.Sprintf("Invalid filter: %s", err)))
  1955. return
  1956. }
  1957. compiler := opencost.NewAllocationMatchCompiler(nil)
  1958. matcher, err := compiler.Compile(filterNode)
  1959. if err != nil {
  1960. WriteError(w, BadRequest(fmt.Sprintf("Failed to compile filter: %s", err)))
  1961. return
  1962. }
  1963. filteredASR := opencost.NewAllocationSetRange()
  1964. for _, as := range asr.Slice() {
  1965. filteredAS := opencost.NewAllocationSet(as.Start(), as.End())
  1966. for _, alloc := range as.Allocations {
  1967. if matcher.Matches(alloc) {
  1968. filteredAS.Set(alloc)
  1969. }
  1970. }
  1971. if filteredAS.Length() > 0 {
  1972. filteredASR.Append(filteredAS)
  1973. }
  1974. }
  1975. asr = filteredASR
  1976. }
  1977. // Aggregate, if requested
  1978. if len(aggregateBy) > 0 {
  1979. err = asr.AggregateBy(aggregateBy, nil)
  1980. if err != nil {
  1981. WriteError(w, InternalServerError(err.Error()))
  1982. return
  1983. }
  1984. }
  1985. // Accumulate, if requested
  1986. if accumulate {
  1987. asr, err = asr.Accumulate(opencost.AccumulateOptionAll)
  1988. if err != nil {
  1989. WriteError(w, InternalServerError(err.Error()))
  1990. return
  1991. }
  1992. }
  1993. sasl := []*opencost.SummaryAllocationSet{}
  1994. for _, as := range asr.Slice() {
  1995. sas := opencost.NewSummaryAllocationSet(as, nil, nil, false, false)
  1996. sasl = append(sasl, sas)
  1997. }
  1998. sasr := opencost.NewSummaryAllocationSetRange(sasl...)
  1999. w.Write(WrapData(sasr, nil))
  2000. }
  2001. // ComputeAllocationHandler computes an AllocationSetRange from the CostModel.
  2002. func (a *Accesses) ComputeAllocationHandler(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  2003. w.Header().Set("Content-Type", "application/json")
  2004. qp := httputil.NewQueryParams(r.URL.Query())
  2005. // Window is a required field describing the window of time over which to
  2006. // compute allocation data.
  2007. window, err := opencost.ParseWindowWithOffset(qp.Get("window", ""), env.GetParsedUTCOffset())
  2008. if err != nil {
  2009. http.Error(w, fmt.Sprintf("Invalid 'window' parameter: %s", err), http.StatusBadRequest)
  2010. }
  2011. // Resolution is an optional parameter, defaulting to the configured ETL
  2012. // resolution.
  2013. resolution := qp.GetDuration("resolution", env.GetETLResolution())
  2014. // Step is an optional parameter that defines the duration per-set, i.e.
  2015. // the window for an AllocationSet, of the AllocationSetRange to be
  2016. // computed. Defaults to the window size, making one set.
  2017. step := qp.GetDuration("step", window.Duration())
  2018. // Aggregation is an optional comma-separated list of fields by which to
  2019. // aggregate results. Some fields allow a sub-field, which is distinguished
  2020. // with a colon; e.g. "label:app".
  2021. // Examples: "namespace", "namespace,label:app"
  2022. aggregations := qp.GetList("aggregate", ",")
  2023. aggregateBy, err := ParseAggregationProperties(aggregations)
  2024. if err != nil {
  2025. http.Error(w, fmt.Sprintf("Invalid 'aggregate' parameter: %s", err), http.StatusBadRequest)
  2026. }
  2027. // IncludeIdle, if true, uses Asset data to incorporate Idle Allocation
  2028. includeIdle := qp.GetBool("includeIdle", false)
  2029. // Accumulate is an optional parameter, defaulting to false, which if true
  2030. // sums each Set in the Range, producing one Set.
  2031. accumulate := qp.GetBool("accumulate", false)
  2032. // Accumulate is an optional parameter that accumulates an AllocationSetRange
  2033. // by the resolution of the given time duration.
  2034. // Defaults to 0. If a value is not passed then the parameter is not used.
  2035. accumulateBy := opencost.AccumulateOption(qp.Get("accumulateBy", ""))
  2036. // if accumulateBy is not explicitly set, and accumulate is true, ensure result is accumulated
  2037. if accumulateBy == opencost.AccumulateOptionNone && accumulate {
  2038. accumulateBy = opencost.AccumulateOptionAll
  2039. }
  2040. // IdleByNode, if true, computes idle allocations at the node level.
  2041. // Otherwise it is computed at the cluster level. (Not relevant if idle
  2042. // is not included.)
  2043. idleByNode := qp.GetBool("idleByNode", false)
  2044. sharedLoadBalancer := qp.GetBool("sharelb", false)
  2045. // IncludeProportionalAssetResourceCosts, if true,
  2046. includeProportionalAssetResourceCosts := qp.GetBool("includeProportionalAssetResourceCosts", false)
  2047. // include aggregated labels/annotations if true
  2048. includeAggregatedMetadata := qp.GetBool("includeAggregatedMetadata", false)
  2049. shareIdle := qp.GetBool("shareIdle", false)
  2050. // Get allocation filter if provided
  2051. allocationFilter := qp.Get("filter", "")
  2052. asr, err := a.Model.QueryAllocation(window, resolution, step, aggregateBy, includeIdle, idleByNode, includeProportionalAssetResourceCosts, includeAggregatedMetadata, sharedLoadBalancer, accumulateBy, shareIdle)
  2053. if err != nil {
  2054. if strings.Contains(strings.ToLower(err.Error()), "bad request") {
  2055. WriteError(w, BadRequest(err.Error()))
  2056. } else {
  2057. WriteError(w, InternalServerError(err.Error()))
  2058. }
  2059. return
  2060. }
  2061. // Apply allocation filter if provided
  2062. if allocationFilter != "" {
  2063. parser := allocation.NewAllocationFilterParser()
  2064. filterNode, err := parser.Parse(allocationFilter)
  2065. if err != nil {
  2066. WriteError(w, BadRequest(fmt.Sprintf("Invalid filter: %s", err)))
  2067. return
  2068. }
  2069. compiler := opencost.NewAllocationMatchCompiler(nil)
  2070. matcher, err := compiler.Compile(filterNode)
  2071. if err != nil {
  2072. WriteError(w, BadRequest(fmt.Sprintf("Failed to compile filter: %s", err)))
  2073. return
  2074. }
  2075. filteredASR := opencost.NewAllocationSetRange()
  2076. for _, as := range asr.Slice() {
  2077. filteredAS := opencost.NewAllocationSet(as.Start(), as.End())
  2078. for _, alloc := range as.Allocations {
  2079. if matcher.Matches(alloc) {
  2080. filteredAS.Set(alloc)
  2081. }
  2082. }
  2083. if filteredAS.Length() > 0 {
  2084. filteredASR.Append(filteredAS)
  2085. }
  2086. }
  2087. asr = filteredASR
  2088. }
  2089. w.Write(WrapData(asr, nil))
  2090. }
  2091. // The below was transferred from a different package in order to maintain
  2092. // previous behavior. Ultimately, we should clean this up at some point.
  2093. // TODO move to util and/or standardize everything
  2094. type Error struct {
  2095. StatusCode int
  2096. Body string
  2097. }
  2098. func WriteError(w http.ResponseWriter, err Error) {
  2099. status := err.StatusCode
  2100. if status == 0 {
  2101. status = http.StatusInternalServerError
  2102. }
  2103. w.WriteHeader(status)
  2104. resp, _ := json.Marshal(&Response{
  2105. Code: status,
  2106. Message: fmt.Sprintf("Error: %s", err.Body),
  2107. })
  2108. w.Write(resp)
  2109. }
  2110. func BadRequest(message string) Error {
  2111. return Error{
  2112. StatusCode: http.StatusBadRequest,
  2113. Body: message,
  2114. }
  2115. }
  2116. func InternalServerError(message string) Error {
  2117. if message == "" {
  2118. message = "Internal Server Error"
  2119. }
  2120. return Error{
  2121. StatusCode: http.StatusInternalServerError,
  2122. Body: message,
  2123. }
  2124. }
  2125. func NotFound() Error {
  2126. return Error{
  2127. StatusCode: http.StatusNotFound,
  2128. Body: "Not Found",
  2129. }
  2130. }