// aggregation.go
  1. package costmodel
  2. import (
  3. "fmt"
  4. "math"
  5. "net/http"
  6. "regexp"
  7. "sort"
  8. "strconv"
  9. "strings"
  10. "time"
  11. "github.com/julienschmidt/httprouter"
  12. "github.com/opencost/opencost/pkg/cloud/provider"
  13. "github.com/patrickmn/go-cache"
  14. prometheusClient "github.com/prometheus/client_golang/api"
  15. "github.com/opencost/opencost/core/pkg/log"
  16. "github.com/opencost/opencost/core/pkg/opencost"
  17. "github.com/opencost/opencost/core/pkg/util"
  18. "github.com/opencost/opencost/core/pkg/util/httputil"
  19. "github.com/opencost/opencost/core/pkg/util/json"
  20. "github.com/opencost/opencost/core/pkg/util/promutil"
  21. "github.com/opencost/opencost/core/pkg/util/timeutil"
  22. "github.com/opencost/opencost/pkg/cloud/models"
  23. "github.com/opencost/opencost/pkg/env"
  24. "github.com/opencost/opencost/pkg/prom"
  25. "github.com/opencost/opencost/pkg/thanos"
  26. )
const (
	// SplitTypeWeighted signals that shared costs should be shared
	// proportionally, rather than evenly
	SplitTypeWeighted = "weighted"

	// UnallocatedSubfield indicates an allocation datum that does not have the
	// chosen Aggregator; e.g. during aggregation by some label, there may be
	// cost data that do not have the given label.
	UnallocatedSubfield = "__unallocated__"

	// clusterCostsCacheMinutes is presumably the expiration, in minutes, for
	// cached cluster costs (ClusterCostsCache) — not referenced in this chunk;
	// confirm at the cache construction site.
	clusterCostsCacheMinutes = 5.0
)
// Aggregation describes aggregated cost data, containing cumulative cost and
// allocation data per resource, vectors of rate data per resource, efficiency
// data, and metadata describing the type of aggregation operation.
type Aggregation struct {
	Aggregator                 string                         `json:"aggregation"`         // field by which the data was aggregated (e.g. "namespace", "label")
	Subfields                  []string                       `json:"subfields,omitempty"` // subfields of the Aggregator (e.g. label names when aggregating by label)
	Environment                string                         `json:"environment"`
	Cluster                    string                         `json:"cluster,omitempty"`
	Properties                 *opencost.AllocationProperties `json:"-"`
	Start                      time.Time                      `json:"-"` // start of the window covered by this aggregation
	End                        time.Time                      `json:"-"` // end of the window covered by this aggregation
	CPUAllocationHourlyAverage float64                        `json:"cpuAllocationAverage"`
	CPUAllocationVectors       []*util.Vector                 `json:"-"`
	CPUAllocationTotal         float64                        `json:"-"`
	CPUCost                    float64                        `json:"cpuCost"`
	CPUCostVector              []*util.Vector                 `json:"cpuCostVector,omitempty"`
	CPUEfficiency              float64                        `json:"cpuEfficiency"`
	CPURequestedVectors        []*util.Vector                 `json:"-"`
	CPUUsedVectors             []*util.Vector                 `json:"-"`
	Efficiency                 float64                        `json:"efficiency"` // cost-weighted combination of CPU and RAM efficiency
	GPUAllocationHourlyAverage float64                        `json:"gpuAllocationAverage"`
	GPUAllocationVectors       []*util.Vector                 `json:"-"`
	GPUCost                    float64                        `json:"gpuCost"`
	GPUCostVector              []*util.Vector                 `json:"gpuCostVector,omitempty"`
	GPUAllocationTotal         float64                        `json:"-"`
	RAMAllocationHourlyAverage float64                        `json:"ramAllocationAverage"` // converted from bytes to GiB before being returned
	RAMAllocationVectors       []*util.Vector                 `json:"-"`
	RAMAllocationTotal         float64                        `json:"-"`
	RAMCost                    float64                        `json:"ramCost"`
	RAMCostVector              []*util.Vector                 `json:"ramCostVector,omitempty"`
	RAMEfficiency              float64                        `json:"ramEfficiency"`
	RAMRequestedVectors        []*util.Vector                 `json:"-"`
	RAMUsedVectors             []*util.Vector                 `json:"-"`
	PVAllocationHourlyAverage  float64                        `json:"pvAllocationAverage"` // converted from bytes to GiB before being returned
	PVAllocationVectors        []*util.Vector                 `json:"-"`
	PVAllocationTotal          float64                        `json:"-"`
	PVCost                     float64                        `json:"pvCost"`
	PVCostVector               []*util.Vector                 `json:"pvCostVector,omitempty"`
	NetworkCost                float64                        `json:"networkCost"`
	NetworkCostVector          []*util.Vector                 `json:"networkCostVector,omitempty"`
	SharedCost                 float64                        `json:"sharedCost"` // this aggregation's share of costs marked as shared
	TotalCost                  float64                        `json:"totalCost"`  // sum of CPU, RAM, GPU, PV, network, and shared costs
	TotalCostVector            []*util.Vector                 `json:"totalCostVector,omitempty"`
}
  81. // TotalHours determines the amount of hours the Aggregation covers, as a
  82. // function of the cost vectors and the resolution of those vectors' data
  83. func (a *Aggregation) TotalHours(resolutionHours float64) float64 {
  84. length := 1
  85. if length < len(a.CPUCostVector) {
  86. length = len(a.CPUCostVector)
  87. }
  88. if length < len(a.RAMCostVector) {
  89. length = len(a.RAMCostVector)
  90. }
  91. if length < len(a.PVCostVector) {
  92. length = len(a.PVCostVector)
  93. }
  94. if length < len(a.GPUCostVector) {
  95. length = len(a.GPUCostVector)
  96. }
  97. if length < len(a.NetworkCostVector) {
  98. length = len(a.NetworkCostVector)
  99. }
  100. return float64(length) * resolutionHours
  101. }
  102. // RateCoefficient computes the coefficient by which the total cost needs to be
  103. // multiplied in order to convert totals costs into per-rate costs.
  104. func (a *Aggregation) RateCoefficient(rateStr string, resolutionHours float64) float64 {
  105. // monthly rate = (730.0)*(total cost)/(total hours)
  106. // daily rate = (24.0)*(total cost)/(total hours)
  107. // hourly rate = (1.0)*(total cost)/(total hours)
  108. // default to hourly rate
  109. coeff := 1.0
  110. switch rateStr {
  111. case "daily":
  112. coeff = timeutil.HoursPerDay
  113. case "monthly":
  114. coeff = timeutil.HoursPerMonth
  115. }
  116. return coeff / a.TotalHours(resolutionHours)
  117. }
// SharedResourceInfo describes which resources should be treated as shared
// across all other allocations, selected by namespace and/or by label.
type SharedResourceInfo struct {
	ShareResources  bool                       // when false, no resources are treated as shared
	SharedNamespace map[string]bool            // set of namespaces whose costs are shared
	LabelSelectors  map[string]map[string]bool // label name -> set of values marking a resource as shared
}
// SharedCostInfo describes a named external cost to be distributed across
// aggregations.
type SharedCostInfo struct {
	Name      string
	Cost      float64 // total cost to distribute
	ShareType string
}
  128. func (s *SharedResourceInfo) IsSharedResource(costDatum *CostData) bool {
  129. // exists in a shared namespace
  130. if _, ok := s.SharedNamespace[costDatum.Namespace]; ok {
  131. return true
  132. }
  133. // has at least one shared label (OR, not AND in the case of multiple labels)
  134. for labelName, labelValues := range s.LabelSelectors {
  135. if val, ok := costDatum.Labels[labelName]; ok && labelValues[val] {
  136. return true
  137. }
  138. }
  139. return false
  140. }
  141. func NewSharedResourceInfo(shareResources bool, sharedNamespaces []string, labelNames []string, labelValues []string) *SharedResourceInfo {
  142. sr := &SharedResourceInfo{
  143. ShareResources: shareResources,
  144. SharedNamespace: make(map[string]bool),
  145. LabelSelectors: make(map[string]map[string]bool),
  146. }
  147. for _, ns := range sharedNamespaces {
  148. sr.SharedNamespace[strings.Trim(ns, " ")] = true
  149. }
  150. // Creating a map of label name to label value, but only if
  151. // the cardinality matches
  152. if len(labelNames) == len(labelValues) {
  153. for i := range labelNames {
  154. cleanedLname := promutil.SanitizeLabelName(strings.Trim(labelNames[i], " "))
  155. if values, ok := sr.LabelSelectors[cleanedLname]; ok {
  156. values[strings.Trim(labelValues[i], " ")] = true
  157. } else {
  158. sr.LabelSelectors[cleanedLname] = map[string]bool{strings.Trim(labelValues[i], " "): true}
  159. }
  160. }
  161. }
  162. return sr
  163. }
  164. func GetTotalContainerCost(costData map[string]*CostData, rate string, cp models.Provider, discount float64, customDiscount float64, idleCoefficients map[string]float64) float64 {
  165. totalContainerCost := 0.0
  166. for _, costDatum := range costData {
  167. clusterID := costDatum.ClusterID
  168. cpuv, ramv, gpuv, pvvs, netv := getPriceVectors(cp, costDatum, rate, discount, customDiscount, idleCoefficients[clusterID])
  169. totalContainerCost += totalVectors(cpuv)
  170. totalContainerCost += totalVectors(ramv)
  171. totalContainerCost += totalVectors(gpuv)
  172. for _, pv := range pvvs {
  173. totalContainerCost += totalVectors(pv)
  174. }
  175. totalContainerCost += totalVectors(netv)
  176. }
  177. return totalContainerCost
  178. }
// ComputeIdleCoefficient returns, per cluster, the ratio of that cluster's
// total container cost to its cumulative cluster cost over the given window
// and offset. Cluster costs are read from the ClusterCostsCache when present;
// otherwise they are recomputed via ComputeClusterCosts. Returns an error when
// a cluster reports a zero TotalCumulative (which would divide by zero).
func (a *Accesses) ComputeIdleCoefficient(costData map[string]*CostData, cli prometheusClient.Client, cp models.Provider, discount float64, customDiscount float64, window, offset time.Duration) (map[string]float64, error) {
	coefficients := make(map[string]float64)

	profileName := "ComputeIdleCoefficient: ComputeClusterCosts"
	profileStart := time.Now()

	var clusterCosts map[string]*ClusterCosts
	var err error
	fmtWindow, fmtOffset := timeutil.DurationOffsetStrings(window, offset)
	// Cache key is the formatted window/offset pair.
	key := fmt.Sprintf("%s:%s", fmtWindow, fmtOffset)
	if data, valid := a.ClusterCostsCache.Get(key); valid {
		clusterCosts = data.(map[string]*ClusterCosts)
	} else {
		clusterCosts, err = a.ComputeClusterCosts(cli, cp, window, offset, false)
		if err != nil {
			return nil, err
		}
	}
	measureTime(profileStart, profileThreshold, profileName)

	for cid, costs := range clusterCosts {
		// A cluster with no CPU, RAM, or storage cost data gets a neutral
		// coefficient of 1.0 rather than an error.
		if costs.CPUCumulative == 0 && costs.RAMCumulative == 0 && costs.StorageCumulative == 0 {
			log.Warnf("No ClusterCosts data for cluster '%s'. Is it emitting data?", cid)
			coefficients[cid] = 1.0
			continue
		}

		// Guard the division below.
		if costs.TotalCumulative == 0 {
			return nil, fmt.Errorf("TotalCumulative cluster cost for cluster '%s' returned 0 over window '%s' offset '%s'", cid, fmtWindow, fmtOffset)
		}

		// Sum this cluster's container costs with no rate conversion and an
		// idle coefficient of 1 (network cost intentionally excluded).
		totalContainerCost := 0.0
		for _, costDatum := range costData {
			if costDatum.ClusterID == cid {
				cpuv, ramv, gpuv, pvvs, _ := getPriceVectors(cp, costDatum, "", discount, customDiscount, 1)
				totalContainerCost += totalVectors(cpuv)
				totalContainerCost += totalVectors(ramv)
				totalContainerCost += totalVectors(gpuv)
				for _, pv := range pvvs {
					totalContainerCost += totalVectors(pv)
				}
			}
		}

		coeff := totalContainerCost / costs.TotalCumulative
		coefficients[cid] = coeff
	}

	return coefficients, nil
}
// AggregationOptions provides optional parameters to AggregateCostData, allowing callers to perform more complex operations
type AggregationOptions struct {
	Discount               float64                    // percent by which to discount CPU, RAM, and GPU cost
	CustomDiscount         float64                    // additional custom discount applied to all prices
	IdleCoefficients       map[string]float64         // scales costs by amount of idle resources on a per-cluster basis
	IncludeEfficiency      bool                       // set to true to receive efficiency/usage data
	IncludeTimeSeries      bool                       // set to true to receive time series data
	Rate                   string                     // set to "hourly", "daily", or "monthly" to receive cost rate, rather than cumulative cost
	ResolutionHours        float64                    // resolution of the cost vectors, in hours; treated as 1.0 when <= 0
	SharedResourceInfo     *SharedResourceInfo        // describes which resources are shared rather than individually allocated
	SharedCosts            map[string]*SharedCostInfo // named external costs to distribute across aggregations
	FilteredContainerCount int                        // count of containers excluded by filtering; not referenced in this chunk
	FilteredEnvironments   map[string]int             // environments excluded by filtering; included in the even-split denominator
	SharedSplit            string                     // SplitTypeWeighted for proportional sharing; any other value splits evenly
	TotalContainerCost     float64                    // denominator for the weighted shared-cost split
}
  238. // Helper method to test request/usgae values against allocation averages for efficiency scores. Generate a warning log if
  239. // clamp is required
  240. func clampAverage(requestsAvg float64, usedAverage float64, allocationAvg float64, resource string) (float64, float64) {
  241. rAvg := requestsAvg
  242. if rAvg > allocationAvg {
  243. log.Debugf("Average %s Requested (%f) > Average %s Allocated (%f). Clamping.", resource, rAvg, resource, allocationAvg)
  244. rAvg = allocationAvg
  245. }
  246. uAvg := usedAverage
  247. if uAvg > allocationAvg {
  248. log.Debugf(" Average %s Used (%f) > Average %s Allocated (%f). Clamping.", resource, uAvg, resource, allocationAvg)
  249. uAvg = allocationAvg
  250. }
  251. return rAvg, uAvg
  252. }
  253. // AggregateCostData aggregates raw cost data by field; e.g. namespace, cluster, service, or label. In the case of label, callers
  254. // must pass a slice of subfields indicating the labels by which to group. Provider is used to define custom resource pricing.
  255. // See AggregationOptions for optional parameters.
  256. func AggregateCostData(costData map[string]*CostData, field string, subfields []string, cp models.Provider, opts *AggregationOptions) map[string]*Aggregation {
  257. discount := opts.Discount
  258. customDiscount := opts.CustomDiscount
  259. idleCoefficients := opts.IdleCoefficients
  260. includeTimeSeries := opts.IncludeTimeSeries
  261. includeEfficiency := opts.IncludeEfficiency
  262. rate := opts.Rate
  263. sr := opts.SharedResourceInfo
  264. resolutionHours := 1.0
  265. if opts.ResolutionHours > 0.0 {
  266. resolutionHours = opts.ResolutionHours
  267. }
  268. if idleCoefficients == nil {
  269. idleCoefficients = make(map[string]float64)
  270. }
  271. // aggregations collects key-value pairs of resource group-to-aggregated data
  272. // e.g. namespace-to-data or label-value-to-data
  273. aggregations := make(map[string]*Aggregation)
  274. // sharedResourceCost is the running total cost of resources that should be reported
  275. // as shared across all other resources, rather than reported as a stand-alone category
  276. sharedResourceCost := 0.0
  277. for _, costDatum := range costData {
  278. idleCoefficient, ok := idleCoefficients[costDatum.ClusterID]
  279. if !ok {
  280. idleCoefficient = 1.0
  281. }
  282. if sr != nil && sr.ShareResources && sr.IsSharedResource(costDatum) {
  283. cpuv, ramv, gpuv, pvvs, netv := getPriceVectors(cp, costDatum, rate, discount, customDiscount, idleCoefficient)
  284. sharedResourceCost += totalVectors(cpuv)
  285. sharedResourceCost += totalVectors(ramv)
  286. sharedResourceCost += totalVectors(gpuv)
  287. sharedResourceCost += totalVectors(netv)
  288. for _, pv := range pvvs {
  289. sharedResourceCost += totalVectors(pv)
  290. }
  291. } else {
  292. if field == "cluster" {
  293. aggregateDatum(cp, aggregations, costDatum, field, subfields, rate, costDatum.ClusterID, discount, customDiscount, idleCoefficient, false)
  294. } else if field == "node" {
  295. aggregateDatum(cp, aggregations, costDatum, field, subfields, rate, costDatum.NodeName, discount, customDiscount, idleCoefficient, false)
  296. } else if field == "namespace" {
  297. aggregateDatum(cp, aggregations, costDatum, field, subfields, rate, costDatum.Namespace, discount, customDiscount, idleCoefficient, false)
  298. } else if field == "service" {
  299. if len(costDatum.Services) > 0 {
  300. aggregateDatum(cp, aggregations, costDatum, field, subfields, rate, costDatum.Namespace+"/"+costDatum.Services[0], discount, customDiscount, idleCoefficient, false)
  301. } else {
  302. aggregateDatum(cp, aggregations, costDatum, field, subfields, rate, UnallocatedSubfield, discount, customDiscount, idleCoefficient, false)
  303. }
  304. } else if field == "deployment" {
  305. if len(costDatum.Deployments) > 0 {
  306. aggregateDatum(cp, aggregations, costDatum, field, subfields, rate, costDatum.Namespace+"/"+costDatum.Deployments[0], discount, customDiscount, idleCoefficient, false)
  307. } else {
  308. aggregateDatum(cp, aggregations, costDatum, field, subfields, rate, UnallocatedSubfield, discount, customDiscount, idleCoefficient, false)
  309. }
  310. } else if field == "statefulset" {
  311. if len(costDatum.Statefulsets) > 0 {
  312. aggregateDatum(cp, aggregations, costDatum, field, subfields, rate, costDatum.Namespace+"/"+costDatum.Statefulsets[0], discount, customDiscount, idleCoefficient, false)
  313. } else {
  314. aggregateDatum(cp, aggregations, costDatum, field, subfields, rate, UnallocatedSubfield, discount, customDiscount, idleCoefficient, false)
  315. }
  316. } else if field == "daemonset" {
  317. if len(costDatum.Daemonsets) > 0 {
  318. aggregateDatum(cp, aggregations, costDatum, field, subfields, rate, costDatum.Namespace+"/"+costDatum.Daemonsets[0], discount, customDiscount, idleCoefficient, false)
  319. } else {
  320. aggregateDatum(cp, aggregations, costDatum, field, subfields, rate, UnallocatedSubfield, discount, customDiscount, idleCoefficient, false)
  321. }
  322. } else if field == "controller" {
  323. if controller, kind, hasController := costDatum.GetController(); hasController {
  324. key := fmt.Sprintf("%s/%s:%s", costDatum.Namespace, kind, controller)
  325. aggregateDatum(cp, aggregations, costDatum, field, subfields, rate, key, discount, customDiscount, idleCoefficient, false)
  326. } else {
  327. aggregateDatum(cp, aggregations, costDatum, field, subfields, rate, UnallocatedSubfield, discount, customDiscount, idleCoefficient, false)
  328. }
  329. } else if field == "label" {
  330. found := false
  331. if costDatum.Labels != nil {
  332. for _, sf := range subfields {
  333. if subfieldName, ok := costDatum.Labels[sf]; ok {
  334. aggregateDatum(cp, aggregations, costDatum, field, subfields, rate, subfieldName, discount, customDiscount, idleCoefficient, false)
  335. found = true
  336. break
  337. }
  338. }
  339. }
  340. if !found {
  341. aggregateDatum(cp, aggregations, costDatum, field, subfields, rate, UnallocatedSubfield, discount, customDiscount, idleCoefficient, false)
  342. }
  343. } else if field == "annotation" {
  344. found := false
  345. if costDatum.Annotations != nil {
  346. for _, sf := range subfields {
  347. if subfieldName, ok := costDatum.Annotations[sf]; ok {
  348. aggregateDatum(cp, aggregations, costDatum, field, subfields, rate, subfieldName, discount, customDiscount, idleCoefficient, false)
  349. found = true
  350. break
  351. }
  352. }
  353. }
  354. if !found {
  355. aggregateDatum(cp, aggregations, costDatum, field, subfields, rate, UnallocatedSubfield, discount, customDiscount, idleCoefficient, false)
  356. }
  357. } else if field == "pod" {
  358. aggregateDatum(cp, aggregations, costDatum, field, subfields, rate, costDatum.Namespace+"/"+costDatum.PodName, discount, customDiscount, idleCoefficient, false)
  359. } else if field == "container" {
  360. key := fmt.Sprintf("%s/%s/%s/%s", costDatum.ClusterID, costDatum.Namespace, costDatum.PodName, costDatum.Name)
  361. aggregateDatum(cp, aggregations, costDatum, field, subfields, rate, key, discount, customDiscount, idleCoefficient, true)
  362. }
  363. }
  364. }
  365. for key, agg := range aggregations {
  366. sharedCoefficient := 1 / float64(len(opts.FilteredEnvironments)+len(aggregations))
  367. agg.CPUCost = totalVectors(agg.CPUCostVector)
  368. agg.RAMCost = totalVectors(agg.RAMCostVector)
  369. agg.GPUCost = totalVectors(agg.GPUCostVector)
  370. agg.PVCost = totalVectors(agg.PVCostVector)
  371. agg.NetworkCost = totalVectors(agg.NetworkCostVector)
  372. if opts.SharedSplit == SplitTypeWeighted {
  373. d := opts.TotalContainerCost - sharedResourceCost
  374. if d == 0 {
  375. log.Warnf("Total container cost '%f' and shared resource cost '%f are the same'. Setting sharedCoefficient to 1", opts.TotalContainerCost, sharedResourceCost)
  376. sharedCoefficient = 1.0
  377. } else {
  378. sharedCoefficient = (agg.CPUCost + agg.RAMCost + agg.GPUCost + agg.PVCost + agg.NetworkCost) / d
  379. }
  380. }
  381. agg.SharedCost = sharedResourceCost * sharedCoefficient
  382. for _, v := range opts.SharedCosts {
  383. agg.SharedCost += v.Cost * sharedCoefficient
  384. }
  385. if rate != "" {
  386. rateCoeff := agg.RateCoefficient(rate, resolutionHours)
  387. agg.CPUCost *= rateCoeff
  388. agg.RAMCost *= rateCoeff
  389. agg.GPUCost *= rateCoeff
  390. agg.PVCost *= rateCoeff
  391. agg.NetworkCost *= rateCoeff
  392. agg.SharedCost *= rateCoeff
  393. }
  394. agg.TotalCost = agg.CPUCost + agg.RAMCost + agg.GPUCost + agg.PVCost + agg.NetworkCost + agg.SharedCost
  395. // Evicted and Completed Pods can still show up here, but have 0 cost.
  396. // Filter these by default. Any reason to keep them?
  397. if agg.TotalCost == 0 {
  398. delete(aggregations, key)
  399. continue
  400. }
  401. // CPU, RAM, and PV allocation are cumulative per-datum, whereas GPU is rate per-datum
  402. agg.CPUAllocationHourlyAverage = totalVectors(agg.CPUAllocationVectors) / agg.TotalHours(resolutionHours)
  403. agg.RAMAllocationHourlyAverage = totalVectors(agg.RAMAllocationVectors) / agg.TotalHours(resolutionHours)
  404. agg.GPUAllocationHourlyAverage = averageVectors(agg.GPUAllocationVectors)
  405. agg.PVAllocationHourlyAverage = totalVectors(agg.PVAllocationVectors) / agg.TotalHours(resolutionHours)
  406. // TODO niko/etl does this check out for GPU data? Do we need to rewrite GPU queries to be
  407. // cumulative?
  408. agg.CPUAllocationTotal = totalVectors(agg.CPUAllocationVectors)
  409. agg.GPUAllocationTotal = totalVectors(agg.GPUAllocationVectors)
  410. agg.PVAllocationTotal = totalVectors(agg.PVAllocationVectors)
  411. agg.RAMAllocationTotal = totalVectors(agg.RAMAllocationVectors)
  412. if includeEfficiency {
  413. // Default both RAM and CPU to 0% efficiency so that a 0-requested, 0-allocated, 0-used situation
  414. // returns 0% efficiency, which should be a red-flag.
  415. //
  416. // If non-zero numbers are available, then efficiency is defined as:
  417. // idlePercentage = (requested - used) / allocated
  418. // efficiency = (1.0 - idlePercentage)
  419. //
  420. // It is possible to score > 100% efficiency, which is meant to be interpreted as a red flag.
  421. // It is not possible to score < 0% efficiency.
  422. agg.CPUEfficiency = 0.0
  423. CPUIdle := 0.0
  424. if agg.CPUAllocationHourlyAverage > 0.0 {
  425. avgCPURequested := averageVectors(agg.CPURequestedVectors)
  426. avgCPUUsed := averageVectors(agg.CPUUsedVectors)
  427. // Clamp averages, log range violations
  428. avgCPURequested, avgCPUUsed = clampAverage(avgCPURequested, avgCPUUsed, agg.CPUAllocationHourlyAverage, "CPU")
  429. CPUIdle = ((avgCPURequested - avgCPUUsed) / agg.CPUAllocationHourlyAverage)
  430. agg.CPUEfficiency = 1.0 - CPUIdle
  431. }
  432. agg.RAMEfficiency = 0.0
  433. RAMIdle := 0.0
  434. if agg.RAMAllocationHourlyAverage > 0.0 {
  435. avgRAMRequested := averageVectors(agg.RAMRequestedVectors)
  436. avgRAMUsed := averageVectors(agg.RAMUsedVectors)
  437. // Clamp averages, log range violations
  438. avgRAMRequested, avgRAMUsed = clampAverage(avgRAMRequested, avgRAMUsed, agg.RAMAllocationHourlyAverage, "RAM")
  439. RAMIdle = ((avgRAMRequested - avgRAMUsed) / agg.RAMAllocationHourlyAverage)
  440. agg.RAMEfficiency = 1.0 - RAMIdle
  441. }
  442. // Score total efficiency by the sum of CPU and RAM efficiency, weighted by their
  443. // respective total costs.
  444. agg.Efficiency = 0.0
  445. if (agg.CPUCost + agg.RAMCost) > 0 {
  446. agg.Efficiency = ((agg.CPUCost * agg.CPUEfficiency) + (agg.RAMCost * agg.RAMEfficiency)) / (agg.CPUCost + agg.RAMCost)
  447. }
  448. }
  449. // convert RAM from bytes to GiB
  450. agg.RAMAllocationHourlyAverage = agg.RAMAllocationHourlyAverage / 1024 / 1024 / 1024
  451. // convert storage from bytes to GiB
  452. agg.PVAllocationHourlyAverage = agg.PVAllocationHourlyAverage / 1024 / 1024 / 1024
  453. // remove time series data if it is not explicitly requested
  454. if !includeTimeSeries {
  455. agg.CPUCostVector = nil
  456. agg.RAMCostVector = nil
  457. agg.GPUCostVector = nil
  458. agg.PVCostVector = nil
  459. agg.NetworkCostVector = nil
  460. agg.TotalCostVector = nil
  461. } else { // otherwise compute a totalcostvector
  462. v1 := addVectors(agg.CPUCostVector, agg.RAMCostVector)
  463. v2 := addVectors(v1, agg.GPUCostVector)
  464. v3 := addVectors(v2, agg.PVCostVector)
  465. v4 := addVectors(v3, agg.NetworkCostVector)
  466. agg.TotalCostVector = v4
  467. }
  468. // Typesafety checks
  469. if math.IsNaN(agg.CPUAllocationHourlyAverage) || math.IsInf(agg.CPUAllocationHourlyAverage, 0) {
  470. log.Warnf("CPUAllocationHourlyAverage is %f for '%s: %s/%s'", agg.CPUAllocationHourlyAverage, agg.Cluster, agg.Aggregator, agg.Environment)
  471. agg.CPUAllocationHourlyAverage = 0
  472. }
  473. if math.IsNaN(agg.CPUCost) || math.IsInf(agg.CPUCost, 0) {
  474. log.Warnf("CPUCost is %f for '%s: %s/%s'", agg.CPUCost, agg.Cluster, agg.Aggregator, agg.Environment)
  475. agg.CPUCost = 0
  476. }
  477. if math.IsNaN(agg.CPUEfficiency) || math.IsInf(agg.CPUEfficiency, 0) {
  478. log.Warnf("CPUEfficiency is %f for '%s: %s/%s'", agg.CPUEfficiency, agg.Cluster, agg.Aggregator, agg.Environment)
  479. agg.CPUEfficiency = 0
  480. }
  481. if math.IsNaN(agg.Efficiency) || math.IsInf(agg.Efficiency, 0) {
  482. log.Warnf("Efficiency is %f for '%s: %s/%s'", agg.Efficiency, agg.Cluster, agg.Aggregator, agg.Environment)
  483. agg.Efficiency = 0
  484. }
  485. if math.IsNaN(agg.GPUAllocationHourlyAverage) || math.IsInf(agg.GPUAllocationHourlyAverage, 0) {
  486. log.Warnf("GPUAllocationHourlyAverage is %f for '%s: %s/%s'", agg.GPUAllocationHourlyAverage, agg.Cluster, agg.Aggregator, agg.Environment)
  487. agg.GPUAllocationHourlyAverage = 0
  488. }
  489. if math.IsNaN(agg.GPUCost) || math.IsInf(agg.GPUCost, 0) {
  490. log.Warnf("GPUCost is %f for '%s: %s/%s'", agg.GPUCost, agg.Cluster, agg.Aggregator, agg.Environment)
  491. agg.GPUCost = 0
  492. }
  493. if math.IsNaN(agg.RAMAllocationHourlyAverage) || math.IsInf(agg.RAMAllocationHourlyAverage, 0) {
  494. log.Warnf("RAMAllocationHourlyAverage is %f for '%s: %s/%s'", agg.RAMAllocationHourlyAverage, agg.Cluster, agg.Aggregator, agg.Environment)
  495. agg.RAMAllocationHourlyAverage = 0
  496. }
  497. if math.IsNaN(agg.RAMCost) || math.IsInf(agg.RAMCost, 0) {
  498. log.Warnf("RAMCost is %f for '%s: %s/%s'", agg.RAMCost, agg.Cluster, agg.Aggregator, agg.Environment)
  499. agg.RAMCost = 0
  500. }
  501. if math.IsNaN(agg.RAMEfficiency) || math.IsInf(agg.RAMEfficiency, 0) {
  502. log.Warnf("RAMEfficiency is %f for '%s: %s/%s'", agg.RAMEfficiency, agg.Cluster, agg.Aggregator, agg.Environment)
  503. agg.RAMEfficiency = 0
  504. }
  505. if math.IsNaN(agg.PVAllocationHourlyAverage) || math.IsInf(agg.PVAllocationHourlyAverage, 0) {
  506. log.Warnf("PVAllocationHourlyAverage is %f for '%s: %s/%s'", agg.PVAllocationHourlyAverage, agg.Cluster, agg.Aggregator, agg.Environment)
  507. agg.PVAllocationHourlyAverage = 0
  508. }
  509. if math.IsNaN(agg.PVCost) || math.IsInf(agg.PVCost, 0) {
  510. log.Warnf("PVCost is %f for '%s: %s/%s'", agg.PVCost, agg.Cluster, agg.Aggregator, agg.Environment)
  511. agg.PVCost = 0
  512. }
  513. if math.IsNaN(agg.NetworkCost) || math.IsInf(agg.NetworkCost, 0) {
  514. log.Warnf("NetworkCost is %f for '%s: %s/%s'", agg.NetworkCost, agg.Cluster, agg.Aggregator, agg.Environment)
  515. agg.NetworkCost = 0
  516. }
  517. if math.IsNaN(agg.SharedCost) || math.IsInf(agg.SharedCost, 0) {
  518. log.Warnf("SharedCost is %f for '%s: %s/%s'", agg.SharedCost, agg.Cluster, agg.Aggregator, agg.Environment)
  519. agg.SharedCost = 0
  520. }
  521. if math.IsNaN(agg.TotalCost) || math.IsInf(agg.TotalCost, 0) {
  522. log.Warnf("TotalCost is %f for '%s: %s/%s'", agg.TotalCost, agg.Cluster, agg.Aggregator, agg.Environment)
  523. agg.TotalCost = 0
  524. }
  525. }
  526. return aggregations
  527. }
  528. func aggregateDatum(cp models.Provider, aggregations map[string]*Aggregation, costDatum *CostData, field string, subfields []string, rate string, key string, discount float64, customDiscount float64, idleCoefficient float64, includeProperties bool) {
  529. // add new entry to aggregation results if a new key is encountered
  530. if _, ok := aggregations[key]; !ok {
  531. agg := &Aggregation{
  532. Aggregator: field,
  533. Environment: key,
  534. }
  535. if len(subfields) > 0 {
  536. agg.Subfields = subfields
  537. }
  538. if includeProperties {
  539. props := &opencost.AllocationProperties{}
  540. props.Cluster = costDatum.ClusterID
  541. props.Node = costDatum.NodeName
  542. if controller, kind, hasController := costDatum.GetController(); hasController {
  543. props.Controller = controller
  544. props.ControllerKind = kind
  545. }
  546. props.Labels = costDatum.Labels
  547. props.Annotations = costDatum.Annotations
  548. props.Namespace = costDatum.Namespace
  549. props.Pod = costDatum.PodName
  550. props.Services = costDatum.Services
  551. props.Container = costDatum.Name
  552. agg.Properties = props
  553. }
  554. aggregations[key] = agg
  555. }
  556. mergeVectors(cp, costDatum, aggregations[key], rate, discount, customDiscount, idleCoefficient)
  557. }
  558. func mergeVectors(cp models.Provider, costDatum *CostData, aggregation *Aggregation, rate string, discount float64, customDiscount float64, idleCoefficient float64) {
  559. aggregation.CPUAllocationVectors = addVectors(costDatum.CPUAllocation, aggregation.CPUAllocationVectors)
  560. aggregation.CPURequestedVectors = addVectors(costDatum.CPUReq, aggregation.CPURequestedVectors)
  561. aggregation.CPUUsedVectors = addVectors(costDatum.CPUUsed, aggregation.CPUUsedVectors)
  562. aggregation.RAMAllocationVectors = addVectors(costDatum.RAMAllocation, aggregation.RAMAllocationVectors)
  563. aggregation.RAMRequestedVectors = addVectors(costDatum.RAMReq, aggregation.RAMRequestedVectors)
  564. aggregation.RAMUsedVectors = addVectors(costDatum.RAMUsed, aggregation.RAMUsedVectors)
  565. aggregation.GPUAllocationVectors = addVectors(costDatum.GPUReq, aggregation.GPUAllocationVectors)
  566. for _, pvcd := range costDatum.PVCData {
  567. aggregation.PVAllocationVectors = addVectors(pvcd.Values, aggregation.PVAllocationVectors)
  568. }
  569. cpuv, ramv, gpuv, pvvs, netv := getPriceVectors(cp, costDatum, rate, discount, customDiscount, idleCoefficient)
  570. aggregation.CPUCostVector = addVectors(cpuv, aggregation.CPUCostVector)
  571. aggregation.RAMCostVector = addVectors(ramv, aggregation.RAMCostVector)
  572. aggregation.GPUCostVector = addVectors(gpuv, aggregation.GPUCostVector)
  573. aggregation.NetworkCostVector = addVectors(netv, aggregation.NetworkCostVector)
  574. for _, vectorList := range pvvs {
  575. aggregation.PVCostVector = addVectors(aggregation.PVCostVector, vectorList)
  576. }
  577. }
  578. // Returns the blended discounts applied to the node as a result of global discounts and reserved instance
  579. // discounts
  580. func getDiscounts(costDatum *CostData, cpuCost float64, ramCost float64, discount float64) (float64, float64) {
  581. if costDatum.NodeData == nil {
  582. return discount, discount
  583. }
  584. if costDatum.NodeData.IsSpot() {
  585. return 0, 0
  586. }
  587. reserved := costDatum.NodeData.Reserved
  588. // blended discounts
  589. blendedCPUDiscount := discount
  590. blendedRAMDiscount := discount
  591. if reserved != nil && reserved.CPUCost > 0 && reserved.RAMCost > 0 {
  592. reservedCPUDiscount := 0.0
  593. if cpuCost == 0 {
  594. log.Warnf("No cpu cost found for cluster '%s' node '%s'", costDatum.ClusterID, costDatum.NodeName)
  595. } else {
  596. reservedCPUDiscount = 1.0 - (reserved.CPUCost / cpuCost)
  597. }
  598. reservedRAMDiscount := 0.0
  599. if ramCost == 0 {
  600. log.Warnf("No ram cost found for cluster '%s' node '%s'", costDatum.ClusterID, costDatum.NodeName)
  601. } else {
  602. reservedRAMDiscount = 1.0 - (reserved.RAMCost / ramCost)
  603. }
  604. // AWS passes the # of reserved CPU and RAM as -1 to represent "All"
  605. if reserved.ReservedCPU < 0 && reserved.ReservedRAM < 0 {
  606. blendedCPUDiscount = reservedCPUDiscount
  607. blendedRAMDiscount = reservedRAMDiscount
  608. } else {
  609. nodeCPU, ierr := strconv.ParseInt(costDatum.NodeData.VCPU, 10, 64)
  610. nodeRAM, ferr := strconv.ParseFloat(costDatum.NodeData.RAMBytes, 64)
  611. if ierr == nil && ferr == nil {
  612. nodeRAMGB := nodeRAM / 1024 / 1024 / 1024
  613. reservedRAMGB := float64(reserved.ReservedRAM) / 1024 / 1024 / 1024
  614. nonReservedCPU := nodeCPU - reserved.ReservedCPU
  615. nonReservedRAM := nodeRAMGB - reservedRAMGB
  616. if nonReservedCPU == 0 {
  617. blendedCPUDiscount = reservedCPUDiscount
  618. } else {
  619. if nodeCPU == 0 {
  620. log.Warnf("No ram found for cluster '%s' node '%s'", costDatum.ClusterID, costDatum.NodeName)
  621. } else {
  622. blendedCPUDiscount = (float64(reserved.ReservedCPU) * reservedCPUDiscount) + (float64(nonReservedCPU)*discount)/float64(nodeCPU)
  623. }
  624. }
  625. if nonReservedRAM == 0 {
  626. blendedRAMDiscount = reservedRAMDiscount
  627. } else {
  628. if nodeRAMGB == 0 {
  629. log.Warnf("No ram found for cluster '%s' node '%s'", costDatum.ClusterID, costDatum.NodeName)
  630. } else {
  631. blendedRAMDiscount = (reservedRAMGB * reservedRAMDiscount) + (nonReservedRAM*discount)/nodeRAMGB
  632. }
  633. }
  634. }
  635. }
  636. }
  637. return blendedCPUDiscount, blendedRAMDiscount
  638. }
  639. func parseVectorPricing(cfg *models.CustomPricing, costDatum *CostData, cpuCostStr, ramCostStr, gpuCostStr, pvCostStr string) (float64, float64, float64, float64, bool) {
  640. usesCustom := false
  641. cpuCost, err := strconv.ParseFloat(cpuCostStr, 64)
  642. if err != nil || math.IsNaN(cpuCost) || math.IsInf(cpuCost, 0) || cpuCost == 0 {
  643. cpuCost, err = strconv.ParseFloat(cfg.CPU, 64)
  644. usesCustom = true
  645. if err != nil || math.IsNaN(cpuCost) || math.IsInf(cpuCost, 0) {
  646. cpuCost = 0
  647. }
  648. }
  649. ramCost, err := strconv.ParseFloat(ramCostStr, 64)
  650. if err != nil || math.IsNaN(ramCost) || math.IsInf(ramCost, 0) || ramCost == 0 {
  651. ramCost, err = strconv.ParseFloat(cfg.RAM, 64)
  652. usesCustom = true
  653. if err != nil || math.IsNaN(ramCost) || math.IsInf(ramCost, 0) {
  654. ramCost = 0
  655. }
  656. }
  657. gpuCost, err := strconv.ParseFloat(gpuCostStr, 64)
  658. if err != nil || math.IsNaN(gpuCost) || math.IsInf(gpuCost, 0) {
  659. gpuCost, err = strconv.ParseFloat(cfg.GPU, 64)
  660. if err != nil || math.IsNaN(gpuCost) || math.IsInf(gpuCost, 0) {
  661. gpuCost = 0
  662. }
  663. }
  664. pvCost, err := strconv.ParseFloat(pvCostStr, 64)
  665. if err != nil || math.IsNaN(cpuCost) || math.IsInf(cpuCost, 0) {
  666. pvCost, err = strconv.ParseFloat(cfg.Storage, 64)
  667. if err != nil || math.IsNaN(pvCost) || math.IsInf(pvCost, 0) {
  668. pvCost = 0
  669. }
  670. }
  671. return cpuCost, ramCost, gpuCost, pvCost, usesCustom
  672. }
// getPriceVectors computes per-resource cost time series for a single
// CostData entry. It returns, in order: CPU cost vectors, RAM cost vectors,
// GPU cost vectors, a list of PV cost vector series (one per PVC), and
// network cost vectors.
//
// Prices come from the node data unless custom pricing is enabled (or node
// data is absent), in which case the custom pricing config is used instead.
// Blended/global discounts, the custom discount, and the idle coefficient
// are applied per datapoint. All timestamps are rounded to the nearest 10s
// so that vectors from different series can be matched by addVectors.
func getPriceVectors(cp models.Provider, costDatum *CostData, rate string, discount float64, customDiscount float64, idleCoefficient float64) ([]*util.Vector, []*util.Vector, []*util.Vector, [][]*util.Vector, []*util.Vector) {
	var cpuCost float64
	var ramCost float64
	var gpuCost float64
	var pvCost float64
	var usesCustom bool
	// If custom pricing is enabled and can be retrieved, replace
	// default cost values with custom values
	customPricing, err := cp.GetConfig()
	if err != nil {
		log.Errorf("failed to load custom pricing: %s", err)
	}
	if provider.CustomPricesEnabled(cp) && err == nil {
		// custom pricing enabled: choose spot vs. on-demand custom prices
		// NOTE(review): this branch dereferences costDatum.NodeData via
		// IsSpot() without a nil check, unlike the branch below — confirm
		// NodeData is always non-nil when custom pricing is enabled.
		var cpuCostStr string
		var ramCostStr string
		var gpuCostStr string
		var pvCostStr string
		if costDatum.NodeData.IsSpot() {
			cpuCostStr = customPricing.SpotCPU
			ramCostStr = customPricing.SpotRAM
			gpuCostStr = customPricing.SpotGPU
		} else {
			cpuCostStr = customPricing.CPU
			ramCostStr = customPricing.RAM
			gpuCostStr = customPricing.GPU
		}
		pvCostStr = customPricing.Storage
		cpuCost, ramCost, gpuCost, pvCost, usesCustom = parseVectorPricing(customPricing, costDatum, cpuCostStr, ramCostStr, gpuCostStr, pvCostStr)
	} else if costDatum.NodeData == nil && err == nil {
		// no node pricing data at all: fall back to the custom pricing config
		cpuCostStr := customPricing.CPU
		ramCostStr := customPricing.RAM
		gpuCostStr := customPricing.GPU
		pvCostStr := customPricing.Storage
		cpuCost, ramCost, gpuCost, pvCost, usesCustom = parseVectorPricing(customPricing, costDatum, cpuCostStr, ramCostStr, gpuCostStr, pvCostStr)
	} else {
		// normal path: price from the node's own data
		cpuCostStr := costDatum.NodeData.VCPUCost
		ramCostStr := costDatum.NodeData.RAMCost
		gpuCostStr := costDatum.NodeData.GPUCost
		pvCostStr := costDatum.NodeData.StorageCost
		cpuCost, ramCost, gpuCost, pvCost, usesCustom = parseVectorPricing(customPricing, costDatum, cpuCostStr, ramCostStr, gpuCostStr, pvCostStr)
	}
	if usesCustom {
		log.DedupedWarningf(5, "No pricing data found for node `%s` , using custom pricing", costDatum.NodeName)
	}
	cpuDiscount, ramDiscount := getDiscounts(costDatum, cpuCost, ramCost, discount)
	log.Debugf("Node Name: %s", costDatum.NodeName)
	log.Debugf("Blended CPU Discount: %f", cpuDiscount)
	log.Debugf("Blended RAM Discount: %f", ramDiscount)
	// TODO should we try to apply the rate coefficient here or leave it as a totals-only metric?
	rateCoeff := 1.0 // currently always 1.0; see TODO above (rate is unused here)
	// guard against division by zero below: no idle data means coefficient 1
	if idleCoefficient == 0 {
		idleCoefficient = 1.0
	}
	// CPU cost per datapoint: cores * $/core, discounted and idle-adjusted
	cpuv := make([]*util.Vector, 0, len(costDatum.CPUAllocation))
	for _, val := range costDatum.CPUAllocation {
		cpuv = append(cpuv, &util.Vector{
			Timestamp: math.Round(val.Timestamp/10) * 10,
			Value:     (val.Value * cpuCost * (1 - cpuDiscount) * (1 - customDiscount) / idleCoefficient) * rateCoeff,
		})
	}
	// RAM cost per datapoint: bytes converted to GiB * $/GiB
	ramv := make([]*util.Vector, 0, len(costDatum.RAMAllocation))
	for _, val := range costDatum.RAMAllocation {
		ramv = append(ramv, &util.Vector{
			Timestamp: math.Round(val.Timestamp/10) * 10,
			Value:     ((val.Value / 1024 / 1024 / 1024) * ramCost * (1 - ramDiscount) * (1 - customDiscount) / idleCoefficient) * rateCoeff,
		})
	}
	// GPU cost per datapoint; uses the global discount, not a blended one
	gpuv := make([]*util.Vector, 0, len(costDatum.GPUReq))
	for _, val := range costDatum.GPUReq {
		gpuv = append(gpuv, &util.Vector{
			Timestamp: math.Round(val.Timestamp/10) * 10,
			Value:     (val.Value * gpuCost * (1 - discount) * (1 - customDiscount) / idleCoefficient) * rateCoeff,
		})
	}
	// PV cost: one series per PVC; claims without volume data are skipped
	pvvs := make([][]*util.Vector, 0, len(costDatum.PVCData))
	for _, pvcData := range costDatum.PVCData {
		pvv := make([]*util.Vector, 0, len(pvcData.Values))
		if pvcData.Volume != nil {
			cost, _ := strconv.ParseFloat(pvcData.Volume.Cost, 64)
			// override with custom pricing if enabled
			if provider.CustomPricesEnabled(cp) {
				cost = pvCost
			}
			for _, val := range pvcData.Values {
				pvv = append(pvv, &util.Vector{
					Timestamp: math.Round(val.Timestamp/10) * 10,
					Value:     ((val.Value / 1024 / 1024 / 1024) * cost * (1 - customDiscount) / idleCoefficient) * rateCoeff,
				})
			}
			pvvs = append(pvvs, pvv)
		}
	}
	// network cost is passed through unmodified (no discounts applied)
	netv := make([]*util.Vector, 0, len(costDatum.NetworkData))
	for _, val := range costDatum.NetworkData {
		netv = append(netv, &util.Vector{
			Timestamp: math.Round(val.Timestamp/10) * 10,
			Value:     val.Value,
		})
	}
	return cpuv, ramv, gpuv, pvvs, netv
}
  774. func averageVectors(vectors []*util.Vector) float64 {
  775. if len(vectors) == 0 {
  776. return 0.0
  777. }
  778. return totalVectors(vectors) / float64(len(vectors))
  779. }
  780. func totalVectors(vectors []*util.Vector) float64 {
  781. total := 0.0
  782. for _, vector := range vectors {
  783. total += vector.Value
  784. }
  785. return total
  786. }
  787. // addVectors adds two slices of Vectors. Vector timestamps are rounded to the
  788. // nearest ten seconds to allow matching of Vectors within a delta allowance.
  789. // Matching Vectors are summed, while unmatched Vectors are passed through.
  790. // e.g. [(t=1, 1), (t=2, 2)] + [(t=2, 2), (t=3, 3)] = [(t=1, 1), (t=2, 4), (t=3, 3)]
  791. func addVectors(xvs []*util.Vector, yvs []*util.Vector) []*util.Vector {
  792. sumOp := func(result *util.Vector, x *float64, y *float64) bool {
  793. if x != nil && y != nil {
  794. result.Value = *x + *y
  795. } else if y != nil {
  796. result.Value = *y
  797. } else if x != nil {
  798. result.Value = *x
  799. }
  800. return true
  801. }
  802. return util.ApplyVectorOp(xvs, yvs, sumOp)
  803. }
// minCostDataLength sets the minimum number of time series data points
// required to cache both raw and aggregated cost data.
const minCostDataLength = 2
// EmptyDataError describes an error caused by empty cost data for some
// defined interval
type EmptyDataError struct {
	err    error           // underlying cause, if any (may be nil)
	window opencost.Window // queried time range that produced no data
}
  813. // Error implements the error interface
  814. func (ede *EmptyDataError) Error() string {
  815. err := fmt.Sprintf("empty data for range: %s", ede.window)
  816. if ede.err != nil {
  817. err += fmt.Sprintf(": %s", ede.err)
  818. }
  819. return err
  820. }
  821. func costDataTimeSeriesLength(costData map[string]*CostData) int {
  822. l := 0
  823. for _, cd := range costData {
  824. if l < len(cd.RAMAllocation) {
  825. l = len(cd.RAMAllocation)
  826. }
  827. if l < len(cd.CPUAllocation) {
  828. l = len(cd.CPUAllocation)
  829. }
  830. }
  831. return l
  832. }
  833. // ScaleHourlyCostData converts per-hour cost data to per-resolution data. If the target resolution is higher (i.e. < 1.0h)
  834. // then we can do simple multiplication by the fraction-of-an-hour and retain accuracy. If the target resolution is
  835. // lower (i.e. > 1.0h) then we sum groups of hourly data by resolution to maintain fidelity.
  836. // e.g. (100 hours of per-hour hourly data, resolutionHours=10) => 10 data points, grouped and summed by 10-hour window
  837. // e.g. (20 minutes of per-minute hourly data, resolutionHours=1/60) => 20 data points, scaled down by a factor of 60
  838. func ScaleHourlyCostData(data map[string]*CostData, resolutionHours float64) map[string]*CostData {
  839. scaled := map[string]*CostData{}
  840. for key, datum := range data {
  841. datum.RAMReq = scaleVectorSeries(datum.RAMReq, resolutionHours)
  842. datum.RAMUsed = scaleVectorSeries(datum.RAMUsed, resolutionHours)
  843. datum.RAMAllocation = scaleVectorSeries(datum.RAMAllocation, resolutionHours)
  844. datum.CPUReq = scaleVectorSeries(datum.CPUReq, resolutionHours)
  845. datum.CPUUsed = scaleVectorSeries(datum.CPUUsed, resolutionHours)
  846. datum.CPUAllocation = scaleVectorSeries(datum.CPUAllocation, resolutionHours)
  847. datum.GPUReq = scaleVectorSeries(datum.GPUReq, resolutionHours)
  848. datum.NetworkData = scaleVectorSeries(datum.NetworkData, resolutionHours)
  849. for _, pvcDatum := range datum.PVCData {
  850. pvcDatum.Values = scaleVectorSeries(pvcDatum.Values, resolutionHours)
  851. }
  852. scaled[key] = datum
  853. }
  854. return scaled
  855. }
  856. func scaleVectorSeries(vs []*util.Vector, resolutionHours float64) []*util.Vector {
  857. // if scaling to a lower resolution, compress the hourly data for maximum accuracy
  858. if resolutionHours > 1.0 {
  859. return compressVectorSeries(vs, resolutionHours)
  860. }
  861. // if scaling to a higher resolution, simply scale each value down by the fraction of an hour
  862. for _, v := range vs {
  863. v.Value *= resolutionHours
  864. }
  865. return vs
  866. }
  867. func compressVectorSeries(vs []*util.Vector, resolutionHours float64) []*util.Vector {
  868. if len(vs) == 0 {
  869. return vs
  870. }
  871. compressed := []*util.Vector{}
  872. threshold := float64(60 * 60 * resolutionHours)
  873. var acc *util.Vector
  874. for i, v := range vs {
  875. if acc == nil {
  876. // start a new accumulation from current datum
  877. acc = &util.Vector{
  878. Value: vs[i].Value,
  879. Timestamp: vs[i].Timestamp,
  880. }
  881. continue
  882. }
  883. if v.Timestamp-acc.Timestamp < threshold {
  884. // v should be accumulated in current datum
  885. acc.Value += v.Value
  886. } else {
  887. // v falls outside current datum's threshold; append and start a new one
  888. compressed = append(compressed, acc)
  889. acc = &util.Vector{
  890. Value: vs[i].Value,
  891. Timestamp: vs[i].Timestamp,
  892. }
  893. }
  894. }
  895. // append any remaining, incomplete accumulation
  896. if acc != nil {
  897. compressed = append(compressed, acc)
  898. }
  899. return compressed
  900. }
// AggregateQueryOpts carries the full set of options for an aggregate cost
// model query.
type AggregateQueryOpts struct {
	Rate                           string                // rate mode for the query ("" for totals)
	Filters                        map[string]string     // filter name -> comma-separated values (e.g. "namespace", "cluster")
	SharedResources                *SharedResourceInfo   // resources to treat as shared, if any
	ShareSplit                     string                // how shared costs are split across results
	AllocateIdle                   bool                  // whether to distribute idle cost
	IncludeTimeSeries              bool                  // keep per-resource cost vectors in the result
	IncludeEfficiency              bool                  // compute CPU/RAM efficiency metrics
	DisableAggregateCostModelCache bool                  // bypass the aggregation cache
	ClearCache                     bool                  // flush caches before computing
	NoCache                        bool                  // skip cache reads for this query
	NoExpireCache                  bool                  // cache results without expiration
	RemoteEnabled                  bool                  // allow remote (e.g. durable storage) reads
	DisableSharedOverhead          bool                  // skip shared overhead cost
	UseETLAdapter                  bool                  // serve via the ETL adapter
}
  917. func DefaultAggregateQueryOpts() *AggregateQueryOpts {
  918. return &AggregateQueryOpts{
  919. Rate: "",
  920. Filters: map[string]string{},
  921. SharedResources: nil,
  922. ShareSplit: SplitTypeWeighted,
  923. AllocateIdle: false,
  924. IncludeTimeSeries: true,
  925. IncludeEfficiency: true,
  926. DisableAggregateCostModelCache: env.IsAggregateCostModelCacheDisabled(),
  927. ClearCache: false,
  928. NoCache: false,
  929. NoExpireCache: false,
  930. RemoteEnabled: env.IsRemoteEnabled(),
  931. DisableSharedOverhead: false,
  932. UseETLAdapter: false,
  933. }
  934. }
  935. // TODO deprecate and delete? this is absurd with tenantID...
  936. // ComputeAggregateCostModel computes cost data for the given window, then aggregates it by the given fields.
  937. // Data is cached on two levels: the aggregation is cached as well as the underlying cost data.
  938. func (a *Accesses) ComputeAggregateCostModel(promClient prometheusClient.Client, tenantID string, window opencost.Window, field string, subfields []string, opts *AggregateQueryOpts) (map[string]*Aggregation, string, error) {
  939. // Window is the range of the query, i.e. (start, end)
  940. // It must be closed, i.e. neither start nor end can be nil
  941. if window.IsOpen() {
  942. return nil, "", fmt.Errorf("illegal window: %s", window)
  943. }
  944. // Resolution is the duration of each datum in the cost model range query,
  945. // which corresponds to both the step size given to Prometheus query_range
  946. // and to the window passed to the range queries.
  947. // i.e. by default, we support 1h resolution for queries of windows defined
  948. // in terms of days or integer multiples of hours (e.g. 1d, 12h)
  949. resolution := time.Hour
  950. // Determine resolution by size of duration and divisibility of window.
  951. // By default, resolution is 1hr. If the window is smaller than 1hr, then
  952. // resolution goes down to 1m. If the window is not a multiple of 1hr, then
  953. // resolution goes down to 1m. If the window is greater than 1d, then
  954. // resolution gets scaled up to improve performance by reducing the amount
  955. // of data being computed.
  956. durMins := int64(math.Trunc(window.Minutes()))
  957. if durMins < 24*60 { // less than 1d
  958. // TODO should we have additional options for going by
  959. // e.g. 30m? 10m? 5m?
  960. if durMins%60 != 0 || durMins < 3*60 { // not divisible by 1h or less than 3h
  961. resolution = time.Minute
  962. }
  963. } else { // greater than 1d
  964. if durMins >= 7*24*60 { // greater than (or equal to) 7 days
  965. resolution = 24.0 * time.Hour
  966. } else if durMins >= 2*24*60 { // greater than (or equal to) 2 days
  967. resolution = 2.0 * time.Hour
  968. }
  969. }
  970. // Parse options
  971. if opts == nil {
  972. opts = DefaultAggregateQueryOpts()
  973. }
  974. rate := opts.Rate
  975. filters := opts.Filters
  976. sri := opts.SharedResources
  977. shared := opts.ShareSplit
  978. allocateIdle := opts.AllocateIdle
  979. includeTimeSeries := opts.IncludeTimeSeries
  980. includeEfficiency := opts.IncludeEfficiency
  981. disableAggregateCostModelCache := opts.DisableAggregateCostModelCache
  982. clearCache := opts.ClearCache
  983. noCache := opts.NoCache
  984. noExpireCache := opts.NoExpireCache
  985. remoteEnabled := opts.RemoteEnabled
  986. disableSharedOverhead := opts.DisableSharedOverhead
  987. // retainFuncs override filterFuncs. Make sure shared resources do not
  988. // get filtered out.
  989. retainFuncs := []FilterFunc{}
  990. retainFuncs = append(retainFuncs, func(cd *CostData) (bool, string) {
  991. if sri != nil {
  992. return sri.IsSharedResource(cd), ""
  993. }
  994. return false, ""
  995. })
  996. // Parse cost data filters into FilterFuncs
  997. filterFuncs := []FilterFunc{}
  998. aggregateEnvironment := func(costDatum *CostData) string {
  999. if field == "cluster" {
  1000. return costDatum.ClusterID
  1001. } else if field == "node" {
  1002. return costDatum.NodeName
  1003. } else if field == "namespace" {
  1004. return costDatum.Namespace
  1005. } else if field == "service" {
  1006. if len(costDatum.Services) > 0 {
  1007. return costDatum.Namespace + "/" + costDatum.Services[0]
  1008. }
  1009. } else if field == "deployment" {
  1010. if len(costDatum.Deployments) > 0 {
  1011. return costDatum.Namespace + "/" + costDatum.Deployments[0]
  1012. }
  1013. } else if field == "daemonset" {
  1014. if len(costDatum.Daemonsets) > 0 {
  1015. return costDatum.Namespace + "/" + costDatum.Daemonsets[0]
  1016. }
  1017. } else if field == "statefulset" {
  1018. if len(costDatum.Statefulsets) > 0 {
  1019. return costDatum.Namespace + "/" + costDatum.Statefulsets[0]
  1020. }
  1021. } else if field == "label" {
  1022. if costDatum.Labels != nil {
  1023. for _, sf := range subfields {
  1024. if subfieldName, ok := costDatum.Labels[sf]; ok {
  1025. return fmt.Sprintf("%s=%s", sf, subfieldName)
  1026. }
  1027. }
  1028. }
  1029. } else if field == "annotation" {
  1030. if costDatum.Annotations != nil {
  1031. for _, sf := range subfields {
  1032. if subfieldName, ok := costDatum.Annotations[sf]; ok {
  1033. return fmt.Sprintf("%s=%s", sf, subfieldName)
  1034. }
  1035. }
  1036. }
  1037. } else if field == "pod" {
  1038. return costDatum.Namespace + "/" + costDatum.PodName
  1039. } else if field == "container" {
  1040. return costDatum.Namespace + "/" + costDatum.PodName + "/" + costDatum.Name
  1041. }
  1042. return ""
  1043. }
  1044. if filters["podprefix"] != "" {
  1045. pps := []string{}
  1046. for _, fp := range strings.Split(filters["podprefix"], ",") {
  1047. if fp != "" {
  1048. cleanedFilter := strings.TrimSpace(fp)
  1049. pps = append(pps, cleanedFilter)
  1050. }
  1051. }
  1052. filterFuncs = append(filterFuncs, func(cd *CostData) (bool, string) {
  1053. aggEnv := aggregateEnvironment(cd)
  1054. for _, pp := range pps {
  1055. cleanedFilter := strings.TrimSpace(pp)
  1056. if strings.HasPrefix(cd.PodName, cleanedFilter) {
  1057. return true, aggEnv
  1058. }
  1059. }
  1060. return false, aggEnv
  1061. })
  1062. }
  1063. if filters["namespace"] != "" {
  1064. // namespaces may be comma-separated, e.g. kubecost,default
  1065. // multiple namespaces are evaluated as an OR relationship
  1066. nss := strings.Split(filters["namespace"], ",")
  1067. filterFuncs = append(filterFuncs, func(cd *CostData) (bool, string) {
  1068. aggEnv := aggregateEnvironment(cd)
  1069. for _, ns := range nss {
  1070. nsTrim := strings.TrimSpace(ns)
  1071. if cd.Namespace == nsTrim {
  1072. return true, aggEnv
  1073. } else if strings.HasSuffix(nsTrim, "*") { // trigger wildcard prefix filtering
  1074. nsTrimAsterisk := strings.TrimSuffix(nsTrim, "*")
  1075. if strings.HasPrefix(cd.Namespace, nsTrimAsterisk) {
  1076. return true, aggEnv
  1077. }
  1078. }
  1079. }
  1080. return false, aggEnv
  1081. })
  1082. }
  1083. if filters["node"] != "" {
  1084. // nodes may be comma-separated, e.g. aws-node-1,aws-node-2
  1085. // multiple nodes are evaluated as an OR relationship
  1086. nodes := strings.Split(filters["node"], ",")
  1087. filterFuncs = append(filterFuncs, func(cd *CostData) (bool, string) {
  1088. aggEnv := aggregateEnvironment(cd)
  1089. for _, node := range nodes {
  1090. nodeTrim := strings.TrimSpace(node)
  1091. if cd.NodeName == nodeTrim {
  1092. return true, aggEnv
  1093. } else if strings.HasSuffix(nodeTrim, "*") { // trigger wildcard prefix filtering
  1094. nodeTrimAsterisk := strings.TrimSuffix(nodeTrim, "*")
  1095. if strings.HasPrefix(cd.NodeName, nodeTrimAsterisk) {
  1096. return true, aggEnv
  1097. }
  1098. }
  1099. }
  1100. return false, aggEnv
  1101. })
  1102. }
  1103. if filters["cluster"] != "" {
  1104. // clusters may be comma-separated, e.g. cluster-one,cluster-two
  1105. // multiple clusters are evaluated as an OR relationship
  1106. cs := strings.Split(filters["cluster"], ",")
  1107. filterFuncs = append(filterFuncs, func(cd *CostData) (bool, string) {
  1108. aggEnv := aggregateEnvironment(cd)
  1109. for _, c := range cs {
  1110. cTrim := strings.TrimSpace(c)
  1111. id, name := cd.ClusterID, cd.ClusterName
  1112. if id == cTrim || name == cTrim {
  1113. return true, aggEnv
  1114. } else if strings.HasSuffix(cTrim, "*") { // trigger wildcard prefix filtering
  1115. cTrimAsterisk := strings.TrimSuffix(cTrim, "*")
  1116. if strings.HasPrefix(id, cTrimAsterisk) || strings.HasPrefix(name, cTrimAsterisk) {
  1117. return true, aggEnv
  1118. }
  1119. }
  1120. }
  1121. return false, aggEnv
  1122. })
  1123. }
  1124. if filters["labels"] != "" {
  1125. // labels are expected to be comma-separated and to take the form key=value
  1126. // e.g. app=cost-analyzer,app.kubernetes.io/instance=kubecost
  1127. // each different label will be applied as an AND
  1128. // multiple values for a single label will be evaluated as an OR
  1129. labelValues := map[string][]string{}
  1130. ls := strings.Split(filters["labels"], ",")
  1131. for _, l := range ls {
  1132. lTrim := strings.TrimSpace(l)
  1133. label := strings.Split(lTrim, "=")
  1134. if len(label) == 2 {
  1135. ln := promutil.SanitizeLabelName(strings.TrimSpace(label[0]))
  1136. lv := strings.TrimSpace(label[1])
  1137. labelValues[ln] = append(labelValues[ln], lv)
  1138. } else {
  1139. // label is not of the form name=value, so log it and move on
  1140. log.Warnf("ComputeAggregateCostModel: skipping illegal label filter: %s", l)
  1141. }
  1142. }
  1143. // Generate FilterFunc for each set of label filters by invoking a function instead of accessing
  1144. // values by closure to prevent reference-type looping bug.
  1145. // (see https://github.com/golang/go/wiki/CommonMistakes#using-reference-to-loop-iterator-variable)
  1146. for label, values := range labelValues {
  1147. ff := (func(l string, vs []string) FilterFunc {
  1148. return func(cd *CostData) (bool, string) {
  1149. ae := aggregateEnvironment(cd)
  1150. for _, v := range vs {
  1151. if v == "__unallocated__" { // Special case. __unallocated__ means return all pods without the attached label
  1152. if _, ok := cd.Labels[l]; !ok {
  1153. return true, ae
  1154. }
  1155. }
  1156. if cd.Labels[l] == v {
  1157. return true, ae
  1158. } else if strings.HasSuffix(v, "*") { // trigger wildcard prefix filtering
  1159. vTrim := strings.TrimSuffix(v, "*")
  1160. if strings.HasPrefix(cd.Labels[l], vTrim) {
  1161. return true, ae
  1162. }
  1163. }
  1164. }
  1165. return false, ae
  1166. }
  1167. })(label, values)
  1168. filterFuncs = append(filterFuncs, ff)
  1169. }
  1170. }
  1171. if filters["annotations"] != "" {
  1172. // annotations are expected to be comma-separated and to take the form key=value
  1173. // e.g. app=cost-analyzer,app.kubernetes.io/instance=kubecost
  1174. // each different annotation will be applied as an AND
  1175. // multiple values for a single annotation will be evaluated as an OR
  1176. annotationValues := map[string][]string{}
  1177. as := strings.Split(filters["annotations"], ",")
  1178. for _, annot := range as {
  1179. aTrim := strings.TrimSpace(annot)
  1180. annotation := strings.Split(aTrim, "=")
  1181. if len(annotation) == 2 {
  1182. an := promutil.SanitizeLabelName(strings.TrimSpace(annotation[0]))
  1183. av := strings.TrimSpace(annotation[1])
  1184. annotationValues[an] = append(annotationValues[an], av)
  1185. } else {
  1186. // annotation is not of the form name=value, so log it and move on
  1187. log.Warnf("ComputeAggregateCostModel: skipping illegal annotation filter: %s", annot)
  1188. }
  1189. }
  1190. // Generate FilterFunc for each set of annotation filters by invoking a function instead of accessing
  1191. // values by closure to prevent reference-type looping bug.
  1192. // (see https://github.com/golang/go/wiki/CommonMistakes#using-reference-to-loop-iterator-variable)
  1193. for annotation, values := range annotationValues {
  1194. ff := (func(l string, vs []string) FilterFunc {
  1195. return func(cd *CostData) (bool, string) {
  1196. ae := aggregateEnvironment(cd)
  1197. for _, v := range vs {
  1198. if v == "__unallocated__" { // Special case. __unallocated__ means return all pods without the attached label
  1199. if _, ok := cd.Annotations[l]; !ok {
  1200. return true, ae
  1201. }
  1202. }
  1203. if cd.Annotations[l] == v {
  1204. return true, ae
  1205. } else if strings.HasSuffix(v, "*") { // trigger wildcard prefix filtering
  1206. vTrim := strings.TrimSuffix(v, "*")
  1207. if strings.HasPrefix(cd.Annotations[l], vTrim) {
  1208. return true, ae
  1209. }
  1210. }
  1211. }
  1212. return false, ae
  1213. }
  1214. })(annotation, values)
  1215. filterFuncs = append(filterFuncs, ff)
  1216. }
  1217. }
  1218. // clear cache prior to checking the cache so that a clearCache=true
  1219. // request always returns a freshly computed value
  1220. if clearCache {
  1221. a.AggregateCache.Flush()
  1222. a.CostDataCache.Flush()
  1223. }
  1224. cacheExpiry := a.GetCacheExpiration(window.Duration())
  1225. if noExpireCache {
  1226. cacheExpiry = cache.NoExpiration
  1227. }
  1228. // parametrize cache key by all request parameters
  1229. aggKey := GenerateAggKey(window, field, subfields, opts)
  1230. thanosOffset := time.Now().Add(-thanos.OffsetDuration())
  1231. if a.ThanosClient != nil && window.End().After(thanosOffset) {
  1232. log.Infof("ComputeAggregateCostModel: setting end time backwards to first present data")
  1233. // Apply offsets to both end and start times to maintain correct time range
  1234. deltaDuration := window.End().Sub(thanosOffset)
  1235. s := window.Start().Add(-1 * deltaDuration)
  1236. e := time.Now().Add(-thanos.OffsetDuration())
  1237. window.Set(&s, &e)
  1238. }
  1239. dur, off := window.DurationOffsetStrings()
  1240. key := fmt.Sprintf(`%s:%s:%fh:%t`, dur, off, resolution.Hours(), remoteEnabled)
  1241. // report message about which of the two caches hit. by default report a miss
  1242. cacheMessage := fmt.Sprintf("ComputeAggregateCostModel: L1 cache miss: %s L2 cache miss: %s", aggKey, key)
  1243. // check the cache for aggregated response; if cache is hit and not disabled, return response
  1244. if value, found := a.AggregateCache.Get(aggKey); found && !disableAggregateCostModelCache && !noCache {
  1245. result, ok := value.(map[string]*Aggregation)
  1246. if !ok {
  1247. // disable cache and recompute if type cast fails
  1248. log.Errorf("ComputeAggregateCostModel: caching error: failed to cast aggregate data to struct: %s", aggKey)
  1249. return a.ComputeAggregateCostModel(promClient, tenantID, window, field, subfields, opts)
  1250. }
  1251. return result, fmt.Sprintf("aggregate cache hit: %s", aggKey), nil
  1252. }
  1253. if window.Hours() >= 1.0 {
  1254. // exclude the last window of the time frame to match Prometheus definitions of range, offset, and resolution
  1255. start := window.Start().Add(resolution)
  1256. window.Set(&start, window.End())
  1257. } else {
  1258. // don't cache requests for durations of less than one hour
  1259. disableAggregateCostModelCache = true
  1260. }
  1261. // attempt to retrieve cost data from cache
  1262. var costData map[string]*CostData
  1263. var err error
  1264. cacheData, found := a.CostDataCache.Get(key)
  1265. if found && !disableAggregateCostModelCache && !noCache {
  1266. ok := false
  1267. costData, ok = cacheData.(map[string]*CostData)
  1268. cacheMessage = fmt.Sprintf("ComputeAggregateCostModel: L1 cache miss: %s, L2 cost data cache hit: %s", aggKey, key)
  1269. if !ok {
  1270. log.Errorf("ComputeAggregateCostModel: caching error: failed to cast cost data to struct: %s", key)
  1271. }
  1272. } else {
  1273. log.Infof("ComputeAggregateCostModel: missed cache: %s (found %t, disableAggregateCostModelCache %t, noCache %t)", key, found, disableAggregateCostModelCache, noCache)
  1274. costData, err = a.Model.ComputeCostDataRange(promClient, a.CloudProvider, window, resolution, "", "", remoteEnabled)
  1275. if err != nil {
  1276. if prom.IsErrorCollection(err) {
  1277. return nil, "", err
  1278. }
  1279. if pce, ok := err.(prom.CommError); ok {
  1280. return nil, "", pce
  1281. }
  1282. if strings.Contains(err.Error(), "data is empty") {
  1283. return nil, "", &EmptyDataError{err: err, window: window}
  1284. }
  1285. return nil, "", err
  1286. }
  1287. // compute length of the time series in the cost data and only compute
  1288. // aggregates and cache if the length is sufficiently high
  1289. costDataLen := costDataTimeSeriesLength(costData)
  1290. if costDataLen == 0 {
  1291. return nil, "", &EmptyDataError{window: window}
  1292. }
  1293. if costDataLen >= minCostDataLength && !noCache {
  1294. log.Infof("ComputeAggregateCostModel: setting L2 cache: %s", key)
  1295. a.CostDataCache.Set(key, costData, cacheExpiry)
  1296. }
  1297. }
  1298. c, err := a.CloudProvider.GetConfig()
  1299. if err != nil {
  1300. return nil, "", err
  1301. }
  1302. discount, err := ParsePercentString(c.Discount)
  1303. if err != nil {
  1304. return nil, "", err
  1305. }
  1306. customDiscount, err := ParsePercentString(c.NegotiatedDiscount)
  1307. if err != nil {
  1308. return nil, "", err
  1309. }
  1310. sc := make(map[string]*SharedCostInfo)
  1311. if !disableSharedOverhead {
  1312. costPerMonth := c.GetSharedOverheadCostPerMonth()
  1313. durationCoefficient := window.Hours() / timeutil.HoursPerMonth
  1314. sc["total"] = &SharedCostInfo{
  1315. Name: "total",
  1316. Cost: costPerMonth * durationCoefficient,
  1317. }
  1318. }
  1319. idleCoefficients := make(map[string]float64)
  1320. if allocateIdle {
  1321. dur, off, err := window.DurationOffset()
  1322. if err != nil {
  1323. log.Errorf("ComputeAggregateCostModel: error computing idle coefficient: illegal window: %s (%s)", window, err)
  1324. return nil, "", err
  1325. }
  1326. if a.ThanosClient != nil && off < thanos.OffsetDuration() {
  1327. // Determine difference between the Thanos offset and the requested
  1328. // offset; e.g. off=1h, thanosOffsetDuration=3h => diff=2h
  1329. diff := thanos.OffsetDuration() - off
  1330. // Reduce duration by difference and increase offset by difference
  1331. // e.g. 24h offset 0h => 21h offset 3h
  1332. dur = dur - diff
  1333. off = thanos.OffsetDuration()
  1334. log.Infof("ComputeAggregateCostModel: setting duration, offset to %s, %s due to Thanos", dur, off)
  1335. // Idle computation cannot be fulfilled for some windows, specifically
  1336. // those with sum(duration, offset) < Thanos offset, because there is
  1337. // no data within that window.
  1338. if dur <= 0 {
  1339. return nil, "", fmt.Errorf("requested idle coefficients from Thanos for illegal duration, offset: %s, %s (original window %s)", dur, off, window)
  1340. }
  1341. }
  1342. idleCoefficients, err = a.ComputeIdleCoefficient(costData, promClient, a.CloudProvider, discount, customDiscount, dur, off)
  1343. if err != nil {
  1344. durStr, offStr := timeutil.DurationOffsetStrings(dur, off)
  1345. log.Errorf("ComputeAggregateCostModel: error computing idle coefficient: duration=%s, offset=%s, err=%s", durStr, offStr, err)
  1346. return nil, "", err
  1347. }
  1348. }
  1349. totalContainerCost := 0.0
  1350. if shared == SplitTypeWeighted {
  1351. totalContainerCost = GetTotalContainerCost(costData, rate, a.CloudProvider, discount, customDiscount, idleCoefficients)
  1352. }
  1353. // filter cost data by namespace and cluster after caching for maximal cache hits
  1354. costData, filteredContainerCount, filteredEnvironments := FilterCostData(costData, retainFuncs, filterFuncs)
  1355. // aggregate cost model data by given fields and cache the result for the default expiration
  1356. aggOpts := &AggregationOptions{
  1357. Discount: discount,
  1358. CustomDiscount: customDiscount,
  1359. IdleCoefficients: idleCoefficients,
  1360. IncludeEfficiency: includeEfficiency,
  1361. IncludeTimeSeries: includeTimeSeries,
  1362. Rate: rate,
  1363. ResolutionHours: resolution.Hours(),
  1364. SharedResourceInfo: sri,
  1365. SharedCosts: sc,
  1366. FilteredContainerCount: filteredContainerCount,
  1367. FilteredEnvironments: filteredEnvironments,
  1368. TotalContainerCost: totalContainerCost,
  1369. SharedSplit: shared,
  1370. }
  1371. result := AggregateCostData(costData, field, subfields, a.CloudProvider, aggOpts)
  1372. // If sending time series data back, switch scale back to hourly data. At this point,
  1373. // resolutionHours may have converted our hourly data to more- or less-than hourly data.
  1374. if includeTimeSeries {
  1375. for _, aggs := range result {
  1376. ScaleAggregationTimeSeries(aggs, resolution.Hours())
  1377. }
  1378. }
  1379. // compute length of the time series in the cost data and only cache
  1380. // aggregation results if the length is sufficiently high
  1381. costDataLen := costDataTimeSeriesLength(costData)
  1382. if costDataLen >= minCostDataLength && window.Hours() > 1.0 && !noCache {
  1383. // Set the result map (rather than a pointer to it) because map is a reference type
  1384. log.Infof("ComputeAggregateCostModel: setting aggregate cache: %s", aggKey)
  1385. a.AggregateCache.Set(aggKey, result, cacheExpiry)
  1386. } else {
  1387. log.Infof("ComputeAggregateCostModel: not setting aggregate cache: %s (not enough data: %t; duration less than 1h: %t; noCache: %t)", key, costDataLen < minCostDataLength, window.Hours() < 1, noCache)
  1388. }
  1389. return result, cacheMessage, nil
  1390. }
  1391. // ScaleAggregationTimeSeries reverses the scaling done by ScaleHourlyCostData, returning
  1392. // the aggregation's time series to hourly data.
  1393. func ScaleAggregationTimeSeries(aggregation *Aggregation, resolutionHours float64) {
  1394. for _, v := range aggregation.CPUCostVector {
  1395. v.Value /= resolutionHours
  1396. }
  1397. for _, v := range aggregation.GPUCostVector {
  1398. v.Value /= resolutionHours
  1399. }
  1400. for _, v := range aggregation.RAMCostVector {
  1401. v.Value /= resolutionHours
  1402. }
  1403. for _, v := range aggregation.PVCostVector {
  1404. v.Value /= resolutionHours
  1405. }
  1406. for _, v := range aggregation.NetworkCostVector {
  1407. v.Value /= resolutionHours
  1408. }
  1409. for _, v := range aggregation.TotalCostVector {
  1410. v.Value /= resolutionHours
  1411. }
  1412. return
  1413. }
  1414. // String returns a string representation of the encapsulated shared resources, which
  1415. // can be used to uniquely identify a set of shared resources. Sorting sets of shared
  1416. // resources ensures that strings representing permutations of the same combination match.
  1417. func (s *SharedResourceInfo) String() string {
  1418. if s == nil {
  1419. return ""
  1420. }
  1421. nss := []string{}
  1422. for ns := range s.SharedNamespace {
  1423. nss = append(nss, ns)
  1424. }
  1425. sort.Strings(nss)
  1426. nsStr := strings.Join(nss, ",")
  1427. labels := []string{}
  1428. for lbl, vals := range s.LabelSelectors {
  1429. for val := range vals {
  1430. if lbl != "" && val != "" {
  1431. labels = append(labels, fmt.Sprintf("%s=%s", lbl, val))
  1432. }
  1433. }
  1434. }
  1435. sort.Strings(labels)
  1436. labelStr := strings.Join(labels, ",")
  1437. return fmt.Sprintf("%s:%s", nsStr, labelStr)
  1438. }
// aggKeyParams bundles the request parameters that uniquely identify an aggregate
// cost model query; see GenerateAggKey for the cache-key string these correspond to.
type aggKeyParams struct {
	duration   string              // query window duration string, e.g. "7d"
	offset     string              // query window offset string, e.g. "1d"
	filters    map[string]string   // cost data filters keyed by filter kind (namespace, cluster, labels, ...)
	field      string              // aggregation field, e.g. "namespace" or "label"
	subfields  []string            // aggregation subfields, e.g. label names when field == "label"
	rate       string              // rate option: "hourly", "daily", "monthly", or ""
	sri        *SharedResourceInfo // shared resource (namespace/label) configuration
	shareType  string              // how shared costs are split across allocations
	idle       bool                // whether idle costs are allocated
	timeSeries bool                // whether time series data is preserved in the result
	efficiency bool                // whether efficiency metrics are included
}
  1452. // GenerateAggKey generates a parameter-unique key for caching the aggregate cost model
  1453. func GenerateAggKey(window opencost.Window, field string, subfields []string, opts *AggregateQueryOpts) string {
  1454. if opts == nil {
  1455. opts = DefaultAggregateQueryOpts()
  1456. }
  1457. // Covert to duration, offset so that cache hits occur, even when timestamps have
  1458. // shifted slightly.
  1459. duration, offset := window.DurationOffsetStrings()
  1460. // parse, trim, and sort podprefix filters
  1461. podPrefixFilters := []string{}
  1462. if ppfs, ok := opts.Filters["podprefix"]; ok && ppfs != "" {
  1463. for _, psf := range strings.Split(ppfs, ",") {
  1464. podPrefixFilters = append(podPrefixFilters, strings.TrimSpace(psf))
  1465. }
  1466. }
  1467. sort.Strings(podPrefixFilters)
  1468. podPrefixFiltersStr := strings.Join(podPrefixFilters, ",")
  1469. // parse, trim, and sort namespace filters
  1470. nsFilters := []string{}
  1471. if nsfs, ok := opts.Filters["namespace"]; ok && nsfs != "" {
  1472. for _, nsf := range strings.Split(nsfs, ",") {
  1473. nsFilters = append(nsFilters, strings.TrimSpace(nsf))
  1474. }
  1475. }
  1476. sort.Strings(nsFilters)
  1477. nsFilterStr := strings.Join(nsFilters, ",")
  1478. // parse, trim, and sort node filters
  1479. nodeFilters := []string{}
  1480. if nodefs, ok := opts.Filters["node"]; ok && nodefs != "" {
  1481. for _, nodef := range strings.Split(nodefs, ",") {
  1482. nodeFilters = append(nodeFilters, strings.TrimSpace(nodef))
  1483. }
  1484. }
  1485. sort.Strings(nodeFilters)
  1486. nodeFilterStr := strings.Join(nodeFilters, ",")
  1487. // parse, trim, and sort cluster filters
  1488. cFilters := []string{}
  1489. if cfs, ok := opts.Filters["cluster"]; ok && cfs != "" {
  1490. for _, cf := range strings.Split(cfs, ",") {
  1491. cFilters = append(cFilters, strings.TrimSpace(cf))
  1492. }
  1493. }
  1494. sort.Strings(cFilters)
  1495. cFilterStr := strings.Join(cFilters, ",")
  1496. // parse, trim, and sort label filters
  1497. lFilters := []string{}
  1498. if lfs, ok := opts.Filters["labels"]; ok && lfs != "" {
  1499. for _, lf := range strings.Split(lfs, ",") {
  1500. // trim whitespace from the label name and the label value
  1501. // of each label name/value pair, then reconstruct
  1502. // e.g. "tier = frontend, app = kubecost" == "app=kubecost,tier=frontend"
  1503. lfa := strings.Split(lf, "=")
  1504. if len(lfa) == 2 {
  1505. lfn := strings.TrimSpace(lfa[0])
  1506. lfv := strings.TrimSpace(lfa[1])
  1507. lFilters = append(lFilters, fmt.Sprintf("%s=%s", lfn, lfv))
  1508. } else {
  1509. // label is not of the form name=value, so log it and move on
  1510. log.Warnf("GenerateAggKey: skipping illegal label filter: %s", lf)
  1511. }
  1512. }
  1513. }
  1514. sort.Strings(lFilters)
  1515. lFilterStr := strings.Join(lFilters, ",")
  1516. // parse, trim, and sort annotation filters
  1517. aFilters := []string{}
  1518. if afs, ok := opts.Filters["annotations"]; ok && afs != "" {
  1519. for _, af := range strings.Split(afs, ",") {
  1520. // trim whitespace from the annotation name and the annotation value
  1521. // of each annotation name/value pair, then reconstruct
  1522. // e.g. "tier = frontend, app = kubecost" == "app=kubecost,tier=frontend"
  1523. afa := strings.Split(af, "=")
  1524. if len(afa) == 2 {
  1525. afn := strings.TrimSpace(afa[0])
  1526. afv := strings.TrimSpace(afa[1])
  1527. aFilters = append(aFilters, fmt.Sprintf("%s=%s", afn, afv))
  1528. } else {
  1529. // annotation is not of the form name=value, so log it and move on
  1530. log.Warnf("GenerateAggKey: skipping illegal annotation filter: %s", af)
  1531. }
  1532. }
  1533. }
  1534. sort.Strings(aFilters)
  1535. aFilterStr := strings.Join(aFilters, ",")
  1536. filterStr := fmt.Sprintf("%s:%s:%s:%s:%s:%s", nsFilterStr, nodeFilterStr, cFilterStr, lFilterStr, aFilterStr, podPrefixFiltersStr)
  1537. sort.Strings(subfields)
  1538. fieldStr := fmt.Sprintf("%s:%s", field, strings.Join(subfields, ","))
  1539. if offset == "1m" {
  1540. offset = ""
  1541. }
  1542. return fmt.Sprintf("%s:%s:%s:%s:%s:%s:%s:%t:%t:%t", duration, offset, filterStr, fieldStr, opts.Rate,
  1543. opts.SharedResources, opts.ShareSplit, opts.AllocateIdle, opts.IncludeTimeSeries,
  1544. opts.IncludeEfficiency)
  1545. }
// TODO this is deprecated, right? Can I delete??
// Aggregator is capable of computing the aggregated cost model. This is
// a brutal interface, which should be cleaned up, but it's necessary for
// being able to swap in an ETL-backed implementation.
type Aggregator interface {
	// ComputeAggregateCostModel returns aggregations keyed by aggregate name, plus a
	// human-readable cache-status message; see the concrete implementation on
	// *Accesses for the semantics of each parameter.
	ComputeAggregateCostModel(promClient prometheusClient.Client, tenantID string, window opencost.Window, field string, subfields []string, opts *AggregateQueryOpts) (map[string]*Aggregation, string, error)
}
var (
	// Convert UTC-RFC3339 pairs to configured UTC offset
	// e.g. with UTC offset of -0600, 2020-07-01T00:00:00Z becomes
	// 2020-07-01T06:00:00Z == 2020-07-01T00:00:00-0600
	// TODO niko/etl fix the frontend because this is confusing if you're
	// actually asking for UTC time (...Z) and we swap that "Z" out for the
	// configured UTC offset without asking

	// rfc3339 matches a single RFC3339 UTC ("Z"-suffixed) timestamp.
	rfc3339 = `\d\d\d\d-\d\d-\d\dT\d\d:\d\d:\d\dZ`
	// rfc3339Regex matches a comma-separated pair of RFC3339 UTC timestamps,
	// capturing each timestamp as a submatch (used to parse "start,end" windows).
	rfc3339Regex = regexp.MustCompile(fmt.Sprintf(`(%s),(%s)`, rfc3339, rfc3339))
	// durRegex matches simple duration strings like "30m", "24h", "7d", or "60s",
	// capturing the count (group 1) and the unit (group 2).
	durRegex = regexp.MustCompile(`^(\d+)(m|h|d|s)$`)
	// percentRegex captures the numeric portion of a percentage such as "42.5%".
	percentRegex = regexp.MustCompile(`(\d+\.*\d*)%`)
)
  1565. // TODO tenantID for this function is absurd. Can I delete?
  1566. // AggregateCostModelHandler handles requests to the aggregated cost model API. See
  1567. // ComputeAggregateCostModel for details.
  1568. func (a *Accesses) AggregateCostModelHandler(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  1569. w.Header().Set("Content-Type", "application/json")
  1570. // TODO replace with real auth service (token, authenication, etc.)
  1571. tenantID := r.Header.Get("Tenant")
  1572. if tenantID == "" {
  1573. http.Error(w, "Unauthorized", http.StatusUnauthorized)
  1574. return
  1575. }
  1576. windowStr := r.URL.Query().Get("window")
  1577. match := rfc3339Regex.FindStringSubmatch(windowStr)
  1578. if match != nil {
  1579. start, _ := time.Parse(time.RFC3339, match[1])
  1580. start = start.Add(-env.GetParsedUTCOffset()).In(time.UTC)
  1581. end, _ := time.Parse(time.RFC3339, match[2])
  1582. end = end.Add(-env.GetParsedUTCOffset()).In(time.UTC)
  1583. windowStr = fmt.Sprintf("%sZ,%sZ", start.Format("2006-01-02T15:04:05"), end.Format("2006-01-02T15:04:05Z"))
  1584. }
  1585. // determine duration and offset from query parameters
  1586. window, err := opencost.ParseWindowWithOffset(windowStr, env.GetParsedUTCOffset())
  1587. if err != nil || window.Start() == nil {
  1588. WriteError(w, BadRequest(fmt.Sprintf("invalid window: %s", err)))
  1589. return
  1590. }
  1591. isDurationStr := durRegex.MatchString(windowStr)
  1592. // legacy offset option should override window offset
  1593. if r.URL.Query().Get("offset") != "" {
  1594. offset := r.URL.Query().Get("offset")
  1595. // Shift window by offset, but only when manually set with separate
  1596. // parameter and window was provided as a duration string. Otherwise,
  1597. // do not alter the (duration, offset) from ParseWindowWithOffset.
  1598. if offset != "1m" && isDurationStr {
  1599. match := durRegex.FindStringSubmatch(offset)
  1600. if match != nil && len(match) == 3 {
  1601. dur := time.Minute
  1602. if match[2] == "h" {
  1603. dur = time.Hour
  1604. }
  1605. if match[2] == "d" {
  1606. dur = 24 * time.Hour
  1607. }
  1608. if match[2] == "s" {
  1609. dur = time.Second
  1610. }
  1611. num, _ := strconv.ParseInt(match[1], 10, 64)
  1612. window = window.Shift(-time.Duration(num) * dur)
  1613. }
  1614. }
  1615. }
  1616. opts := DefaultAggregateQueryOpts()
  1617. // parse remaining query parameters
  1618. namespace := r.URL.Query().Get("namespace")
  1619. cluster := r.URL.Query().Get("cluster")
  1620. labels := r.URL.Query().Get("labels")
  1621. annotations := r.URL.Query().Get("annotations")
  1622. podprefix := r.URL.Query().Get("podprefix")
  1623. field := r.URL.Query().Get("aggregation")
  1624. sharedNamespaces := r.URL.Query().Get("sharedNamespaces")
  1625. sharedLabelNames := r.URL.Query().Get("sharedLabelNames")
  1626. sharedLabelValues := r.URL.Query().Get("sharedLabelValues")
  1627. remote := r.URL.Query().Get("remote") != "false"
  1628. subfieldStr := r.URL.Query().Get("aggregationSubfield")
  1629. subfields := []string{}
  1630. if len(subfieldStr) > 0 {
  1631. s := strings.Split(r.URL.Query().Get("aggregationSubfield"), ",")
  1632. for _, rawLabel := range s {
  1633. subfields = append(subfields, promutil.SanitizeLabelName(rawLabel))
  1634. }
  1635. }
  1636. idleFlag := r.URL.Query().Get("allocateIdle")
  1637. if idleFlag == "default" {
  1638. c, _ := a.CloudProvider.GetConfig()
  1639. opts.AllocateIdle = (c.DefaultIdle == "true")
  1640. } else {
  1641. opts.AllocateIdle = (idleFlag == "true")
  1642. }
  1643. opts.Rate = r.URL.Query().Get("rate")
  1644. opts.ShareSplit = r.URL.Query().Get("sharedSplit")
  1645. // timeSeries == true maintains the time series dimension of the data,
  1646. // which by default gets summed over the entire interval
  1647. opts.IncludeTimeSeries = r.URL.Query().Get("timeSeries") == "true"
  1648. // efficiency has been deprecated in favor of a default to always send efficiency
  1649. opts.IncludeEfficiency = true
  1650. // TODO niko/caching rename "recomputeCache"
  1651. // disableCache, if set to "true", tells this function to recompute and
  1652. // cache the requested data
  1653. opts.DisableAggregateCostModelCache = r.URL.Query().Get("disableCache") == "true"
  1654. // clearCache, if set to "true", tells this function to flush the cache,
  1655. // then recompute and cache the requested data
  1656. opts.ClearCache = r.URL.Query().Get("clearCache") == "true"
  1657. // noCache avoids the cache altogether, both reading from and writing to
  1658. opts.NoCache = r.URL.Query().Get("noCache") == "true"
  1659. // noExpireCache should only be used by cache warming to set non-expiring caches
  1660. opts.NoExpireCache = false
  1661. // etl triggers ETL adapter
  1662. opts.UseETLAdapter = r.URL.Query().Get("etl") == "true"
  1663. // aggregation field is required
  1664. if field == "" {
  1665. WriteError(w, BadRequest("Missing aggregation field parameter"))
  1666. return
  1667. }
  1668. // aggregation subfield is required when aggregation field is "label"
  1669. if (field == "label" || field == "annotation") && len(subfields) == 0 {
  1670. WriteError(w, BadRequest("Missing aggregation subfield parameter"))
  1671. return
  1672. }
  1673. // enforce one of the available rate options
  1674. if opts.Rate != "" && opts.Rate != "hourly" && opts.Rate != "daily" && opts.Rate != "monthly" {
  1675. WriteError(w, BadRequest("Rate parameter only supports: hourly, daily, monthly or empty"))
  1676. return
  1677. }
  1678. // parse cost data filters
  1679. // namespace and cluster are exact-string-matches
  1680. // labels are expected to be comma-separated and to take the form key=value
  1681. // e.g. app=cost-analyzer,app.kubernetes.io/instance=kubecost
  1682. opts.Filters = map[string]string{
  1683. "namespace": namespace,
  1684. "cluster": cluster,
  1685. "labels": labels,
  1686. "annotations": annotations,
  1687. "podprefix": podprefix,
  1688. }
  1689. // parse shared resources
  1690. sn := []string{}
  1691. sln := []string{}
  1692. slv := []string{}
  1693. if sharedNamespaces != "" {
  1694. sn = strings.Split(sharedNamespaces, ",")
  1695. }
  1696. if sharedLabelNames != "" {
  1697. sln = strings.Split(sharedLabelNames, ",")
  1698. slv = strings.Split(sharedLabelValues, ",")
  1699. if len(sln) != len(slv) || slv[0] == "" {
  1700. WriteError(w, BadRequest("Supply exactly one shared label value per shared label name"))
  1701. return
  1702. }
  1703. }
  1704. if len(sn) > 0 || len(sln) > 0 {
  1705. opts.SharedResources = NewSharedResourceInfo(true, sn, sln, slv)
  1706. }
  1707. // enable remote if it is available and not disabled
  1708. opts.RemoteEnabled = remote && env.IsRemoteEnabled()
  1709. promClient := a.GetPrometheusClient(remote)
  1710. var data map[string]*Aggregation
  1711. var message string
  1712. data, message, err = a.AggAPI.ComputeAggregateCostModel(promClient, tenantID, window, field, subfields, opts)
  1713. // Find any warnings in http request context
  1714. warning, _ := httputil.GetWarning(r)
  1715. if err != nil {
  1716. if emptyErr, ok := err.(*EmptyDataError); ok {
  1717. if warning == "" {
  1718. w.Write(WrapData(map[string]interface{}{}, emptyErr))
  1719. } else {
  1720. w.Write(WrapDataWithWarning(map[string]interface{}{}, emptyErr, warning))
  1721. }
  1722. return
  1723. }
  1724. if boundaryErr, ok := err.(*opencost.BoundaryError); ok {
  1725. if window.Start() != nil && window.Start().After(time.Now().Add(-90*24*time.Hour)) {
  1726. // Asking for data within a 90 day period: it will be available
  1727. // after the pipeline builds
  1728. msg := "Data will be available after ETL is built"
  1729. match := percentRegex.FindStringSubmatch(boundaryErr.Message)
  1730. if len(match) > 1 {
  1731. completionPct, err := strconv.ParseFloat(match[1], 64)
  1732. if err == nil {
  1733. msg = fmt.Sprintf("%s (%.1f%% complete)", msg, completionPct)
  1734. }
  1735. }
  1736. WriteError(w, InternalServerError(msg))
  1737. } else {
  1738. // Boundary error outside of 90 day period; may not be available
  1739. WriteError(w, InternalServerError(boundaryErr.Error()))
  1740. }
  1741. return
  1742. }
  1743. errStr := fmt.Sprintf("error computing aggregate cost model: %s", err)
  1744. WriteError(w, InternalServerError(errStr))
  1745. return
  1746. }
  1747. if warning == "" {
  1748. w.Write(WrapDataWithMessage(data, nil, message))
  1749. } else {
  1750. w.Write(WrapDataWithMessageAndWarning(data, nil, message, warning))
  1751. }
  1752. }
  1753. // ParseAggregationProperties attempts to parse and return aggregation properties
  1754. // encoded under the given key. If none exist, or if parsing fails, an error
  1755. // is returned with empty AllocationProperties.
  1756. func ParseAggregationProperties(aggregations []string) ([]string, error) {
  1757. aggregateBy := []string{}
  1758. // In case of no aggregation option, aggregate to the container, with a key Cluster/Node/Namespace/Pod/Container
  1759. if len(aggregations) == 0 {
  1760. aggregateBy = []string{
  1761. opencost.AllocationClusterProp,
  1762. opencost.AllocationNodeProp,
  1763. opencost.AllocationNamespaceProp,
  1764. opencost.AllocationPodProp,
  1765. opencost.AllocationContainerProp,
  1766. }
  1767. } else if len(aggregations) == 1 && aggregations[0] == "all" {
  1768. aggregateBy = []string{}
  1769. } else {
  1770. for _, agg := range aggregations {
  1771. aggregate := strings.TrimSpace(agg)
  1772. if aggregate != "" {
  1773. if prop, err := opencost.ParseProperty(aggregate); err == nil {
  1774. aggregateBy = append(aggregateBy, string(prop))
  1775. } else if strings.HasPrefix(aggregate, "label:") {
  1776. aggregateBy = append(aggregateBy, aggregate)
  1777. } else if strings.HasPrefix(aggregate, "annotation:") {
  1778. aggregateBy = append(aggregateBy, aggregate)
  1779. }
  1780. }
  1781. }
  1782. }
  1783. return aggregateBy, nil
  1784. }
  1785. func (a *Accesses) ComputeAllocationHandlerSummary(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  1786. w.Header().Set("Content-Type", "application/json")
  1787. qp := httputil.NewQueryParams(r.URL.Query())
  1788. // Window is a required field describing the window of time over which to
  1789. // compute allocation data.
  1790. window, err := opencost.ParseWindowWithOffset(qp.Get("window", ""), env.GetParsedUTCOffset())
  1791. if err != nil {
  1792. http.Error(w, fmt.Sprintf("Invalid 'window' parameter: %s", err), http.StatusBadRequest)
  1793. }
  1794. // Step is an optional parameter that defines the duration per-set, i.e.
  1795. // the window for an AllocationSet, of the AllocationSetRange to be
  1796. // computed. Defaults to the window size, making one set.
  1797. step := qp.GetDuration("step", window.Duration())
  1798. // Resolution is an optional parameter, defaulting to the configured ETL
  1799. // resolution.
  1800. resolution := qp.GetDuration("resolution", env.GetETLResolution())
  1801. // Aggregation is a required comma-separated list of fields by which to
  1802. // aggregate results. Some fields allow a sub-field, which is distinguished
  1803. // with a colon; e.g. "label:app".
  1804. // Examples: "namespace", "namespace,label:app"
  1805. aggregations := qp.GetList("aggregate", ",")
  1806. aggregateBy, err := ParseAggregationProperties(aggregations)
  1807. if err != nil {
  1808. http.Error(w, fmt.Sprintf("Invalid 'aggregate' parameter: %s", err), http.StatusBadRequest)
  1809. }
  1810. // Accumulate is an optional parameter, defaulting to false, which if true
  1811. // sums each Set in the Range, producing one Set.
  1812. accumulate := qp.GetBool("accumulate", false)
  1813. // Query for AllocationSets in increments of the given step duration,
  1814. // appending each to the AllocationSetRange.
  1815. asr := opencost.NewAllocationSetRange()
  1816. stepStart := *window.Start()
  1817. for window.End().After(stepStart) {
  1818. stepEnd := stepStart.Add(step)
  1819. stepWindow := opencost.NewWindow(&stepStart, &stepEnd)
  1820. as, err := a.Model.ComputeAllocation(*stepWindow.Start(), *stepWindow.End(), resolution)
  1821. if err != nil {
  1822. WriteError(w, InternalServerError(err.Error()))
  1823. return
  1824. }
  1825. asr.Append(as)
  1826. stepStart = stepEnd
  1827. }
  1828. // Aggregate, if requested
  1829. if len(aggregateBy) > 0 {
  1830. err = asr.AggregateBy(aggregateBy, nil)
  1831. if err != nil {
  1832. WriteError(w, InternalServerError(err.Error()))
  1833. return
  1834. }
  1835. }
  1836. // Accumulate, if requested
  1837. if accumulate {
  1838. asr, err = asr.Accumulate(opencost.AccumulateOptionAll)
  1839. if err != nil {
  1840. WriteError(w, InternalServerError(err.Error()))
  1841. return
  1842. }
  1843. }
  1844. sasl := []*opencost.SummaryAllocationSet{}
  1845. for _, as := range asr.Slice() {
  1846. sas := opencost.NewSummaryAllocationSet(as, nil, nil, false, false)
  1847. sasl = append(sasl, sas)
  1848. }
  1849. sasr := opencost.NewSummaryAllocationSetRange(sasl...)
  1850. w.Write(WrapData(sasr, nil))
  1851. }
  1852. // ComputeAllocationHandler computes an AllocationSetRange from the CostModel.
  1853. func (a *Accesses) ComputeAllocationHandler(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  1854. w.Header().Set("Content-Type", "application/json")
  1855. qp := httputil.NewQueryParams(r.URL.Query())
  1856. // Window is a required field describing the window of time over which to
  1857. // compute allocation data.
  1858. window, err := opencost.ParseWindowWithOffset(qp.Get("window", ""), env.GetParsedUTCOffset())
  1859. if err != nil {
  1860. http.Error(w, fmt.Sprintf("Invalid 'window' parameter: %s", err), http.StatusBadRequest)
  1861. }
  1862. // Resolution is an optional parameter, defaulting to the configured ETL
  1863. // resolution.
  1864. resolution := qp.GetDuration("resolution", env.GetETLResolution())
  1865. // Step is an optional parameter that defines the duration per-set, i.e.
  1866. // the window for an AllocationSet, of the AllocationSetRange to be
  1867. // computed. Defaults to the window size, making one set.
  1868. step := qp.GetDuration("step", window.Duration())
  1869. // Aggregation is an optional comma-separated list of fields by which to
  1870. // aggregate results. Some fields allow a sub-field, which is distinguished
  1871. // with a colon; e.g. "label:app".
  1872. // Examples: "namespace", "namespace,label:app"
  1873. aggregations := qp.GetList("aggregate", ",")
  1874. aggregateBy, err := ParseAggregationProperties(aggregations)
  1875. if err != nil {
  1876. http.Error(w, fmt.Sprintf("Invalid 'aggregate' parameter: %s", err), http.StatusBadRequest)
  1877. }
  1878. // IncludeIdle, if true, uses Asset data to incorporate Idle Allocation
  1879. includeIdle := qp.GetBool("includeIdle", false)
  1880. // Accumulate is an optional parameter, defaulting to false, which if true
  1881. // sums each Set in the Range, producing one Set.
  1882. accumulate := qp.GetBool("accumulate", false)
  1883. // Accumulate is an optional parameter that accumulates an AllocationSetRange
  1884. // by the resolution of the given time duration.
  1885. // Defaults to 0. If a value is not passed then the parameter is not used.
  1886. accumulateBy := opencost.AccumulateOption(qp.Get("accumulateBy", ""))
  1887. // if accumulateBy is not explicitly set, and accumulate is true, ensure result is accumulated
  1888. if accumulateBy == opencost.AccumulateOptionNone && accumulate {
  1889. accumulateBy = opencost.AccumulateOptionAll
  1890. }
  1891. // IdleByNode, if true, computes idle allocations at the node level.
  1892. // Otherwise it is computed at the cluster level. (Not relevant if idle
  1893. // is not included.)
  1894. idleByNode := qp.GetBool("idleByNode", false)
  1895. sharedLoadBalancer := qp.GetBool("sharelb", false)
  1896. // IncludeProportionalAssetResourceCosts, if true,
  1897. includeProportionalAssetResourceCosts := qp.GetBool("includeProportionalAssetResourceCosts", false)
  1898. // include aggregated labels/annotations if true
  1899. includeAggregatedMetadata := qp.GetBool("includeAggregatedMetadata", false)
  1900. asr, err := a.Model.QueryAllocation(window, resolution, step, aggregateBy, includeIdle, idleByNode, includeProportionalAssetResourceCosts, includeAggregatedMetadata, sharedLoadBalancer, accumulateBy)
  1901. if err != nil {
  1902. if strings.Contains(strings.ToLower(err.Error()), "bad request") {
  1903. WriteError(w, BadRequest(err.Error()))
  1904. } else {
  1905. WriteError(w, InternalServerError(err.Error()))
  1906. }
  1907. return
  1908. }
  1909. w.Write(WrapData(asr, nil))
  1910. }
// The below was transferred from a different package in order to maintain
// previous behavior. Ultimately, we should clean this up at some point.
// TODO move to util and/or standardize everything

// Error pairs an HTTP status code with a message body; WriteError serializes
// it to the client as a JSON Response.
type Error struct {
	// StatusCode is the HTTP status to respond with; WriteError treats a
	// zero value as http.StatusInternalServerError.
	StatusCode int
	// Body is the human-readable error message included in the response.
	Body string
}
  1918. func WriteError(w http.ResponseWriter, err Error) {
  1919. status := err.StatusCode
  1920. if status == 0 {
  1921. status = http.StatusInternalServerError
  1922. }
  1923. w.WriteHeader(status)
  1924. resp, _ := json.Marshal(&Response{
  1925. Code: status,
  1926. Message: fmt.Sprintf("Error: %s", err.Body),
  1927. })
  1928. w.Write(resp)
  1929. }
  1930. func BadRequest(message string) Error {
  1931. return Error{
  1932. StatusCode: http.StatusBadRequest,
  1933. Body: message,
  1934. }
  1935. }
  1936. func InternalServerError(message string) Error {
  1937. if message == "" {
  1938. message = "Internal Server Error"
  1939. }
  1940. return Error{
  1941. StatusCode: http.StatusInternalServerError,
  1942. Body: message,
  1943. }
  1944. }
  1945. func NotFound() Error {
  1946. return Error{
  1947. StatusCode: http.StatusNotFound,
  1948. Body: "Not Found",
  1949. }
  1950. }