aggregation.go 85 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177
71778177917801781178217831784178517861787178817891790179117921793179417951796179717981799180018011802180318041805180618071808180918101811181218131814181518161817181818191820182118221823182418251826182718281829183018311832183318341835183618371838183918401841184218431844184518461847184818491850185118521853185418551856185718581859186018611862186318641865186618671868186918701871187218731874187518761877187818791880188118821883188418851886188718881889189018911892189318941895189618971898189919001901190219031904190519061907190819091910191119121913191419151916191719181919192019211922192319241925192619271928192919301931193219331934193519361937193819391940194119421943194419451946194719481949195019511952195319541955195619571958195919601961196219631964196519661967196819691970197119721973197419751976197719781979198019811982198319841985198619871988198919901991199219931994199519961997199819992000200120022003200420052006200720082009201020112012201320142015201620172018201920202021202220232024202520262027202820292030203120322033203420352036203720382039204020412042204320442045204620472048204920502051205220532054205520562057205820592060206120622063206420652066206720682069207020712072207320742075207620772078207920802081208220832084208520862087208820892090209120922093209420952096209720982099210021012102210321042105210621072108210921102111211221132114211521162117211821192120212121222123212421252126212721282129213021312132213321342135213621372138213921402141214221432144214521462147214821492150215121522153215421552156215721582159216021612162216321642165216621672168216921702171217221732174217521762177217821792180218121822183218421852186218721882189219021912192219321942195219621972198219922002201220222032204220522062207220822092210221122122213221422152216221722182219222022212222222322242225222622272228222922302231223222332234223522362237223822392240224122422243224422452246224722482249225022512252225322542255225622572258225922602261226222632264226522662267226822692270227122722273227422752276227
7227822792280228122822283228422852286228722882289229022912292229322942295229622972298229923002301230223032304230523062307230823092310231123122313231423152316231723182319232023212322232323242325232623272328232923302331233223332334233523362337233823392340234123422343
  1. package costmodel
  2. import (
  3. "fmt"
  4. "math"
  5. "net/http"
  6. "regexp"
  7. "sort"
  8. "strconv"
  9. "strings"
  10. "time"
  11. "github.com/opencost/opencost/pkg/util/httputil"
  12. "github.com/opencost/opencost/pkg/util/timeutil"
  13. "github.com/julienschmidt/httprouter"
  14. "github.com/opencost/opencost/pkg/cloud"
  15. "github.com/opencost/opencost/pkg/env"
  16. "github.com/opencost/opencost/pkg/errors"
  17. "github.com/opencost/opencost/pkg/kubecost"
  18. "github.com/opencost/opencost/pkg/log"
  19. "github.com/opencost/opencost/pkg/prom"
  20. "github.com/opencost/opencost/pkg/thanos"
  21. "github.com/opencost/opencost/pkg/util"
  22. "github.com/opencost/opencost/pkg/util/json"
  23. "github.com/patrickmn/go-cache"
  24. prometheusClient "github.com/prometheus/client_golang/api"
  25. )
const (
	// SplitTypeWeighted signals that shared costs should be shared
	// proportionally, rather than evenly.
	SplitTypeWeighted = "weighted"

	// UnallocatedSubfield indicates an allocation datum that does not have the
	// chosen Aggregator; e.g. during aggregation by some label, there may be
	// cost data that do not have the given label.
	UnallocatedSubfield = "__unallocated__"

	// clusterCostsCacheMinutes is the cache lifetime, in minutes, for computed
	// cluster costs (presumably consumed when configuring ClusterCostsCache —
	// not referenced in this file's visible code; confirm at the cache setup).
	clusterCostsCacheMinutes = 5.0
)
// Aggregation describes aggregated cost data, containing cumulative cost and
// allocation data per resource, vectors of rate data per resource, efficiency
// data, and metadata describing the type of aggregation operation.
//
// Fields tagged `json:"-"` are internal working state (raw vectors, time
// bounds) and are excluded from serialized responses.
type Aggregation struct {
	Aggregator  string   `json:"aggregation"`         // field by which data was aggregated (e.g. "namespace", "label")
	Subfields   []string `json:"subfields,omitempty"` // subfields (e.g. label names) used when aggregating by label/annotation
	Environment string   `json:"environment"`         // the value of the aggregated field identifying this group
	Cluster     string   `json:"cluster,omitempty"`

	Properties *kubecost.AllocationProperties `json:"-"`
	Start      time.Time                      `json:"-"`
	End        time.Time                      `json:"-"`

	// CPU: hourly-average and cumulative allocation, cost, and efficiency data.
	CPUAllocationHourlyAverage float64        `json:"cpuAllocationAverage"`
	CPUAllocationVectors       []*util.Vector `json:"-"`
	CPUAllocationTotal         float64        `json:"-"`
	CPUCost                    float64        `json:"cpuCost"`
	CPUCostVector              []*util.Vector `json:"cpuCostVector,omitempty"`
	CPUEfficiency              float64        `json:"cpuEfficiency"`
	CPURequestedVectors        []*util.Vector `json:"-"`
	CPUUsedVectors             []*util.Vector `json:"-"`

	// Efficiency is the cost-weighted combination of CPU and RAM efficiency.
	Efficiency float64 `json:"efficiency"`

	// GPU: note GPU allocation vectors are rate-per-datum, unlike CPU/RAM/PV
	// (see the averaging in AggregateCostData).
	GPUAllocationHourlyAverage float64        `json:"gpuAllocationAverage"`
	GPUAllocationVectors       []*util.Vector `json:"-"`
	GPUCost                    float64        `json:"gpuCost"`
	GPUCostVector              []*util.Vector `json:"gpuCostVector,omitempty"`
	GPUAllocationTotal         float64        `json:"-"`

	// RAM: allocation averages are converted from bytes to GiB before return.
	RAMAllocationHourlyAverage float64        `json:"ramAllocationAverage"`
	RAMAllocationVectors       []*util.Vector `json:"-"`
	RAMAllocationTotal         float64        `json:"-"`
	RAMCost                    float64        `json:"ramCost"`
	RAMCostVector              []*util.Vector `json:"ramCostVector,omitempty"`
	RAMEfficiency              float64        `json:"ramEfficiency"`
	RAMRequestedVectors        []*util.Vector `json:"-"`
	RAMUsedVectors             []*util.Vector `json:"-"`

	// PV (persistent volume) allocation and cost.
	PVAllocationHourlyAverage float64        `json:"pvAllocationAverage"`
	PVAllocationVectors       []*util.Vector `json:"-"`
	PVAllocationTotal         float64        `json:"-"`
	PVCost                    float64        `json:"pvCost"`
	PVCostVector              []*util.Vector `json:"pvCostVector,omitempty"`

	NetworkCost       float64        `json:"networkCost"`
	NetworkCostVector []*util.Vector `json:"networkCostVector,omitempty"`

	// SharedCost is this group's portion of shared resource/external costs.
	SharedCost float64 `json:"sharedCost"`
	// TotalCost is the sum of all per-resource costs plus SharedCost.
	TotalCost       float64        `json:"totalCost"`
	TotalCostVector []*util.Vector `json:"totalCostVector,omitempty"`
}
  80. // TotalHours determines the amount of hours the Aggregation covers, as a
  81. // function of the cost vectors and the resolution of those vectors' data
  82. func (a *Aggregation) TotalHours(resolutionHours float64) float64 {
  83. length := 1
  84. if length < len(a.CPUCostVector) {
  85. length = len(a.CPUCostVector)
  86. }
  87. if length < len(a.RAMCostVector) {
  88. length = len(a.RAMCostVector)
  89. }
  90. if length < len(a.PVCostVector) {
  91. length = len(a.PVCostVector)
  92. }
  93. if length < len(a.GPUCostVector) {
  94. length = len(a.GPUCostVector)
  95. }
  96. if length < len(a.NetworkCostVector) {
  97. length = len(a.NetworkCostVector)
  98. }
  99. return float64(length) * resolutionHours
  100. }
  101. // RateCoefficient computes the coefficient by which the total cost needs to be
  102. // multiplied in order to convert totals costs into per-rate costs.
  103. func (a *Aggregation) RateCoefficient(rateStr string, resolutionHours float64) float64 {
  104. // monthly rate = (730.0)*(total cost)/(total hours)
  105. // daily rate = (24.0)*(total cost)/(total hours)
  106. // hourly rate = (1.0)*(total cost)/(total hours)
  107. // default to hourly rate
  108. coeff := 1.0
  109. switch rateStr {
  110. case "daily":
  111. coeff = timeutil.HoursPerDay
  112. case "monthly":
  113. coeff = timeutil.HoursPerMonth
  114. }
  115. return coeff / a.TotalHours(resolutionHours)
  116. }
// SharedResourceInfo describes which resources should be treated as shared
// across all non-shared aggregations: whole namespaces and/or label
// name-to-value selectors.
type SharedResourceInfo struct {
	ShareResources  bool                       // whether shared-resource accounting is enabled at all
	SharedNamespace map[string]bool            // set of namespaces whose costs are shared
	LabelSelectors  map[string]map[string]bool // label name -> set of values that mark a resource as shared
}
// SharedCostInfo describes a single named cost that is to be distributed
// across aggregations (see the SharedCosts handling in AggregateCostData).
type SharedCostInfo struct {
	Name      string
	Cost      float64
	ShareType string
}
  127. func (s *SharedResourceInfo) IsSharedResource(costDatum *CostData) bool {
  128. // exists in a shared namespace
  129. if _, ok := s.SharedNamespace[costDatum.Namespace]; ok {
  130. return true
  131. }
  132. // has at least one shared label (OR, not AND in the case of multiple labels)
  133. for labelName, labelValues := range s.LabelSelectors {
  134. if val, ok := costDatum.Labels[labelName]; ok && labelValues[val] {
  135. return true
  136. }
  137. }
  138. return false
  139. }
  140. func NewSharedResourceInfo(shareResources bool, sharedNamespaces []string, labelNames []string, labelValues []string) *SharedResourceInfo {
  141. sr := &SharedResourceInfo{
  142. ShareResources: shareResources,
  143. SharedNamespace: make(map[string]bool),
  144. LabelSelectors: make(map[string]map[string]bool),
  145. }
  146. for _, ns := range sharedNamespaces {
  147. sr.SharedNamespace[strings.Trim(ns, " ")] = true
  148. }
  149. // Creating a map of label name to label value, but only if
  150. // the cardinality matches
  151. if len(labelNames) == len(labelValues) {
  152. for i := range labelNames {
  153. cleanedLname := prom.SanitizeLabelName(strings.Trim(labelNames[i], " "))
  154. if values, ok := sr.LabelSelectors[cleanedLname]; ok {
  155. values[strings.Trim(labelValues[i], " ")] = true
  156. } else {
  157. sr.LabelSelectors[cleanedLname] = map[string]bool{strings.Trim(labelValues[i], " "): true}
  158. }
  159. }
  160. }
  161. return sr
  162. }
  163. func GetTotalContainerCost(costData map[string]*CostData, rate string, cp cloud.Provider, discount float64, customDiscount float64, idleCoefficients map[string]float64) float64 {
  164. totalContainerCost := 0.0
  165. for _, costDatum := range costData {
  166. clusterID := costDatum.ClusterID
  167. cpuv, ramv, gpuv, pvvs, netv := getPriceVectors(cp, costDatum, rate, discount, customDiscount, idleCoefficients[clusterID])
  168. totalContainerCost += totalVectors(cpuv)
  169. totalContainerCost += totalVectors(ramv)
  170. totalContainerCost += totalVectors(gpuv)
  171. for _, pv := range pvvs {
  172. totalContainerCost += totalVectors(pv)
  173. }
  174. totalContainerCost += totalVectors(netv)
  175. }
  176. return totalContainerCost
  177. }
// ComputeIdleCoefficient computes, per cluster, the ratio of total priced
// container cost to the cluster's total cumulative cost over the given
// window/offset. The resulting coefficient (<= 1.0 when idle capacity
// exists) is used to scale allocated costs so idle cost is accounted for.
// Returns an error if cluster costs cannot be computed, or if a cluster
// reports a zero TotalCumulative while having nonzero component costs.
func (a *Accesses) ComputeIdleCoefficient(costData map[string]*CostData, cli prometheusClient.Client, cp cloud.Provider, discount float64, customDiscount float64, window, offset time.Duration) (map[string]float64, error) {
	coefficients := make(map[string]float64)

	profileName := "ComputeIdleCoefficient: ComputeClusterCosts"
	profileStart := time.Now()

	var clusterCosts map[string]*ClusterCosts
	var err error
	fmtWindow, fmtOffset := timeutil.DurationOffsetStrings(window, offset)
	// Cache key is the window/offset pair, e.g. "24h:1h".
	key := fmt.Sprintf("%s:%s", fmtWindow, fmtOffset)
	if data, valid := a.ClusterCostsCache.Get(key); valid {
		clusterCosts = data.(map[string]*ClusterCosts)
	} else {
		clusterCosts, err = a.ComputeClusterCosts(cli, cp, window, offset, false)
		if err != nil {
			return nil, err
		}
		// NOTE(review): the freshly computed result is not written back to
		// ClusterCostsCache here — presumably the cache is populated
		// elsewhere; confirm against ComputeClusterCosts callers.
	}
	measureTime(profileStart, profileThreshold, profileName)

	for cid, costs := range clusterCosts {
		// A cluster with no cost data at all gets a neutral coefficient of
		// 1.0 rather than failing the whole computation.
		if costs.CPUCumulative == 0 && costs.RAMCumulative == 0 && costs.StorageCumulative == 0 {
			log.Warnf("No ClusterCosts data for cluster '%s'. Is it emitting data?", cid)
			coefficients[cid] = 1.0
			continue
		}

		if costs.TotalCumulative == 0 {
			return nil, fmt.Errorf("TotalCumulative cluster cost for cluster '%s' returned 0 over window '%s' offset '%s'", cid, fmtWindow, fmtOffset)
		}

		// Sum the undiscounted-rate container costs belonging to this
		// cluster (idle coefficient fixed at 1, network cost excluded).
		// NOTE: this is an O(clusters x costData) nested scan.
		totalContainerCost := 0.0
		for _, costDatum := range costData {
			if costDatum.ClusterID == cid {
				cpuv, ramv, gpuv, pvvs, _ := getPriceVectors(cp, costDatum, "", discount, customDiscount, 1)
				totalContainerCost += totalVectors(cpuv)
				totalContainerCost += totalVectors(ramv)
				totalContainerCost += totalVectors(gpuv)
				for _, pv := range pvvs {
					totalContainerCost += totalVectors(pv)
				}
			}
		}

		coeff := totalContainerCost / costs.TotalCumulative
		coefficients[cid] = coeff
	}

	return coefficients, nil
}
// AggregationOptions provides optional parameters to AggregateCostData,
// allowing callers to perform more complex operations.
type AggregationOptions struct {
	Discount          float64            // percent by which to discount CPU, RAM, and GPU cost
	CustomDiscount    float64            // additional custom discount applied to all prices
	IdleCoefficients  map[string]float64 // scales costs by amount of idle resources on a per-cluster basis
	IncludeEfficiency bool               // set to true to receive efficiency/usage data
	IncludeTimeSeries bool               // set to true to receive time series data
	Rate              string             // set to "hourly", "daily", or "monthly" to receive cost rate, rather than cumulative cost

	// ResolutionHours is the resolution of the cost vectors' data in hours;
	// values <= 0 are treated as 1.0 by AggregateCostData.
	ResolutionHours float64

	SharedResourceInfo *SharedResourceInfo        // describes resources whose cost is shared rather than reported standalone
	SharedCosts        map[string]*SharedCostInfo // named external costs distributed across aggregations

	// FilteredContainerCount / FilteredEnvironments describe items excluded
	// by filtering; FilteredEnvironments participates in the even
	// shared-cost split denominator.
	FilteredContainerCount int
	FilteredEnvironments   map[string]int

	SharedSplit        string  // SplitTypeWeighted for proportional sharing; otherwise costs are split evenly
	TotalContainerCost float64 // denominator for weighted shared-cost splitting
}
  237. // Helper method to test request/usgae values against allocation averages for efficiency scores. Generate a warning log if
  238. // clamp is required
  239. func clampAverage(requestsAvg float64, usedAverage float64, allocationAvg float64, resource string) (float64, float64) {
  240. rAvg := requestsAvg
  241. if rAvg > allocationAvg {
  242. log.Debugf("Average %s Requested (%f) > Average %s Allocated (%f). Clamping.", resource, rAvg, resource, allocationAvg)
  243. rAvg = allocationAvg
  244. }
  245. uAvg := usedAverage
  246. if uAvg > allocationAvg {
  247. log.Debugf(" Average %s Used (%f) > Average %s Allocated (%f). Clamping.", resource, uAvg, resource, allocationAvg)
  248. uAvg = allocationAvg
  249. }
  250. return rAvg, uAvg
  251. }
  252. // AggregateCostData aggregates raw cost data by field; e.g. namespace, cluster, service, or label. In the case of label, callers
  253. // must pass a slice of subfields indicating the labels by which to group. Provider is used to define custom resource pricing.
  254. // See AggregationOptions for optional parameters.
  255. func AggregateCostData(costData map[string]*CostData, field string, subfields []string, cp cloud.Provider, opts *AggregationOptions) map[string]*Aggregation {
  256. discount := opts.Discount
  257. customDiscount := opts.CustomDiscount
  258. idleCoefficients := opts.IdleCoefficients
  259. includeTimeSeries := opts.IncludeTimeSeries
  260. includeEfficiency := opts.IncludeEfficiency
  261. rate := opts.Rate
  262. sr := opts.SharedResourceInfo
  263. resolutionHours := 1.0
  264. if opts.ResolutionHours > 0.0 {
  265. resolutionHours = opts.ResolutionHours
  266. }
  267. if idleCoefficients == nil {
  268. idleCoefficients = make(map[string]float64)
  269. }
  270. // aggregations collects key-value pairs of resource group-to-aggregated data
  271. // e.g. namespace-to-data or label-value-to-data
  272. aggregations := make(map[string]*Aggregation)
  273. // sharedResourceCost is the running total cost of resources that should be reported
  274. // as shared across all other resources, rather than reported as a stand-alone category
  275. sharedResourceCost := 0.0
  276. for _, costDatum := range costData {
  277. idleCoefficient, ok := idleCoefficients[costDatum.ClusterID]
  278. if !ok {
  279. idleCoefficient = 1.0
  280. }
  281. if sr != nil && sr.ShareResources && sr.IsSharedResource(costDatum) {
  282. cpuv, ramv, gpuv, pvvs, netv := getPriceVectors(cp, costDatum, rate, discount, customDiscount, idleCoefficient)
  283. sharedResourceCost += totalVectors(cpuv)
  284. sharedResourceCost += totalVectors(ramv)
  285. sharedResourceCost += totalVectors(gpuv)
  286. sharedResourceCost += totalVectors(netv)
  287. for _, pv := range pvvs {
  288. sharedResourceCost += totalVectors(pv)
  289. }
  290. } else {
  291. if field == "cluster" {
  292. aggregateDatum(cp, aggregations, costDatum, field, subfields, rate, costDatum.ClusterID, discount, customDiscount, idleCoefficient, false)
  293. } else if field == "node" {
  294. aggregateDatum(cp, aggregations, costDatum, field, subfields, rate, costDatum.NodeName, discount, customDiscount, idleCoefficient, false)
  295. } else if field == "namespace" {
  296. aggregateDatum(cp, aggregations, costDatum, field, subfields, rate, costDatum.Namespace, discount, customDiscount, idleCoefficient, false)
  297. } else if field == "service" {
  298. if len(costDatum.Services) > 0 {
  299. aggregateDatum(cp, aggregations, costDatum, field, subfields, rate, costDatum.Namespace+"/"+costDatum.Services[0], discount, customDiscount, idleCoefficient, false)
  300. } else {
  301. aggregateDatum(cp, aggregations, costDatum, field, subfields, rate, UnallocatedSubfield, discount, customDiscount, idleCoefficient, false)
  302. }
  303. } else if field == "deployment" {
  304. if len(costDatum.Deployments) > 0 {
  305. aggregateDatum(cp, aggregations, costDatum, field, subfields, rate, costDatum.Namespace+"/"+costDatum.Deployments[0], discount, customDiscount, idleCoefficient, false)
  306. } else {
  307. aggregateDatum(cp, aggregations, costDatum, field, subfields, rate, UnallocatedSubfield, discount, customDiscount, idleCoefficient, false)
  308. }
  309. } else if field == "statefulset" {
  310. if len(costDatum.Statefulsets) > 0 {
  311. aggregateDatum(cp, aggregations, costDatum, field, subfields, rate, costDatum.Namespace+"/"+costDatum.Statefulsets[0], discount, customDiscount, idleCoefficient, false)
  312. } else {
  313. aggregateDatum(cp, aggregations, costDatum, field, subfields, rate, UnallocatedSubfield, discount, customDiscount, idleCoefficient, false)
  314. }
  315. } else if field == "daemonset" {
  316. if len(costDatum.Daemonsets) > 0 {
  317. aggregateDatum(cp, aggregations, costDatum, field, subfields, rate, costDatum.Namespace+"/"+costDatum.Daemonsets[0], discount, customDiscount, idleCoefficient, false)
  318. } else {
  319. aggregateDatum(cp, aggregations, costDatum, field, subfields, rate, UnallocatedSubfield, discount, customDiscount, idleCoefficient, false)
  320. }
  321. } else if field == "controller" {
  322. if controller, kind, hasController := costDatum.GetController(); hasController {
  323. key := fmt.Sprintf("%s/%s:%s", costDatum.Namespace, kind, controller)
  324. aggregateDatum(cp, aggregations, costDatum, field, subfields, rate, key, discount, customDiscount, idleCoefficient, false)
  325. } else {
  326. aggregateDatum(cp, aggregations, costDatum, field, subfields, rate, UnallocatedSubfield, discount, customDiscount, idleCoefficient, false)
  327. }
  328. } else if field == "label" {
  329. found := false
  330. if costDatum.Labels != nil {
  331. for _, sf := range subfields {
  332. if subfieldName, ok := costDatum.Labels[sf]; ok {
  333. aggregateDatum(cp, aggregations, costDatum, field, subfields, rate, subfieldName, discount, customDiscount, idleCoefficient, false)
  334. found = true
  335. break
  336. }
  337. }
  338. }
  339. if !found {
  340. aggregateDatum(cp, aggregations, costDatum, field, subfields, rate, UnallocatedSubfield, discount, customDiscount, idleCoefficient, false)
  341. }
  342. } else if field == "annotation" {
  343. found := false
  344. if costDatum.Annotations != nil {
  345. for _, sf := range subfields {
  346. if subfieldName, ok := costDatum.Annotations[sf]; ok {
  347. aggregateDatum(cp, aggregations, costDatum, field, subfields, rate, subfieldName, discount, customDiscount, idleCoefficient, false)
  348. found = true
  349. break
  350. }
  351. }
  352. }
  353. if !found {
  354. aggregateDatum(cp, aggregations, costDatum, field, subfields, rate, UnallocatedSubfield, discount, customDiscount, idleCoefficient, false)
  355. }
  356. } else if field == "pod" {
  357. aggregateDatum(cp, aggregations, costDatum, field, subfields, rate, costDatum.Namespace+"/"+costDatum.PodName, discount, customDiscount, idleCoefficient, false)
  358. } else if field == "container" {
  359. key := fmt.Sprintf("%s/%s/%s/%s", costDatum.ClusterID, costDatum.Namespace, costDatum.PodName, costDatum.Name)
  360. aggregateDatum(cp, aggregations, costDatum, field, subfields, rate, key, discount, customDiscount, idleCoefficient, true)
  361. }
  362. }
  363. }
  364. for key, agg := range aggregations {
  365. sharedCoefficient := 1 / float64(len(opts.FilteredEnvironments)+len(aggregations))
  366. agg.CPUCost = totalVectors(agg.CPUCostVector)
  367. agg.RAMCost = totalVectors(agg.RAMCostVector)
  368. agg.GPUCost = totalVectors(agg.GPUCostVector)
  369. agg.PVCost = totalVectors(agg.PVCostVector)
  370. agg.NetworkCost = totalVectors(agg.NetworkCostVector)
  371. if opts.SharedSplit == SplitTypeWeighted {
  372. d := opts.TotalContainerCost - sharedResourceCost
  373. if d == 0 {
  374. log.Warnf("Total container cost '%f' and shared resource cost '%f are the same'. Setting sharedCoefficient to 1", opts.TotalContainerCost, sharedResourceCost)
  375. sharedCoefficient = 1.0
  376. } else {
  377. sharedCoefficient = (agg.CPUCost + agg.RAMCost + agg.GPUCost + agg.PVCost + agg.NetworkCost) / d
  378. }
  379. }
  380. agg.SharedCost = sharedResourceCost * sharedCoefficient
  381. for _, v := range opts.SharedCosts {
  382. agg.SharedCost += v.Cost * sharedCoefficient
  383. }
  384. if rate != "" {
  385. rateCoeff := agg.RateCoefficient(rate, resolutionHours)
  386. agg.CPUCost *= rateCoeff
  387. agg.RAMCost *= rateCoeff
  388. agg.GPUCost *= rateCoeff
  389. agg.PVCost *= rateCoeff
  390. agg.NetworkCost *= rateCoeff
  391. agg.SharedCost *= rateCoeff
  392. }
  393. agg.TotalCost = agg.CPUCost + agg.RAMCost + agg.GPUCost + agg.PVCost + agg.NetworkCost + agg.SharedCost
  394. // Evicted and Completed Pods can still show up here, but have 0 cost.
  395. // Filter these by default. Any reason to keep them?
  396. if agg.TotalCost == 0 {
  397. delete(aggregations, key)
  398. continue
  399. }
  400. // CPU, RAM, and PV allocation are cumulative per-datum, whereas GPU is rate per-datum
  401. agg.CPUAllocationHourlyAverage = totalVectors(agg.CPUAllocationVectors) / agg.TotalHours(resolutionHours)
  402. agg.RAMAllocationHourlyAverage = totalVectors(agg.RAMAllocationVectors) / agg.TotalHours(resolutionHours)
  403. agg.GPUAllocationHourlyAverage = averageVectors(agg.GPUAllocationVectors)
  404. agg.PVAllocationHourlyAverage = totalVectors(agg.PVAllocationVectors) / agg.TotalHours(resolutionHours)
  405. // TODO niko/etl does this check out for GPU data? Do we need to rewrite GPU queries to be
  406. // culumative?
  407. agg.CPUAllocationTotal = totalVectors(agg.CPUAllocationVectors)
  408. agg.GPUAllocationTotal = totalVectors(agg.GPUAllocationVectors)
  409. agg.PVAllocationTotal = totalVectors(agg.PVAllocationVectors)
  410. agg.RAMAllocationTotal = totalVectors(agg.RAMAllocationVectors)
  411. if includeEfficiency {
  412. // Default both RAM and CPU to 0% efficiency so that a 0-requested, 0-allocated, 0-used situation
  413. // returns 0% efficiency, which should be a red-flag.
  414. //
  415. // If non-zero numbers are available, then efficiency is defined as:
  416. // idlePercentage = (requested - used) / allocated
  417. // efficiency = (1.0 - idlePercentage)
  418. //
  419. // It is possible to score > 100% efficiency, which is meant to be interpreted as a red flag.
  420. // It is not possible to score < 0% efficiency.
  421. agg.CPUEfficiency = 0.0
  422. CPUIdle := 0.0
  423. if agg.CPUAllocationHourlyAverage > 0.0 {
  424. avgCPURequested := averageVectors(agg.CPURequestedVectors)
  425. avgCPUUsed := averageVectors(agg.CPUUsedVectors)
  426. // Clamp averages, log range violations
  427. avgCPURequested, avgCPUUsed = clampAverage(avgCPURequested, avgCPUUsed, agg.CPUAllocationHourlyAverage, "CPU")
  428. CPUIdle = ((avgCPURequested - avgCPUUsed) / agg.CPUAllocationHourlyAverage)
  429. agg.CPUEfficiency = 1.0 - CPUIdle
  430. }
  431. agg.RAMEfficiency = 0.0
  432. RAMIdle := 0.0
  433. if agg.RAMAllocationHourlyAverage > 0.0 {
  434. avgRAMRequested := averageVectors(agg.RAMRequestedVectors)
  435. avgRAMUsed := averageVectors(agg.RAMUsedVectors)
  436. // Clamp averages, log range violations
  437. avgRAMRequested, avgRAMUsed = clampAverage(avgRAMRequested, avgRAMUsed, agg.RAMAllocationHourlyAverage, "RAM")
  438. RAMIdle = ((avgRAMRequested - avgRAMUsed) / agg.RAMAllocationHourlyAverage)
  439. agg.RAMEfficiency = 1.0 - RAMIdle
  440. }
  441. // Score total efficiency by the sum of CPU and RAM efficiency, weighted by their
  442. // respective total costs.
  443. agg.Efficiency = 0.0
  444. if (agg.CPUCost + agg.RAMCost) > 0 {
  445. agg.Efficiency = ((agg.CPUCost * agg.CPUEfficiency) + (agg.RAMCost * agg.RAMEfficiency)) / (agg.CPUCost + agg.RAMCost)
  446. }
  447. }
  448. // convert RAM from bytes to GiB
  449. agg.RAMAllocationHourlyAverage = agg.RAMAllocationHourlyAverage / 1024 / 1024 / 1024
  450. // convert storage from bytes to GiB
  451. agg.PVAllocationHourlyAverage = agg.PVAllocationHourlyAverage / 1024 / 1024 / 1024
  452. // remove time series data if it is not explicitly requested
  453. if !includeTimeSeries {
  454. agg.CPUCostVector = nil
  455. agg.RAMCostVector = nil
  456. agg.GPUCostVector = nil
  457. agg.PVCostVector = nil
  458. agg.NetworkCostVector = nil
  459. agg.TotalCostVector = nil
  460. } else { // otherwise compute a totalcostvector
  461. v1 := addVectors(agg.CPUCostVector, agg.RAMCostVector)
  462. v2 := addVectors(v1, agg.GPUCostVector)
  463. v3 := addVectors(v2, agg.PVCostVector)
  464. v4 := addVectors(v3, agg.NetworkCostVector)
  465. agg.TotalCostVector = v4
  466. }
  467. // Typesafety checks
  468. if math.IsNaN(agg.CPUAllocationHourlyAverage) || math.IsInf(agg.CPUAllocationHourlyAverage, 0) {
  469. log.Warnf("CPUAllocationHourlyAverage is %f for '%s: %s/%s'", agg.CPUAllocationHourlyAverage, agg.Cluster, agg.Aggregator, agg.Environment)
  470. agg.CPUAllocationHourlyAverage = 0
  471. }
  472. if math.IsNaN(agg.CPUCost) || math.IsInf(agg.CPUCost, 0) {
  473. log.Warnf("CPUCost is %f for '%s: %s/%s'", agg.CPUCost, agg.Cluster, agg.Aggregator, agg.Environment)
  474. agg.CPUCost = 0
  475. }
  476. if math.IsNaN(agg.CPUEfficiency) || math.IsInf(agg.CPUEfficiency, 0) {
  477. log.Warnf("CPUEfficiency is %f for '%s: %s/%s'", agg.CPUEfficiency, agg.Cluster, agg.Aggregator, agg.Environment)
  478. agg.CPUEfficiency = 0
  479. }
  480. if math.IsNaN(agg.Efficiency) || math.IsInf(agg.Efficiency, 0) {
  481. log.Warnf("Efficiency is %f for '%s: %s/%s'", agg.Efficiency, agg.Cluster, agg.Aggregator, agg.Environment)
  482. agg.Efficiency = 0
  483. }
  484. if math.IsNaN(agg.GPUAllocationHourlyAverage) || math.IsInf(agg.GPUAllocationHourlyAverage, 0) {
  485. log.Warnf("GPUAllocationHourlyAverage is %f for '%s: %s/%s'", agg.GPUAllocationHourlyAverage, agg.Cluster, agg.Aggregator, agg.Environment)
  486. agg.GPUAllocationHourlyAverage = 0
  487. }
  488. if math.IsNaN(agg.GPUCost) || math.IsInf(agg.GPUCost, 0) {
  489. log.Warnf("GPUCost is %f for '%s: %s/%s'", agg.GPUCost, agg.Cluster, agg.Aggregator, agg.Environment)
  490. agg.GPUCost = 0
  491. }
  492. if math.IsNaN(agg.RAMAllocationHourlyAverage) || math.IsInf(agg.RAMAllocationHourlyAverage, 0) {
  493. log.Warnf("RAMAllocationHourlyAverage is %f for '%s: %s/%s'", agg.RAMAllocationHourlyAverage, agg.Cluster, agg.Aggregator, agg.Environment)
  494. agg.RAMAllocationHourlyAverage = 0
  495. }
  496. if math.IsNaN(agg.RAMCost) || math.IsInf(agg.RAMCost, 0) {
  497. log.Warnf("RAMCost is %f for '%s: %s/%s'", agg.RAMCost, agg.Cluster, agg.Aggregator, agg.Environment)
  498. agg.RAMCost = 0
  499. }
  500. if math.IsNaN(agg.RAMEfficiency) || math.IsInf(agg.RAMEfficiency, 0) {
  501. log.Warnf("RAMEfficiency is %f for '%s: %s/%s'", agg.RAMEfficiency, agg.Cluster, agg.Aggregator, agg.Environment)
  502. agg.RAMEfficiency = 0
  503. }
  504. if math.IsNaN(agg.PVAllocationHourlyAverage) || math.IsInf(agg.PVAllocationHourlyAverage, 0) {
  505. log.Warnf("PVAllocationHourlyAverage is %f for '%s: %s/%s'", agg.PVAllocationHourlyAverage, agg.Cluster, agg.Aggregator, agg.Environment)
  506. agg.PVAllocationHourlyAverage = 0
  507. }
  508. if math.IsNaN(agg.PVCost) || math.IsInf(agg.PVCost, 0) {
  509. log.Warnf("PVCost is %f for '%s: %s/%s'", agg.PVCost, agg.Cluster, agg.Aggregator, agg.Environment)
  510. agg.PVCost = 0
  511. }
  512. if math.IsNaN(agg.NetworkCost) || math.IsInf(agg.NetworkCost, 0) {
  513. log.Warnf("NetworkCost is %f for '%s: %s/%s'", agg.NetworkCost, agg.Cluster, agg.Aggregator, agg.Environment)
  514. agg.NetworkCost = 0
  515. }
  516. if math.IsNaN(agg.SharedCost) || math.IsInf(agg.SharedCost, 0) {
  517. log.Warnf("SharedCost is %f for '%s: %s/%s'", agg.SharedCost, agg.Cluster, agg.Aggregator, agg.Environment)
  518. agg.SharedCost = 0
  519. }
  520. if math.IsNaN(agg.TotalCost) || math.IsInf(agg.TotalCost, 0) {
  521. log.Warnf("TotalCost is %f for '%s: %s/%s'", agg.TotalCost, agg.Cluster, agg.Aggregator, agg.Environment)
  522. agg.TotalCost = 0
  523. }
  524. }
  525. return aggregations
  526. }
  527. func aggregateDatum(cp cloud.Provider, aggregations map[string]*Aggregation, costDatum *CostData, field string, subfields []string, rate string, key string, discount float64, customDiscount float64, idleCoefficient float64, includeProperties bool) {
  528. // add new entry to aggregation results if a new key is encountered
  529. if _, ok := aggregations[key]; !ok {
  530. agg := &Aggregation{
  531. Aggregator: field,
  532. Environment: key,
  533. }
  534. if len(subfields) > 0 {
  535. agg.Subfields = subfields
  536. }
  537. if includeProperties {
  538. props := &kubecost.AllocationProperties{}
  539. props.Cluster = costDatum.ClusterID
  540. props.Node = costDatum.NodeName
  541. if controller, kind, hasController := costDatum.GetController(); hasController {
  542. props.Controller = controller
  543. props.ControllerKind = kind
  544. }
  545. props.Labels = costDatum.Labels
  546. props.Annotations = costDatum.Annotations
  547. props.Namespace = costDatum.Namespace
  548. props.Pod = costDatum.PodName
  549. props.Services = costDatum.Services
  550. props.Container = costDatum.Name
  551. agg.Properties = props
  552. }
  553. aggregations[key] = agg
  554. }
  555. mergeVectors(cp, costDatum, aggregations[key], rate, discount, customDiscount, idleCoefficient)
  556. }
  557. func mergeVectors(cp cloud.Provider, costDatum *CostData, aggregation *Aggregation, rate string, discount float64, customDiscount float64, idleCoefficient float64) {
  558. aggregation.CPUAllocationVectors = addVectors(costDatum.CPUAllocation, aggregation.CPUAllocationVectors)
  559. aggregation.CPURequestedVectors = addVectors(costDatum.CPUReq, aggregation.CPURequestedVectors)
  560. aggregation.CPUUsedVectors = addVectors(costDatum.CPUUsed, aggregation.CPUUsedVectors)
  561. aggregation.RAMAllocationVectors = addVectors(costDatum.RAMAllocation, aggregation.RAMAllocationVectors)
  562. aggregation.RAMRequestedVectors = addVectors(costDatum.RAMReq, aggregation.RAMRequestedVectors)
  563. aggregation.RAMUsedVectors = addVectors(costDatum.RAMUsed, aggregation.RAMUsedVectors)
  564. aggregation.GPUAllocationVectors = addVectors(costDatum.GPUReq, aggregation.GPUAllocationVectors)
  565. for _, pvcd := range costDatum.PVCData {
  566. aggregation.PVAllocationVectors = addVectors(pvcd.Values, aggregation.PVAllocationVectors)
  567. }
  568. cpuv, ramv, gpuv, pvvs, netv := getPriceVectors(cp, costDatum, rate, discount, customDiscount, idleCoefficient)
  569. aggregation.CPUCostVector = addVectors(cpuv, aggregation.CPUCostVector)
  570. aggregation.RAMCostVector = addVectors(ramv, aggregation.RAMCostVector)
  571. aggregation.GPUCostVector = addVectors(gpuv, aggregation.GPUCostVector)
  572. aggregation.NetworkCostVector = addVectors(netv, aggregation.NetworkCostVector)
  573. for _, vectorList := range pvvs {
  574. aggregation.PVCostVector = addVectors(aggregation.PVCostVector, vectorList)
  575. }
  576. }
  577. // Returns the blended discounts applied to the node as a result of global discounts and reserved instance
  578. // discounts
  579. func getDiscounts(costDatum *CostData, cpuCost float64, ramCost float64, discount float64) (float64, float64) {
  580. if costDatum.NodeData == nil {
  581. return discount, discount
  582. }
  583. if costDatum.NodeData.IsSpot() {
  584. return 0, 0
  585. }
  586. reserved := costDatum.NodeData.Reserved
  587. // blended discounts
  588. blendedCPUDiscount := discount
  589. blendedRAMDiscount := discount
  590. if reserved != nil && reserved.CPUCost > 0 && reserved.RAMCost > 0 {
  591. reservedCPUDiscount := 0.0
  592. if cpuCost == 0 {
  593. log.Warnf("No cpu cost found for cluster '%s' node '%s'", costDatum.ClusterID, costDatum.NodeName)
  594. } else {
  595. reservedCPUDiscount = 1.0 - (reserved.CPUCost / cpuCost)
  596. }
  597. reservedRAMDiscount := 0.0
  598. if ramCost == 0 {
  599. log.Warnf("No ram cost found for cluster '%s' node '%s'", costDatum.ClusterID, costDatum.NodeName)
  600. } else {
  601. reservedRAMDiscount = 1.0 - (reserved.RAMCost / ramCost)
  602. }
  603. // AWS passes the # of reserved CPU and RAM as -1 to represent "All"
  604. if reserved.ReservedCPU < 0 && reserved.ReservedRAM < 0 {
  605. blendedCPUDiscount = reservedCPUDiscount
  606. blendedRAMDiscount = reservedRAMDiscount
  607. } else {
  608. nodeCPU, ierr := strconv.ParseInt(costDatum.NodeData.VCPU, 10, 64)
  609. nodeRAM, ferr := strconv.ParseFloat(costDatum.NodeData.RAMBytes, 64)
  610. if ierr == nil && ferr == nil {
  611. nodeRAMGB := nodeRAM / 1024 / 1024 / 1024
  612. reservedRAMGB := float64(reserved.ReservedRAM) / 1024 / 1024 / 1024
  613. nonReservedCPU := nodeCPU - reserved.ReservedCPU
  614. nonReservedRAM := nodeRAMGB - reservedRAMGB
  615. if nonReservedCPU == 0 {
  616. blendedCPUDiscount = reservedCPUDiscount
  617. } else {
  618. if nodeCPU == 0 {
  619. log.Warnf("No ram found for cluster '%s' node '%s'", costDatum.ClusterID, costDatum.NodeName)
  620. } else {
  621. blendedCPUDiscount = (float64(reserved.ReservedCPU) * reservedCPUDiscount) + (float64(nonReservedCPU)*discount)/float64(nodeCPU)
  622. }
  623. }
  624. if nonReservedRAM == 0 {
  625. blendedRAMDiscount = reservedRAMDiscount
  626. } else {
  627. if nodeRAMGB == 0 {
  628. log.Warnf("No ram found for cluster '%s' node '%s'", costDatum.ClusterID, costDatum.NodeName)
  629. } else {
  630. blendedRAMDiscount = (reservedRAMGB * reservedRAMDiscount) + (nonReservedRAM*discount)/nodeRAMGB
  631. }
  632. }
  633. }
  634. }
  635. }
  636. return blendedCPUDiscount, blendedRAMDiscount
  637. }
  638. func parseVectorPricing(cfg *cloud.CustomPricing, costDatum *CostData, cpuCostStr, ramCostStr, gpuCostStr, pvCostStr string) (float64, float64, float64, float64, bool) {
  639. usesCustom := false
  640. cpuCost, err := strconv.ParseFloat(cpuCostStr, 64)
  641. if err != nil || math.IsNaN(cpuCost) || math.IsInf(cpuCost, 0) || cpuCost == 0 {
  642. cpuCost, err = strconv.ParseFloat(cfg.CPU, 64)
  643. usesCustom = true
  644. if err != nil || math.IsNaN(cpuCost) || math.IsInf(cpuCost, 0) {
  645. cpuCost = 0
  646. }
  647. }
  648. ramCost, err := strconv.ParseFloat(ramCostStr, 64)
  649. if err != nil || math.IsNaN(ramCost) || math.IsInf(ramCost, 0) || ramCost == 0 {
  650. ramCost, err = strconv.ParseFloat(cfg.RAM, 64)
  651. usesCustom = true
  652. if err != nil || math.IsNaN(ramCost) || math.IsInf(ramCost, 0) {
  653. ramCost = 0
  654. }
  655. }
  656. gpuCost, err := strconv.ParseFloat(gpuCostStr, 64)
  657. if err != nil || math.IsNaN(gpuCost) || math.IsInf(gpuCost, 0) {
  658. gpuCost, err = strconv.ParseFloat(cfg.GPU, 64)
  659. if err != nil || math.IsNaN(gpuCost) || math.IsInf(gpuCost, 0) {
  660. gpuCost = 0
  661. }
  662. }
  663. pvCost, err := strconv.ParseFloat(pvCostStr, 64)
  664. if err != nil || math.IsNaN(cpuCost) || math.IsInf(cpuCost, 0) {
  665. pvCost, err = strconv.ParseFloat(cfg.Storage, 64)
  666. if err != nil || math.IsNaN(pvCost) || math.IsInf(pvCost, 0) {
  667. pvCost = 0
  668. }
  669. }
  670. return cpuCost, ramCost, gpuCost, pvCost, usesCustom
  671. }
// getPriceVectors computes per-timestep cost vectors for a single CostData
// entry, applying per-resource pricing, blended discounts, the custom
// discount, and the idle coefficient.
//
// Returns (cpuCosts, ramCosts, gpuCosts, pvCosts, networkCosts), where
// pvCosts contains one vector slice per PVC attached to the entry. All
// timestamps are rounded to the nearest 10s so they line up for addVectors.
func getPriceVectors(cp cloud.Provider, costDatum *CostData, rate string, discount float64, customDiscount float64, idleCoefficient float64) ([]*util.Vector, []*util.Vector, []*util.Vector, [][]*util.Vector, []*util.Vector) {
	var cpuCost float64
	var ramCost float64
	var gpuCost float64
	var pvCost float64
	var usesCustom bool
	// If custom pricing is enabled and can be retrieved, replace
	// default cost values with custom values
	customPricing, err := cp.GetConfig()
	if err != nil {
		log.Errorf("failed to load custom pricing: %s", err)
	}
	if cloud.CustomPricesEnabled(cp) && err == nil {
		var cpuCostStr string
		var ramCostStr string
		var gpuCostStr string
		var pvCostStr string
		// NOTE(review): NodeData can be nil on this path (the nil case is only
		// handled in the branch below); presumably IsSpot is nil-receiver-safe
		// — confirm against its implementation.
		if costDatum.NodeData.IsSpot() {
			cpuCostStr = customPricing.SpotCPU
			ramCostStr = customPricing.SpotRAM
			gpuCostStr = customPricing.SpotGPU
		} else {
			cpuCostStr = customPricing.CPU
			ramCostStr = customPricing.RAM
			gpuCostStr = customPricing.GPU
		}
		pvCostStr = customPricing.Storage
		cpuCost, ramCost, gpuCost, pvCost, usesCustom = parseVectorPricing(customPricing, costDatum, cpuCostStr, ramCostStr, gpuCostStr, pvCostStr)
	} else if costDatum.NodeData == nil && err == nil {
		// No node pricing data available: fall back to config defaults.
		cpuCostStr := customPricing.CPU
		ramCostStr := customPricing.RAM
		gpuCostStr := customPricing.GPU
		pvCostStr := customPricing.Storage
		cpuCost, ramCost, gpuCost, pvCost, usesCustom = parseVectorPricing(customPricing, costDatum, cpuCostStr, ramCostStr, gpuCostStr, pvCostStr)
	} else {
		// Normal path: use the observed node pricing.
		cpuCostStr := costDatum.NodeData.VCPUCost
		ramCostStr := costDatum.NodeData.RAMCost
		gpuCostStr := costDatum.NodeData.GPUCost
		pvCostStr := costDatum.NodeData.StorageCost
		cpuCost, ramCost, gpuCost, pvCost, usesCustom = parseVectorPricing(customPricing, costDatum, cpuCostStr, ramCostStr, gpuCostStr, pvCostStr)
	}
	if usesCustom {
		log.DedupedWarningf(5, "No pricing data found for node `%s` , using custom pricing", costDatum.NodeName)
	}
	// Blend the global discount with any reserved-instance discount.
	cpuDiscount, ramDiscount := getDiscounts(costDatum, cpuCost, ramCost, discount)
	log.Debugf("Node Name: %s", costDatum.NodeName)
	log.Debugf("Blended CPU Discount: %f", cpuDiscount)
	log.Debugf("Blended RAM Discount: %f", ramDiscount)
	// TODO should we try to apply the rate coefficient here or leave it as a totals-only metric?
	// rateCoeff is currently always 1.0 (see TODO above); `rate` is unused here.
	rateCoeff := 1.0
	// Guard against division by zero below; 0 is treated as "no idle scaling".
	if idleCoefficient == 0 {
		idleCoefficient = 1.0
	}
	// CPU: cores * $/core-hour, discounted and scaled by the idle coefficient.
	cpuv := make([]*util.Vector, 0, len(costDatum.CPUAllocation))
	for _, val := range costDatum.CPUAllocation {
		cpuv = append(cpuv, &util.Vector{
			Timestamp: math.Round(val.Timestamp/10) * 10,
			Value:     (val.Value * cpuCost * (1 - cpuDiscount) * (1 - customDiscount) / idleCoefficient) * rateCoeff,
		})
	}
	// RAM: allocation is in bytes; convert to GiB before pricing.
	ramv := make([]*util.Vector, 0, len(costDatum.RAMAllocation))
	for _, val := range costDatum.RAMAllocation {
		ramv = append(ramv, &util.Vector{
			Timestamp: math.Round(val.Timestamp/10) * 10,
			Value:     ((val.Value / 1024 / 1024 / 1024) * ramCost * (1 - ramDiscount) * (1 - customDiscount) / idleCoefficient) * rateCoeff,
		})
	}
	// GPU: uses the raw global discount (no reserved-instance blending).
	gpuv := make([]*util.Vector, 0, len(costDatum.GPUReq))
	for _, val := range costDatum.GPUReq {
		gpuv = append(gpuv, &util.Vector{
			Timestamp: math.Round(val.Timestamp/10) * 10,
			Value:     (val.Value * gpuCost * (1 - discount) * (1 - customDiscount) / idleCoefficient) * rateCoeff,
		})
	}
	// PV: one cost series per PVC; PVCs without volume data are skipped.
	pvvs := make([][]*util.Vector, 0, len(costDatum.PVCData))
	for _, pvcData := range costDatum.PVCData {
		pvv := make([]*util.Vector, 0, len(pvcData.Values))
		if pvcData.Volume != nil {
			// parse error intentionally ignored: cost defaults to 0
			cost, _ := strconv.ParseFloat(pvcData.Volume.Cost, 64)
			// override with custom pricing if enabled
			if cloud.CustomPricesEnabled(cp) {
				cost = pvCost
			}
			for _, val := range pvcData.Values {
				pvv = append(pvv, &util.Vector{
					Timestamp: math.Round(val.Timestamp/10) * 10,
					Value:     ((val.Value / 1024 / 1024 / 1024) * cost * (1 - customDiscount) / idleCoefficient) * rateCoeff,
				})
			}
			pvvs = append(pvvs, pvv)
		}
	}
	// Network: values are already costs; no discount or idle scaling applied.
	netv := make([]*util.Vector, 0, len(costDatum.NetworkData))
	for _, val := range costDatum.NetworkData {
		netv = append(netv, &util.Vector{
			Timestamp: math.Round(val.Timestamp/10) * 10,
			Value:     val.Value,
		})
	}
	return cpuv, ramv, gpuv, pvvs, netv
}
  773. func averageVectors(vectors []*util.Vector) float64 {
  774. if len(vectors) == 0 {
  775. return 0.0
  776. }
  777. return totalVectors(vectors) / float64(len(vectors))
  778. }
  779. func totalVectors(vectors []*util.Vector) float64 {
  780. total := 0.0
  781. for _, vector := range vectors {
  782. total += vector.Value
  783. }
  784. return total
  785. }
  786. // addVectors adds two slices of Vectors. Vector timestamps are rounded to the
  787. // nearest ten seconds to allow matching of Vectors within a delta allowance.
  788. // Matching Vectors are summed, while unmatched Vectors are passed through.
  789. // e.g. [(t=1, 1), (t=2, 2)] + [(t=2, 2), (t=3, 3)] = [(t=1, 1), (t=2, 4), (t=3, 3)]
  790. func addVectors(xvs []*util.Vector, yvs []*util.Vector) []*util.Vector {
  791. sumOp := func(result *util.Vector, x *float64, y *float64) bool {
  792. if x != nil && y != nil {
  793. result.Value = *x + *y
  794. } else if y != nil {
  795. result.Value = *y
  796. } else if x != nil {
  797. result.Value = *x
  798. }
  799. return true
  800. }
  801. return util.ApplyVectorOp(xvs, yvs, sumOp)
  802. }
// minCostDataLength sets the minimum number of time series data required to
// cache both raw and aggregated cost data; shorter series are not cached.
const minCostDataLength = 2
// EmptyDataError describes an error caused by empty cost data for some
// defined interval
type EmptyDataError struct {
	// err is the underlying cause, if any; may be nil.
	err error
	// window is the queried time range that contained no data.
	window kubecost.Window
}
  812. // Error implements the error interface
  813. func (ede *EmptyDataError) Error() string {
  814. err := fmt.Sprintf("empty data for range: %s", ede.window)
  815. if ede.err != nil {
  816. err += fmt.Sprintf(": %s", ede.err)
  817. }
  818. return err
  819. }
  820. func costDataTimeSeriesLength(costData map[string]*CostData) int {
  821. l := 0
  822. for _, cd := range costData {
  823. if l < len(cd.RAMAllocation) {
  824. l = len(cd.RAMAllocation)
  825. }
  826. if l < len(cd.CPUAllocation) {
  827. l = len(cd.CPUAllocation)
  828. }
  829. }
  830. return l
  831. }
  832. // ScaleHourlyCostData converts per-hour cost data to per-resolution data. If the target resolution is higher (i.e. < 1.0h)
  833. // then we can do simple multiplication by the fraction-of-an-hour and retain accuracy. If the target resolution is
  834. // lower (i.e. > 1.0h) then we sum groups of hourly data by resolution to maintain fidelity.
  835. // e.g. (100 hours of per-hour hourly data, resolutionHours=10) => 10 data points, grouped and summed by 10-hour window
  836. // e.g. (20 minutes of per-minute hourly data, resolutionHours=1/60) => 20 data points, scaled down by a factor of 60
  837. func ScaleHourlyCostData(data map[string]*CostData, resolutionHours float64) map[string]*CostData {
  838. scaled := map[string]*CostData{}
  839. for key, datum := range data {
  840. datum.RAMReq = scaleVectorSeries(datum.RAMReq, resolutionHours)
  841. datum.RAMUsed = scaleVectorSeries(datum.RAMUsed, resolutionHours)
  842. datum.RAMAllocation = scaleVectorSeries(datum.RAMAllocation, resolutionHours)
  843. datum.CPUReq = scaleVectorSeries(datum.CPUReq, resolutionHours)
  844. datum.CPUUsed = scaleVectorSeries(datum.CPUUsed, resolutionHours)
  845. datum.CPUAllocation = scaleVectorSeries(datum.CPUAllocation, resolutionHours)
  846. datum.GPUReq = scaleVectorSeries(datum.GPUReq, resolutionHours)
  847. datum.NetworkData = scaleVectorSeries(datum.NetworkData, resolutionHours)
  848. for _, pvcDatum := range datum.PVCData {
  849. pvcDatum.Values = scaleVectorSeries(pvcDatum.Values, resolutionHours)
  850. }
  851. scaled[key] = datum
  852. }
  853. return scaled
  854. }
  855. func scaleVectorSeries(vs []*util.Vector, resolutionHours float64) []*util.Vector {
  856. // if scaling to a lower resolution, compress the hourly data for maximum accuracy
  857. if resolutionHours > 1.0 {
  858. return compressVectorSeries(vs, resolutionHours)
  859. }
  860. // if scaling to a higher resolution, simply scale each value down by the fraction of an hour
  861. for _, v := range vs {
  862. v.Value *= resolutionHours
  863. }
  864. return vs
  865. }
  866. func compressVectorSeries(vs []*util.Vector, resolutionHours float64) []*util.Vector {
  867. if len(vs) == 0 {
  868. return vs
  869. }
  870. compressed := []*util.Vector{}
  871. threshold := float64(60 * 60 * resolutionHours)
  872. var acc *util.Vector
  873. for i, v := range vs {
  874. if acc == nil {
  875. // start a new accumulation from current datum
  876. acc = &util.Vector{
  877. Value: vs[i].Value,
  878. Timestamp: vs[i].Timestamp,
  879. }
  880. continue
  881. }
  882. if v.Timestamp-acc.Timestamp < threshold {
  883. // v should be accumulated in current datum
  884. acc.Value += v.Value
  885. } else {
  886. // v falls outside current datum's threshold; append and start a new one
  887. compressed = append(compressed, acc)
  888. acc = &util.Vector{
  889. Value: vs[i].Value,
  890. Timestamp: vs[i].Timestamp,
  891. }
  892. }
  893. }
  894. // append any remaining, incomplete accumulation
  895. if acc != nil {
  896. compressed = append(compressed, acc)
  897. }
  898. return compressed
  899. }
// AggregateQueryOpts holds the parameters of an aggregate cost-model query.
// Zero values are meaningful; see DefaultAggregateQueryOpts for defaults.
type AggregateQueryOpts struct {
	// Rate selects a rate-style response (empty string for totals).
	Rate string
	// Filters restricts results by dimension (e.g. "namespace", "cluster").
	Filters map[string]string
	// SharedResources designates resources whose cost is shared across results.
	SharedResources *SharedResourceInfo
	// ShareSplit selects how shared cost is distributed (e.g. weighted).
	ShareSplit string
	// AllocateIdle distributes idle cost across results when true.
	AllocateIdle bool
	// IncludeTimeSeries retains per-timestep cost vectors in the response.
	IncludeTimeSeries bool
	// IncludeEfficiency computes CPU/RAM efficiency metrics when true.
	IncludeEfficiency bool
	// DisableCache bypasses reading cached results.
	DisableCache bool
	// ClearCache flushes the aggregate and cost-data caches before computing.
	ClearCache bool
	// NoCache skips caching the computed result.
	NoCache bool
	// NoExpireCache caches the result without an expiration.
	NoExpireCache bool
	// RemoteEnabled allows remote (e.g. durable-store) reads.
	RemoteEnabled bool
	// DisableSharedOverhead skips shared-overhead cost allocation.
	DisableSharedOverhead bool
	// UseETLAdapter routes the query through the ETL adapter.
	UseETLAdapter bool
}
  916. func DefaultAggregateQueryOpts() *AggregateQueryOpts {
  917. return &AggregateQueryOpts{
  918. Rate: "",
  919. Filters: map[string]string{},
  920. SharedResources: nil,
  921. ShareSplit: SplitTypeWeighted,
  922. AllocateIdle: false,
  923. IncludeTimeSeries: true,
  924. IncludeEfficiency: true,
  925. DisableCache: false,
  926. ClearCache: false,
  927. NoCache: false,
  928. NoExpireCache: false,
  929. RemoteEnabled: env.IsRemoteEnabled(),
  930. DisableSharedOverhead: false,
  931. UseETLAdapter: false,
  932. }
  933. }
  934. // ComputeAggregateCostModel computes cost data for the given window, then aggregates it by the given fields.
  935. // Data is cached on two levels: the aggregation is cached as well as the underlying cost data.
  936. func (a *Accesses) ComputeAggregateCostModel(promClient prometheusClient.Client, window kubecost.Window, field string, subfields []string, opts *AggregateQueryOpts) (map[string]*Aggregation, string, error) {
  937. // Window is the range of the query, i.e. (start, end)
  938. // It must be closed, i.e. neither start nor end can be nil
  939. if window.IsOpen() {
  940. return nil, "", fmt.Errorf("illegal window: %s", window)
  941. }
  942. // Resolution is the duration of each datum in the cost model range query,
  943. // which corresponds to both the step size given to Prometheus query_range
  944. // and to the window passed to the range queries.
  945. // i.e. by default, we support 1h resolution for queries of windows defined
  946. // in terms of days or integer multiples of hours (e.g. 1d, 12h)
  947. resolution := time.Hour
  948. // Determine resolution by size of duration and divisibility of window.
  949. // By default, resolution is 1hr. If the window is smaller than 1hr, then
  950. // resolution goes down to 1m. If the window is not a multiple of 1hr, then
  951. // resolution goes down to 1m. If the window is greater than 1d, then
  952. // resolution gets scaled up to improve performance by reducing the amount
  953. // of data being computed.
  954. durMins := int64(math.Trunc(window.Minutes()))
  955. if durMins < 24*60 { // less than 1d
  956. // TODO should we have additional options for going by
  957. // e.g. 30m? 10m? 5m?
  958. if durMins%60 != 0 || durMins < 3*60 { // not divisible by 1h or less than 3h
  959. resolution = time.Minute
  960. }
  961. } else { // greater than 1d
  962. if durMins >= 7*24*60 { // greater than (or equal to) 7 days
  963. resolution = 24.0 * time.Hour
  964. } else if durMins >= 2*24*60 { // greater than (or equal to) 2 days
  965. resolution = 2.0 * time.Hour
  966. }
  967. }
  968. // Parse options
  969. if opts == nil {
  970. opts = DefaultAggregateQueryOpts()
  971. }
  972. rate := opts.Rate
  973. filters := opts.Filters
  974. sri := opts.SharedResources
  975. shared := opts.ShareSplit
  976. allocateIdle := opts.AllocateIdle
  977. includeTimeSeries := opts.IncludeTimeSeries
  978. includeEfficiency := opts.IncludeEfficiency
  979. disableCache := opts.DisableCache
  980. clearCache := opts.ClearCache
  981. noCache := opts.NoCache
  982. noExpireCache := opts.NoExpireCache
  983. remoteEnabled := opts.RemoteEnabled
  984. disableSharedOverhead := opts.DisableSharedOverhead
  985. // retainFuncs override filterFuncs. Make sure shared resources do not
  986. // get filtered out.
  987. retainFuncs := []FilterFunc{}
  988. retainFuncs = append(retainFuncs, func(cd *CostData) (bool, string) {
  989. if sri != nil {
  990. return sri.IsSharedResource(cd), ""
  991. }
  992. return false, ""
  993. })
  994. // Parse cost data filters into FilterFuncs
  995. filterFuncs := []FilterFunc{}
  996. aggregateEnvironment := func(costDatum *CostData) string {
  997. if field == "cluster" {
  998. return costDatum.ClusterID
  999. } else if field == "node" {
  1000. return costDatum.NodeName
  1001. } else if field == "namespace" {
  1002. return costDatum.Namespace
  1003. } else if field == "service" {
  1004. if len(costDatum.Services) > 0 {
  1005. return costDatum.Namespace + "/" + costDatum.Services[0]
  1006. }
  1007. } else if field == "deployment" {
  1008. if len(costDatum.Deployments) > 0 {
  1009. return costDatum.Namespace + "/" + costDatum.Deployments[0]
  1010. }
  1011. } else if field == "daemonset" {
  1012. if len(costDatum.Daemonsets) > 0 {
  1013. return costDatum.Namespace + "/" + costDatum.Daemonsets[0]
  1014. }
  1015. } else if field == "statefulset" {
  1016. if len(costDatum.Statefulsets) > 0 {
  1017. return costDatum.Namespace + "/" + costDatum.Statefulsets[0]
  1018. }
  1019. } else if field == "label" {
  1020. if costDatum.Labels != nil {
  1021. for _, sf := range subfields {
  1022. if subfieldName, ok := costDatum.Labels[sf]; ok {
  1023. return fmt.Sprintf("%s=%s", sf, subfieldName)
  1024. }
  1025. }
  1026. }
  1027. } else if field == "annotation" {
  1028. if costDatum.Annotations != nil {
  1029. for _, sf := range subfields {
  1030. if subfieldName, ok := costDatum.Annotations[sf]; ok {
  1031. return fmt.Sprintf("%s=%s", sf, subfieldName)
  1032. }
  1033. }
  1034. }
  1035. } else if field == "pod" {
  1036. return costDatum.Namespace + "/" + costDatum.PodName
  1037. } else if field == "container" {
  1038. return costDatum.Namespace + "/" + costDatum.PodName + "/" + costDatum.Name
  1039. }
  1040. return ""
  1041. }
  1042. if filters["podprefix"] != "" {
  1043. pps := []string{}
  1044. for _, fp := range strings.Split(filters["podprefix"], ",") {
  1045. if fp != "" {
  1046. cleanedFilter := strings.TrimSpace(fp)
  1047. pps = append(pps, cleanedFilter)
  1048. }
  1049. }
  1050. filterFuncs = append(filterFuncs, func(cd *CostData) (bool, string) {
  1051. aggEnv := aggregateEnvironment(cd)
  1052. for _, pp := range pps {
  1053. cleanedFilter := strings.TrimSpace(pp)
  1054. if strings.HasPrefix(cd.PodName, cleanedFilter) {
  1055. return true, aggEnv
  1056. }
  1057. }
  1058. return false, aggEnv
  1059. })
  1060. }
  1061. if filters["namespace"] != "" {
  1062. // namespaces may be comma-separated, e.g. kubecost,default
  1063. // multiple namespaces are evaluated as an OR relationship
  1064. nss := strings.Split(filters["namespace"], ",")
  1065. filterFuncs = append(filterFuncs, func(cd *CostData) (bool, string) {
  1066. aggEnv := aggregateEnvironment(cd)
  1067. for _, ns := range nss {
  1068. nsTrim := strings.TrimSpace(ns)
  1069. if cd.Namespace == nsTrim {
  1070. return true, aggEnv
  1071. } else if strings.HasSuffix(nsTrim, "*") { // trigger wildcard prefix filtering
  1072. nsTrimAsterisk := strings.TrimSuffix(nsTrim, "*")
  1073. if strings.HasPrefix(cd.Namespace, nsTrimAsterisk) {
  1074. return true, aggEnv
  1075. }
  1076. }
  1077. }
  1078. return false, aggEnv
  1079. })
  1080. }
  1081. if filters["node"] != "" {
  1082. // nodes may be comma-separated, e.g. aws-node-1,aws-node-2
  1083. // multiple nodes are evaluated as an OR relationship
  1084. nodes := strings.Split(filters["node"], ",")
  1085. filterFuncs = append(filterFuncs, func(cd *CostData) (bool, string) {
  1086. aggEnv := aggregateEnvironment(cd)
  1087. for _, node := range nodes {
  1088. nodeTrim := strings.TrimSpace(node)
  1089. if cd.NodeName == nodeTrim {
  1090. return true, aggEnv
  1091. } else if strings.HasSuffix(nodeTrim, "*") { // trigger wildcard prefix filtering
  1092. nodeTrimAsterisk := strings.TrimSuffix(nodeTrim, "*")
  1093. if strings.HasPrefix(cd.NodeName, nodeTrimAsterisk) {
  1094. return true, aggEnv
  1095. }
  1096. }
  1097. }
  1098. return false, aggEnv
  1099. })
  1100. }
  1101. if filters["cluster"] != "" {
  1102. // clusters may be comma-separated, e.g. cluster-one,cluster-two
  1103. // multiple clusters are evaluated as an OR relationship
  1104. cs := strings.Split(filters["cluster"], ",")
  1105. filterFuncs = append(filterFuncs, func(cd *CostData) (bool, string) {
  1106. aggEnv := aggregateEnvironment(cd)
  1107. for _, c := range cs {
  1108. cTrim := strings.TrimSpace(c)
  1109. id, name := cd.ClusterID, cd.ClusterName
  1110. if id == cTrim || name == cTrim {
  1111. return true, aggEnv
  1112. } else if strings.HasSuffix(cTrim, "*") { // trigger wildcard prefix filtering
  1113. cTrimAsterisk := strings.TrimSuffix(cTrim, "*")
  1114. if strings.HasPrefix(id, cTrimAsterisk) || strings.HasPrefix(name, cTrimAsterisk) {
  1115. return true, aggEnv
  1116. }
  1117. }
  1118. }
  1119. return false, aggEnv
  1120. })
  1121. }
  1122. if filters["labels"] != "" {
  1123. // labels are expected to be comma-separated and to take the form key=value
  1124. // e.g. app=cost-analyzer,app.kubernetes.io/instance=kubecost
  1125. // each different label will be applied as an AND
  1126. // multiple values for a single label will be evaluated as an OR
  1127. labelValues := map[string][]string{}
  1128. ls := strings.Split(filters["labels"], ",")
  1129. for _, l := range ls {
  1130. lTrim := strings.TrimSpace(l)
  1131. label := strings.Split(lTrim, "=")
  1132. if len(label) == 2 {
  1133. ln := prom.SanitizeLabelName(strings.TrimSpace(label[0]))
  1134. lv := strings.TrimSpace(label[1])
  1135. labelValues[ln] = append(labelValues[ln], lv)
  1136. } else {
  1137. // label is not of the form name=value, so log it and move on
  1138. log.Warnf("ComputeAggregateCostModel: skipping illegal label filter: %s", l)
  1139. }
  1140. }
  1141. // Generate FilterFunc for each set of label filters by invoking a function instead of accessing
  1142. // values by closure to prevent reference-type looping bug.
  1143. // (see https://github.com/golang/go/wiki/CommonMistakes#using-reference-to-loop-iterator-variable)
  1144. for label, values := range labelValues {
  1145. ff := (func(l string, vs []string) FilterFunc {
  1146. return func(cd *CostData) (bool, string) {
  1147. ae := aggregateEnvironment(cd)
  1148. for _, v := range vs {
  1149. if v == "__unallocated__" { // Special case. __unallocated__ means return all pods without the attached label
  1150. if _, ok := cd.Labels[l]; !ok {
  1151. return true, ae
  1152. }
  1153. }
  1154. if cd.Labels[l] == v {
  1155. return true, ae
  1156. } else if strings.HasSuffix(v, "*") { // trigger wildcard prefix filtering
  1157. vTrim := strings.TrimSuffix(v, "*")
  1158. if strings.HasPrefix(cd.Labels[l], vTrim) {
  1159. return true, ae
  1160. }
  1161. }
  1162. }
  1163. return false, ae
  1164. }
  1165. })(label, values)
  1166. filterFuncs = append(filterFuncs, ff)
  1167. }
  1168. }
  1169. if filters["annotations"] != "" {
  1170. // annotations are expected to be comma-separated and to take the form key=value
  1171. // e.g. app=cost-analyzer,app.kubernetes.io/instance=kubecost
  1172. // each different annotation will be applied as an AND
  1173. // multiple values for a single annotation will be evaluated as an OR
  1174. annotationValues := map[string][]string{}
  1175. as := strings.Split(filters["annotations"], ",")
  1176. for _, annot := range as {
  1177. aTrim := strings.TrimSpace(annot)
  1178. annotation := strings.Split(aTrim, "=")
  1179. if len(annotation) == 2 {
  1180. an := prom.SanitizeLabelName(strings.TrimSpace(annotation[0]))
  1181. av := strings.TrimSpace(annotation[1])
  1182. annotationValues[an] = append(annotationValues[an], av)
  1183. } else {
  1184. // annotation is not of the form name=value, so log it and move on
  1185. log.Warnf("ComputeAggregateCostModel: skipping illegal annotation filter: %s", annot)
  1186. }
  1187. }
  1188. // Generate FilterFunc for each set of annotation filters by invoking a function instead of accessing
  1189. // values by closure to prevent reference-type looping bug.
  1190. // (see https://github.com/golang/go/wiki/CommonMistakes#using-reference-to-loop-iterator-variable)
  1191. for annotation, values := range annotationValues {
  1192. ff := (func(l string, vs []string) FilterFunc {
  1193. return func(cd *CostData) (bool, string) {
  1194. ae := aggregateEnvironment(cd)
  1195. for _, v := range vs {
  1196. if v == "__unallocated__" { // Special case. __unallocated__ means return all pods without the attached label
  1197. if _, ok := cd.Annotations[l]; !ok {
  1198. return true, ae
  1199. }
  1200. }
  1201. if cd.Annotations[l] == v {
  1202. return true, ae
  1203. } else if strings.HasSuffix(v, "*") { // trigger wildcard prefix filtering
  1204. vTrim := strings.TrimSuffix(v, "*")
  1205. if strings.HasPrefix(cd.Annotations[l], vTrim) {
  1206. return true, ae
  1207. }
  1208. }
  1209. }
  1210. return false, ae
  1211. }
  1212. })(annotation, values)
  1213. filterFuncs = append(filterFuncs, ff)
  1214. }
  1215. }
  1216. // clear cache prior to checking the cache so that a clearCache=true
  1217. // request always returns a freshly computed value
  1218. if clearCache {
  1219. a.AggregateCache.Flush()
  1220. a.CostDataCache.Flush()
  1221. }
  1222. cacheExpiry := a.GetCacheExpiration(window.Duration())
  1223. if noExpireCache {
  1224. cacheExpiry = cache.NoExpiration
  1225. }
  1226. // parametrize cache key by all request parameters
  1227. aggKey := GenerateAggKey(window, field, subfields, opts)
  1228. thanosOffset := time.Now().Add(-thanos.OffsetDuration())
  1229. if a.ThanosClient != nil && window.End().After(thanosOffset) {
  1230. log.Infof("ComputeAggregateCostModel: setting end time backwards to first present data")
  1231. // Apply offsets to both end and start times to maintain correct time range
  1232. deltaDuration := window.End().Sub(thanosOffset)
  1233. s := window.Start().Add(-1 * deltaDuration)
  1234. e := time.Now().Add(-thanos.OffsetDuration())
  1235. window.Set(&s, &e)
  1236. }
  1237. dur, off := window.DurationOffsetStrings()
  1238. key := fmt.Sprintf(`%s:%s:%fh:%t`, dur, off, resolution.Hours(), remoteEnabled)
  1239. // report message about which of the two caches hit. by default report a miss
  1240. cacheMessage := fmt.Sprintf("ComputeAggregateCostModel: L1 cache miss: %s L2 cache miss: %s", aggKey, key)
  1241. // check the cache for aggregated response; if cache is hit and not disabled, return response
  1242. if value, found := a.AggregateCache.Get(aggKey); found && !disableCache && !noCache {
  1243. result, ok := value.(map[string]*Aggregation)
  1244. if !ok {
  1245. // disable cache and recompute if type cast fails
  1246. log.Errorf("ComputeAggregateCostModel: caching error: failed to cast aggregate data to struct: %s", aggKey)
  1247. return a.ComputeAggregateCostModel(promClient, window, field, subfields, opts)
  1248. }
  1249. return result, fmt.Sprintf("aggregate cache hit: %s", aggKey), nil
  1250. }
  1251. if window.Hours() >= 1.0 {
  1252. // exclude the last window of the time frame to match Prometheus definitions of range, offset, and resolution
  1253. start := window.Start().Add(resolution)
  1254. window.Set(&start, window.End())
  1255. } else {
  1256. // don't cache requests for durations of less than one hour
  1257. disableCache = true
  1258. }
  1259. // attempt to retrieve cost data from cache
  1260. var costData map[string]*CostData
  1261. var err error
  1262. cacheData, found := a.CostDataCache.Get(key)
  1263. if found && !disableCache && !noCache {
  1264. ok := false
  1265. costData, ok = cacheData.(map[string]*CostData)
  1266. cacheMessage = fmt.Sprintf("ComputeAggregateCostModel: L1 cache miss: %s, L2 cost data cache hit: %s", aggKey, key)
  1267. if !ok {
  1268. log.Errorf("ComputeAggregateCostModel: caching error: failed to cast cost data to struct: %s", key)
  1269. }
  1270. } else {
  1271. log.Infof("ComputeAggregateCostModel: missed cache: %s (found %t, disableCache %t, noCache %t)", key, found, disableCache, noCache)
  1272. costData, err = a.Model.ComputeCostDataRange(promClient, a.CloudProvider, window, resolution, "", "", remoteEnabled)
  1273. if err != nil {
  1274. if prom.IsErrorCollection(err) {
  1275. return nil, "", err
  1276. }
  1277. if pce, ok := err.(prom.CommError); ok {
  1278. return nil, "", pce
  1279. }
  1280. if strings.Contains(err.Error(), "data is empty") {
  1281. return nil, "", &EmptyDataError{err: err, window: window}
  1282. }
  1283. return nil, "", err
  1284. }
  1285. // compute length of the time series in the cost data and only compute
  1286. // aggregates and cache if the length is sufficiently high
  1287. costDataLen := costDataTimeSeriesLength(costData)
  1288. if costDataLen == 0 {
  1289. return nil, "", &EmptyDataError{window: window}
  1290. }
  1291. if costDataLen >= minCostDataLength && !noCache {
  1292. log.Infof("ComputeAggregateCostModel: setting L2 cache: %s", key)
  1293. a.CostDataCache.Set(key, costData, cacheExpiry)
  1294. }
  1295. }
  1296. c, err := a.CloudProvider.GetConfig()
  1297. if err != nil {
  1298. return nil, "", err
  1299. }
  1300. discount, err := ParsePercentString(c.Discount)
  1301. if err != nil {
  1302. return nil, "", err
  1303. }
  1304. customDiscount, err := ParsePercentString(c.NegotiatedDiscount)
  1305. if err != nil {
  1306. return nil, "", err
  1307. }
  1308. sc := make(map[string]*SharedCostInfo)
  1309. if !disableSharedOverhead {
  1310. costPerMonth := c.GetSharedOverheadCostPerMonth()
  1311. durationCoefficient := window.Hours() / timeutil.HoursPerMonth
  1312. sc["total"] = &SharedCostInfo{
  1313. Name: "total",
  1314. Cost: costPerMonth * durationCoefficient,
  1315. }
  1316. }
  1317. idleCoefficients := make(map[string]float64)
  1318. if allocateIdle {
  1319. dur, off, err := window.DurationOffset()
  1320. if err != nil {
  1321. log.Errorf("ComputeAggregateCostModel: error computing idle coefficient: illegal window: %s (%s)", window, err)
  1322. return nil, "", err
  1323. }
  1324. if a.ThanosClient != nil && off < thanos.OffsetDuration() {
  1325. // Determine difference between the Thanos offset and the requested
  1326. // offset; e.g. off=1h, thanosOffsetDuration=3h => diff=2h
  1327. diff := thanos.OffsetDuration() - off
  1328. // Reduce duration by difference and increase offset by difference
  1329. // e.g. 24h offset 0h => 21h offset 3h
  1330. dur = dur - diff
  1331. off = thanos.OffsetDuration()
  1332. log.Infof("ComputeAggregateCostModel: setting duration, offset to %s, %s due to Thanos", dur, off)
  1333. // Idle computation cannot be fulfilled for some windows, specifically
  1334. // those with sum(duration, offset) < Thanos offset, because there is
  1335. // no data within that window.
  1336. if dur <= 0 {
  1337. return nil, "", fmt.Errorf("requested idle coefficients from Thanos for illegal duration, offset: %s, %s (original window %s)", dur, off, window)
  1338. }
  1339. }
  1340. idleCoefficients, err = a.ComputeIdleCoefficient(costData, promClient, a.CloudProvider, discount, customDiscount, dur, off)
  1341. if err != nil {
  1342. durStr, offStr := timeutil.DurationOffsetStrings(dur, off)
  1343. log.Errorf("ComputeAggregateCostModel: error computing idle coefficient: duration=%s, offset=%s, err=%s", durStr, offStr, err)
  1344. return nil, "", err
  1345. }
  1346. }
  1347. totalContainerCost := 0.0
  1348. if shared == SplitTypeWeighted {
  1349. totalContainerCost = GetTotalContainerCost(costData, rate, a.CloudProvider, discount, customDiscount, idleCoefficients)
  1350. }
  1351. // filter cost data by namespace and cluster after caching for maximal cache hits
  1352. costData, filteredContainerCount, filteredEnvironments := FilterCostData(costData, retainFuncs, filterFuncs)
  1353. // aggregate cost model data by given fields and cache the result for the default expiration
  1354. aggOpts := &AggregationOptions{
  1355. Discount: discount,
  1356. CustomDiscount: customDiscount,
  1357. IdleCoefficients: idleCoefficients,
  1358. IncludeEfficiency: includeEfficiency,
  1359. IncludeTimeSeries: includeTimeSeries,
  1360. Rate: rate,
  1361. ResolutionHours: resolution.Hours(),
  1362. SharedResourceInfo: sri,
  1363. SharedCosts: sc,
  1364. FilteredContainerCount: filteredContainerCount,
  1365. FilteredEnvironments: filteredEnvironments,
  1366. TotalContainerCost: totalContainerCost,
  1367. SharedSplit: shared,
  1368. }
  1369. result := AggregateCostData(costData, field, subfields, a.CloudProvider, aggOpts)
  1370. // If sending time series data back, switch scale back to hourly data. At this point,
  1371. // resolutionHours may have converted our hourly data to more- or less-than hourly data.
  1372. if includeTimeSeries {
  1373. for _, aggs := range result {
  1374. ScaleAggregationTimeSeries(aggs, resolution.Hours())
  1375. }
  1376. }
  1377. // compute length of the time series in the cost data and only cache
  1378. // aggregation results if the length is sufficiently high
  1379. costDataLen := costDataTimeSeriesLength(costData)
  1380. if costDataLen >= minCostDataLength && window.Hours() > 1.0 && !noCache {
  1381. // Set the result map (rather than a pointer to it) because map is a reference type
  1382. log.Infof("ComputeAggregateCostModel: setting aggregate cache: %s", aggKey)
  1383. a.AggregateCache.Set(aggKey, result, cacheExpiry)
  1384. } else {
  1385. log.Infof("ComputeAggregateCostModel: not setting aggregate cache: %s (not enough data: %t; duration less than 1h: %t; noCache: %t)", key, costDataLen < minCostDataLength, window.Hours() < 1, noCache)
  1386. }
  1387. return result, cacheMessage, nil
  1388. }
  1389. // ScaleAggregationTimeSeries reverses the scaling done by ScaleHourlyCostData, returning
  1390. // the aggregation's time series to hourly data.
  1391. func ScaleAggregationTimeSeries(aggregation *Aggregation, resolutionHours float64) {
  1392. for _, v := range aggregation.CPUCostVector {
  1393. v.Value /= resolutionHours
  1394. }
  1395. for _, v := range aggregation.GPUCostVector {
  1396. v.Value /= resolutionHours
  1397. }
  1398. for _, v := range aggregation.RAMCostVector {
  1399. v.Value /= resolutionHours
  1400. }
  1401. for _, v := range aggregation.PVCostVector {
  1402. v.Value /= resolutionHours
  1403. }
  1404. for _, v := range aggregation.NetworkCostVector {
  1405. v.Value /= resolutionHours
  1406. }
  1407. for _, v := range aggregation.TotalCostVector {
  1408. v.Value /= resolutionHours
  1409. }
  1410. return
  1411. }
  1412. // String returns a string representation of the encapsulated shared resources, which
  1413. // can be used to uniquely identify a set of shared resources. Sorting sets of shared
  1414. // resources ensures that strings representing permutations of the same combination match.
  1415. func (s *SharedResourceInfo) String() string {
  1416. if s == nil {
  1417. return ""
  1418. }
  1419. nss := []string{}
  1420. for ns := range s.SharedNamespace {
  1421. nss = append(nss, ns)
  1422. }
  1423. sort.Strings(nss)
  1424. nsStr := strings.Join(nss, ",")
  1425. labels := []string{}
  1426. for lbl, vals := range s.LabelSelectors {
  1427. for val := range vals {
  1428. if lbl != "" && val != "" {
  1429. labels = append(labels, fmt.Sprintf("%s=%s", lbl, val))
  1430. }
  1431. }
  1432. }
  1433. sort.Strings(labels)
  1434. labelStr := strings.Join(labels, ",")
  1435. return fmt.Sprintf("%s:%s", nsStr, labelStr)
  1436. }
// aggKeyParams collects the request parameters that distinguish one aggregate
// cost model query from another, for use when constructing cache keys.
type aggKeyParams struct {
	duration   string            // window duration string, e.g. "7d"
	offset     string            // window offset string, e.g. "1h"
	filters    map[string]string // filter name (namespace, cluster, labels, ...) -> comma-separated values
	field      string            // primary aggregation field, e.g. "namespace"
	subfields  []string          // aggregation subfields, e.g. label names when field is "label"
	rate       string            // rate option: "", "hourly", "daily", or "monthly"
	sri        *SharedResourceInfo
	shareType  string
	idle       bool // whether idle costs are allocated
	timeSeries bool // whether the time series dimension is retained
	efficiency bool // whether efficiency data is included
}
  1450. // GenerateAggKey generates a parameter-unique key for caching the aggregate cost model
  1451. func GenerateAggKey(window kubecost.Window, field string, subfields []string, opts *AggregateQueryOpts) string {
  1452. if opts == nil {
  1453. opts = DefaultAggregateQueryOpts()
  1454. }
  1455. // Covert to duration, offset so that cache hits occur, even when timestamps have
  1456. // shifted slightly.
  1457. duration, offset := window.DurationOffsetStrings()
  1458. // parse, trim, and sort podprefix filters
  1459. podPrefixFilters := []string{}
  1460. if ppfs, ok := opts.Filters["podprefix"]; ok && ppfs != "" {
  1461. for _, psf := range strings.Split(ppfs, ",") {
  1462. podPrefixFilters = append(podPrefixFilters, strings.TrimSpace(psf))
  1463. }
  1464. }
  1465. sort.Strings(podPrefixFilters)
  1466. podPrefixFiltersStr := strings.Join(podPrefixFilters, ",")
  1467. // parse, trim, and sort namespace filters
  1468. nsFilters := []string{}
  1469. if nsfs, ok := opts.Filters["namespace"]; ok && nsfs != "" {
  1470. for _, nsf := range strings.Split(nsfs, ",") {
  1471. nsFilters = append(nsFilters, strings.TrimSpace(nsf))
  1472. }
  1473. }
  1474. sort.Strings(nsFilters)
  1475. nsFilterStr := strings.Join(nsFilters, ",")
  1476. // parse, trim, and sort node filters
  1477. nodeFilters := []string{}
  1478. if nodefs, ok := opts.Filters["node"]; ok && nodefs != "" {
  1479. for _, nodef := range strings.Split(nodefs, ",") {
  1480. nodeFilters = append(nodeFilters, strings.TrimSpace(nodef))
  1481. }
  1482. }
  1483. sort.Strings(nodeFilters)
  1484. nodeFilterStr := strings.Join(nodeFilters, ",")
  1485. // parse, trim, and sort cluster filters
  1486. cFilters := []string{}
  1487. if cfs, ok := opts.Filters["cluster"]; ok && cfs != "" {
  1488. for _, cf := range strings.Split(cfs, ",") {
  1489. cFilters = append(cFilters, strings.TrimSpace(cf))
  1490. }
  1491. }
  1492. sort.Strings(cFilters)
  1493. cFilterStr := strings.Join(cFilters, ",")
  1494. // parse, trim, and sort label filters
  1495. lFilters := []string{}
  1496. if lfs, ok := opts.Filters["labels"]; ok && lfs != "" {
  1497. for _, lf := range strings.Split(lfs, ",") {
  1498. // trim whitespace from the label name and the label value
  1499. // of each label name/value pair, then reconstruct
  1500. // e.g. "tier = frontend, app = kubecost" == "app=kubecost,tier=frontend"
  1501. lfa := strings.Split(lf, "=")
  1502. if len(lfa) == 2 {
  1503. lfn := strings.TrimSpace(lfa[0])
  1504. lfv := strings.TrimSpace(lfa[1])
  1505. lFilters = append(lFilters, fmt.Sprintf("%s=%s", lfn, lfv))
  1506. } else {
  1507. // label is not of the form name=value, so log it and move on
  1508. log.Warnf("GenerateAggKey: skipping illegal label filter: %s", lf)
  1509. }
  1510. }
  1511. }
  1512. sort.Strings(lFilters)
  1513. lFilterStr := strings.Join(lFilters, ",")
  1514. // parse, trim, and sort annotation filters
  1515. aFilters := []string{}
  1516. if afs, ok := opts.Filters["annotations"]; ok && afs != "" {
  1517. for _, af := range strings.Split(afs, ",") {
  1518. // trim whitespace from the annotation name and the annotation value
  1519. // of each annotation name/value pair, then reconstruct
  1520. // e.g. "tier = frontend, app = kubecost" == "app=kubecost,tier=frontend"
  1521. afa := strings.Split(af, "=")
  1522. if len(afa) == 2 {
  1523. afn := strings.TrimSpace(afa[0])
  1524. afv := strings.TrimSpace(afa[1])
  1525. aFilters = append(aFilters, fmt.Sprintf("%s=%s", afn, afv))
  1526. } else {
  1527. // annotation is not of the form name=value, so log it and move on
  1528. log.Warnf("GenerateAggKey: skipping illegal annotation filter: %s", af)
  1529. }
  1530. }
  1531. }
  1532. sort.Strings(aFilters)
  1533. aFilterStr := strings.Join(aFilters, ",")
  1534. filterStr := fmt.Sprintf("%s:%s:%s:%s:%s:%s", nsFilterStr, nodeFilterStr, cFilterStr, lFilterStr, aFilterStr, podPrefixFiltersStr)
  1535. sort.Strings(subfields)
  1536. fieldStr := fmt.Sprintf("%s:%s", field, strings.Join(subfields, ","))
  1537. if offset == "1m" {
  1538. offset = ""
  1539. }
  1540. return fmt.Sprintf("%s:%s:%s:%s:%s:%s:%s:%t:%t:%t", duration, offset, filterStr, fieldStr, opts.Rate,
  1541. opts.SharedResources, opts.ShareSplit, opts.AllocateIdle, opts.IncludeTimeSeries,
  1542. opts.IncludeEfficiency)
  1543. }
// Aggregator is capable of computing the aggregated cost model. This is
// a brutal interface, which should be cleaned up, but it's necessary for
// being able to swap in an ETL-backed implementation.
type Aggregator interface {
	// ComputeAggregateCostModel computes the aggregate cost model over the given
	// window, aggregated by the given field (and subfields) according to opts.
	// It returns the aggregations keyed by aggregate name, a status/cache
	// message, and any error encountered.
	ComputeAggregateCostModel(promClient prometheusClient.Client, window kubecost.Window, field string, subfields []string, opts *AggregateQueryOpts) (map[string]*Aggregation, string, error)
}
// warmAggregateCostModelCache starts background goroutines that periodically
// recompute and cache the default aggregate cost model and cluster cost
// queries for 1-day, 2-day, 7-day, and 30-day windows, so common requests hit
// a warm cache. The 2/7/30-day warmers are skipped when ETL is enabled.
func (a *Accesses) warmAggregateCostModelCache() {
	// Only allow one concurrent cache-warming operation
	sem := util.NewSemaphore(1)

	// Set default values, pulling them from application settings where applicable, and warm the cache
	// for the given duration. Cache is intentionally set to expire (i.e. noExpireCache=false) so that
	// if the default parameters change, the old cached defaults with eventually expire. Thus, the
	// timing of the cache expiry/refresh is the only mechanism ensuring 100% cache warmth.
	// Returns the aggregate-cost error and the cluster-costs error, in that order.
	warmFunc := func(duration, offset time.Duration, cacheEfficiencyData bool) (error, error) {
		if a.ThanosClient != nil {
			// NOTE(review): this overwrites the requested duration with the
			// Thanos offset, while the log message says "Offset" — confirm
			// whether duration (and not offset) is really what should be set.
			duration = thanos.OffsetDuration()
			log.Infof("Setting Offset to %s", duration)
		}
		fmtDuration, fmtOffset := timeutil.DurationOffsetStrings(duration, offset)
		// NOTE(review): this err is overwritten by ParseWindowUTC below without
		// ever being checked, so a failed conversion would go unnoticed.
		durationHrs, err := timeutil.FormatDurationStringDaysToHours(fmtDuration)

		promClient := a.GetPrometheusClient(true)

		windowStr := fmt.Sprintf("%s offset %s", fmtDuration, fmtOffset)
		window, err := kubecost.ParseWindowUTC(windowStr)
		if err != nil {
			return nil, fmt.Errorf("invalid window from window string: %s", windowStr)
		}

		// Warm the default frontend query: aggregate by namespace, no subfields.
		field := "namespace"
		subfields := []string{}

		// DisableCache forces recomputation so the cached entry is refreshed;
		// all other options mirror the defaults a plain request would use.
		aggOpts := DefaultAggregateQueryOpts()
		aggOpts.Rate = ""
		aggOpts.Filters = map[string]string{}
		aggOpts.IncludeTimeSeries = false
		aggOpts.IncludeEfficiency = true
		aggOpts.DisableCache = true
		aggOpts.ClearCache = false
		aggOpts.NoCache = false
		aggOpts.NoExpireCache = false
		aggOpts.ShareSplit = SplitTypeWeighted
		aggOpts.RemoteEnabled = env.IsRemoteEnabled()
		aggOpts.AllocateIdle = cloud.AllocateIdleByDefault(a.CloudProvider)

		// Apply configured shared namespaces/labels, if any, so the warmed
		// cache key matches the key a default request would generate.
		sharedNamespaces := cloud.SharedNamespaces(a.CloudProvider)
		sharedLabelNames, sharedLabelValues := cloud.SharedLabels(a.CloudProvider)

		if len(sharedNamespaces) > 0 || len(sharedLabelNames) > 0 {
			aggOpts.SharedResources = NewSharedResourceInfo(true, sharedNamespaces, sharedLabelNames, sharedLabelValues)
		}

		aggKey := GenerateAggKey(window, field, subfields, aggOpts)
		log.Infof("aggregation: cache warming defaults: %s", aggKey)

		// key indexes the cluster-costs cache, parameterized by duration/offset.
		key := fmt.Sprintf("%s:%s", durationHrs, fmtOffset)

		_, _, aggErr := a.ComputeAggregateCostModel(promClient, window, field, subfields, aggOpts)
		if aggErr != nil {
			log.Infof("Error building cache %s: %s", window, aggErr)
		}

		totals, err := a.ComputeClusterCosts(promClient, a.CloudProvider, duration, offset, cacheEfficiencyData)
		if err != nil {
			log.Infof("Error building cluster costs cache %s", key)
		}

		// Only cache cluster costs when at least one cluster reports more than
		// clusterCostsCacheMinutes worth of data.
		maxMinutesWithData := 0.0
		for _, cluster := range totals {
			if cluster.DataMinutes > maxMinutesWithData {
				maxMinutesWithData = cluster.DataMinutes
			}
		}

		if len(totals) > 0 && maxMinutesWithData > clusterCostsCacheMinutes {
			a.ClusterCostsCache.Set(key, totals, a.GetCacheExpiration(window.Duration()))
			log.Infof("caching %s cluster costs for %s", fmtDuration, a.GetCacheExpiration(window.Duration()))
		} else {
			log.Warnf("not caching %s cluster costs: no data or less than %f minutes data ", fmtDuration, clusterCostsCacheMinutes)
		}

		return aggErr, err
	}

	// 1 day
	go func(sem *util.Semaphore) {
		defer errors.HandlePanic()

		offset := time.Minute
		duration := 24 * time.Hour

		for {
			sem.Acquire()
			warmFunc(duration, offset, true)
			sem.Return()

			log.Infof("aggregation: warm cache: %s", timeutil.DurationString(duration))
			time.Sleep(a.GetCacheRefresh(duration))
		}
	}(sem)

	if !env.IsETLEnabled() {
		// 2 day
		go func(sem *util.Semaphore) {
			defer errors.HandlePanic()

			offset := time.Minute
			duration := 2 * 24 * time.Hour

			for {
				sem.Acquire()
				warmFunc(duration, offset, false)
				sem.Return()

				log.Infof("aggregation: warm cache: %s", timeutil.DurationString(duration))
				time.Sleep(a.GetCacheRefresh(duration))
			}
		}(sem)

		// 7 day
		go func(sem *util.Semaphore) {
			defer errors.HandlePanic()

			offset := time.Minute
			duration := 7 * 24 * time.Hour

			for {
				sem.Acquire()
				aggErr, err := warmFunc(duration, offset, false)
				sem.Return()

				log.Infof("aggregation: warm cache: %s", timeutil.DurationString(duration))
				// On failure, retry sooner than the normal refresh interval.
				if aggErr == nil && err == nil {
					time.Sleep(a.GetCacheRefresh(duration))
				} else {
					time.Sleep(5 * time.Minute)
				}
			}
		}(sem)

		// 30 day
		go func(sem *util.Semaphore) {
			defer errors.HandlePanic()

			for {
				offset := time.Minute
				duration := 30 * 24 * time.Hour

				sem.Acquire()
				aggErr, err := warmFunc(duration, offset, false)
				sem.Return()

				// On failure, retry sooner than the normal refresh interval.
				if aggErr == nil && err == nil {
					time.Sleep(a.GetCacheRefresh(duration))
				} else {
					time.Sleep(5 * time.Minute)
				}
			}
		}(sem)
	}
}
var (
	// Convert UTC-RFC3339 pairs to configured UTC offset
	// e.g. with UTC offset of -0600, 2020-07-01T00:00:00Z becomes
	// 2020-07-01T06:00:00Z == 2020-07-01T00:00:00-0600
	// TODO niko/etl fix the frontend because this is confusing if you're
	// actually asking for UTC time (...Z) and we swap that "Z" out for the
	// configured UTC offset without asking
	rfc3339 = `\d\d\d\d-\d\d-\d\dT\d\d:\d\d:\d\dZ`
	// rfc3339Regex matches a comma-separated pair of UTC RFC3339 timestamps,
	// capturing the start and end timestamps as submatches 1 and 2.
	rfc3339Regex = regexp.MustCompile(fmt.Sprintf(`(%s),(%s)`, rfc3339, rfc3339))
	// durRegex matches simple duration strings such as "30m", "24h", "7d", or
	// "45s", capturing the count (1) and the unit (2).
	durRegex = regexp.MustCompile(`^(\d+)(m|h|d|s)$`)
	// percentRegex extracts a (possibly fractional) percentage, e.g. "42.5%".
	percentRegex = regexp.MustCompile(`(\d+\.*\d*)%`)
)
  1688. // AggregateCostModelHandler handles requests to the aggregated cost model API. See
  1689. // ComputeAggregateCostModel for details.
  1690. func (a *Accesses) AggregateCostModelHandler(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  1691. w.Header().Set("Content-Type", "application/json")
  1692. windowStr := r.URL.Query().Get("window")
  1693. match := rfc3339Regex.FindStringSubmatch(windowStr)
  1694. if match != nil {
  1695. start, _ := time.Parse(time.RFC3339, match[1])
  1696. start = start.Add(-env.GetParsedUTCOffset()).In(time.UTC)
  1697. end, _ := time.Parse(time.RFC3339, match[2])
  1698. end = end.Add(-env.GetParsedUTCOffset()).In(time.UTC)
  1699. windowStr = fmt.Sprintf("%sZ,%sZ", start.Format("2006-01-02T15:04:05"), end.Format("2006-01-02T15:04:05Z"))
  1700. }
  1701. // determine duration and offset from query parameters
  1702. window, err := kubecost.ParseWindowWithOffset(windowStr, env.GetParsedUTCOffset())
  1703. if err != nil || window.Start() == nil {
  1704. WriteError(w, BadRequest(fmt.Sprintf("invalid window: %s", err)))
  1705. return
  1706. }
  1707. isDurationStr := durRegex.MatchString(windowStr)
  1708. // legacy offset option should override window offset
  1709. if r.URL.Query().Get("offset") != "" {
  1710. offset := r.URL.Query().Get("offset")
  1711. // Shift window by offset, but only when manually set with separate
  1712. // parameter and window was provided as a duration string. Otherwise,
  1713. // do not alter the (duration, offset) from ParseWindowWithOffset.
  1714. if offset != "1m" && isDurationStr {
  1715. match := durRegex.FindStringSubmatch(offset)
  1716. if match != nil && len(match) == 3 {
  1717. dur := time.Minute
  1718. if match[2] == "h" {
  1719. dur = time.Hour
  1720. }
  1721. if match[2] == "d" {
  1722. dur = 24 * time.Hour
  1723. }
  1724. if match[2] == "s" {
  1725. dur = time.Second
  1726. }
  1727. num, _ := strconv.ParseInt(match[1], 10, 64)
  1728. window = window.Shift(-time.Duration(num) * dur)
  1729. }
  1730. }
  1731. }
  1732. opts := DefaultAggregateQueryOpts()
  1733. // parse remaining query parameters
  1734. namespace := r.URL.Query().Get("namespace")
  1735. cluster := r.URL.Query().Get("cluster")
  1736. labels := r.URL.Query().Get("labels")
  1737. annotations := r.URL.Query().Get("annotations")
  1738. podprefix := r.URL.Query().Get("podprefix")
  1739. field := r.URL.Query().Get("aggregation")
  1740. sharedNamespaces := r.URL.Query().Get("sharedNamespaces")
  1741. sharedLabelNames := r.URL.Query().Get("sharedLabelNames")
  1742. sharedLabelValues := r.URL.Query().Get("sharedLabelValues")
  1743. remote := r.URL.Query().Get("remote") != "false"
  1744. subfieldStr := r.URL.Query().Get("aggregationSubfield")
  1745. subfields := []string{}
  1746. if len(subfieldStr) > 0 {
  1747. s := strings.Split(r.URL.Query().Get("aggregationSubfield"), ",")
  1748. for _, rawLabel := range s {
  1749. subfields = append(subfields, prom.SanitizeLabelName(rawLabel))
  1750. }
  1751. }
  1752. idleFlag := r.URL.Query().Get("allocateIdle")
  1753. if idleFlag == "default" {
  1754. c, _ := a.CloudProvider.GetConfig()
  1755. opts.AllocateIdle = (c.DefaultIdle == "true")
  1756. } else {
  1757. opts.AllocateIdle = (idleFlag == "true")
  1758. }
  1759. opts.Rate = r.URL.Query().Get("rate")
  1760. opts.ShareSplit = r.URL.Query().Get("sharedSplit")
  1761. // timeSeries == true maintains the time series dimension of the data,
  1762. // which by default gets summed over the entire interval
  1763. opts.IncludeTimeSeries = r.URL.Query().Get("timeSeries") == "true"
  1764. // efficiency has been deprecated in favor of a default to always send efficiency
  1765. opts.IncludeEfficiency = true
  1766. // TODO niko/caching rename "recomputeCache"
  1767. // disableCache, if set to "true", tells this function to recompute and
  1768. // cache the requested data
  1769. opts.DisableCache = r.URL.Query().Get("disableCache") == "true"
  1770. // clearCache, if set to "true", tells this function to flush the cache,
  1771. // then recompute and cache the requested data
  1772. opts.ClearCache = r.URL.Query().Get("clearCache") == "true"
  1773. // noCache avoids the cache altogether, both reading from and writing to
  1774. opts.NoCache = r.URL.Query().Get("noCache") == "true"
  1775. // noExpireCache should only be used by cache warming to set non-expiring caches
  1776. opts.NoExpireCache = false
  1777. // etl triggers ETL adapter
  1778. opts.UseETLAdapter = r.URL.Query().Get("etl") == "true"
  1779. // aggregation field is required
  1780. if field == "" {
  1781. WriteError(w, BadRequest("Missing aggregation field parameter"))
  1782. return
  1783. }
  1784. // aggregation subfield is required when aggregation field is "label"
  1785. if (field == "label" || field == "annotation") && len(subfields) == 0 {
  1786. WriteError(w, BadRequest("Missing aggregation subfield parameter"))
  1787. return
  1788. }
  1789. // enforce one of the available rate options
  1790. if opts.Rate != "" && opts.Rate != "hourly" && opts.Rate != "daily" && opts.Rate != "monthly" {
  1791. WriteError(w, BadRequest("Rate parameter only supports: hourly, daily, monthly or empty"))
  1792. return
  1793. }
  1794. // parse cost data filters
  1795. // namespace and cluster are exact-string-matches
  1796. // labels are expected to be comma-separated and to take the form key=value
  1797. // e.g. app=cost-analyzer,app.kubernetes.io/instance=kubecost
  1798. opts.Filters = map[string]string{
  1799. "namespace": namespace,
  1800. "cluster": cluster,
  1801. "labels": labels,
  1802. "annotations": annotations,
  1803. "podprefix": podprefix,
  1804. }
  1805. // parse shared resources
  1806. sn := []string{}
  1807. sln := []string{}
  1808. slv := []string{}
  1809. if sharedNamespaces != "" {
  1810. sn = strings.Split(sharedNamespaces, ",")
  1811. }
  1812. if sharedLabelNames != "" {
  1813. sln = strings.Split(sharedLabelNames, ",")
  1814. slv = strings.Split(sharedLabelValues, ",")
  1815. if len(sln) != len(slv) || slv[0] == "" {
  1816. WriteError(w, BadRequest("Supply exactly one shared label value per shared label name"))
  1817. return
  1818. }
  1819. }
  1820. if len(sn) > 0 || len(sln) > 0 {
  1821. opts.SharedResources = NewSharedResourceInfo(true, sn, sln, slv)
  1822. }
  1823. // enable remote if it is available and not disabled
  1824. opts.RemoteEnabled = remote && env.IsRemoteEnabled()
  1825. promClient := a.GetPrometheusClient(remote)
  1826. var data map[string]*Aggregation
  1827. var message string
  1828. data, message, err = a.AggAPI.ComputeAggregateCostModel(promClient, window, field, subfields, opts)
  1829. // Find any warnings in http request context
  1830. warning, _ := httputil.GetWarning(r)
  1831. if err != nil {
  1832. if emptyErr, ok := err.(*EmptyDataError); ok {
  1833. if warning == "" {
  1834. w.Write(WrapData(map[string]interface{}{}, emptyErr))
  1835. } else {
  1836. w.Write(WrapDataWithWarning(map[string]interface{}{}, emptyErr, warning))
  1837. }
  1838. return
  1839. }
  1840. if boundaryErr, ok := err.(*kubecost.BoundaryError); ok {
  1841. if window.Start() != nil && window.Start().After(time.Now().Add(-90*24*time.Hour)) {
  1842. // Asking for data within a 90 day period: it will be available
  1843. // after the pipeline builds
  1844. msg := "Data will be available after ETL is built"
  1845. match := percentRegex.FindStringSubmatch(boundaryErr.Message)
  1846. if len(match) > 1 {
  1847. completionPct, err := strconv.ParseFloat(match[1], 64)
  1848. if err == nil {
  1849. msg = fmt.Sprintf("%s (%.1f%% complete)", msg, completionPct)
  1850. }
  1851. }
  1852. WriteError(w, InternalServerError(msg))
  1853. } else {
  1854. // Boundary error outside of 90 day period; may not be available
  1855. WriteError(w, InternalServerError(boundaryErr.Error()))
  1856. }
  1857. return
  1858. }
  1859. errStr := fmt.Sprintf("error computing aggregate cost model: %s", err)
  1860. WriteError(w, InternalServerError(errStr))
  1861. return
  1862. }
  1863. if warning == "" {
  1864. w.Write(WrapDataWithMessage(data, nil, message))
  1865. } else {
  1866. w.Write(WrapDataWithMessageAndWarning(data, nil, message, warning))
  1867. }
  1868. }
  1869. // ParseAggregationProperties attempts to parse and return aggregation properties
  1870. // encoded under the given key. If none exist, or if parsing fails, an error
  1871. // is returned with empty AllocationProperties.
  1872. func ParseAggregationProperties(qp httputil.QueryParams, key string) ([]string, error) {
  1873. aggregateBy := []string{}
  1874. for _, agg := range qp.GetList(key, ",") {
  1875. aggregate := strings.TrimSpace(agg)
  1876. if aggregate != "" {
  1877. if prop, err := kubecost.ParseProperty(aggregate); err == nil {
  1878. aggregateBy = append(aggregateBy, string(prop))
  1879. } else if strings.HasPrefix(aggregate, "label:") {
  1880. aggregateBy = append(aggregateBy, aggregate)
  1881. } else if strings.HasPrefix(aggregate, "annotation:") {
  1882. aggregateBy = append(aggregateBy, aggregate)
  1883. }
  1884. }
  1885. }
  1886. return aggregateBy, nil
  1887. }
  1888. func (a *Accesses) ComputeAllocationHandlerSummary(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  1889. w.Header().Set("Content-Type", "application/json")
  1890. qp := httputil.NewQueryParams(r.URL.Query())
  1891. // Window is a required field describing the window of time over which to
  1892. // compute allocation data.
  1893. window, err := kubecost.ParseWindowWithOffset(qp.Get("window", ""), env.GetParsedUTCOffset())
  1894. if err != nil {
  1895. http.Error(w, fmt.Sprintf("Invalid 'window' parameter: %s", err), http.StatusBadRequest)
  1896. }
  1897. // Step is an optional parameter that defines the duration per-set, i.e.
  1898. // the window for an AllocationSet, of the AllocationSetRange to be
  1899. // computed. Defaults to the window size, making one set.
  1900. step := qp.GetDuration("step", window.Duration())
  1901. // Resolution is an optional parameter, defaulting to the configured ETL
  1902. // resolution.
  1903. resolution := qp.GetDuration("resolution", env.GetETLResolution())
  1904. // Aggregation is a required comma-separated list of fields by which to
  1905. // aggregate results. Some fields allow a sub-field, which is distinguished
  1906. // with a colon; e.g. "label:app".
  1907. // Examples: "namespace", "namespace,label:app"
  1908. aggregateBy, err := ParseAggregationProperties(qp, "aggregate")
  1909. if err != nil {
  1910. http.Error(w, fmt.Sprintf("Invalid 'aggregate' parameter: %s", err), http.StatusBadRequest)
  1911. }
  1912. // Accumulate is an optional parameter, defaulting to false, which if true
  1913. // sums each Set in the Range, producing one Set.
  1914. accumulate := qp.GetBool("accumulate", false)
  1915. // Query for AllocationSets in increments of the given step duration,
  1916. // appending each to the AllocationSetRange.
  1917. asr := kubecost.NewAllocationSetRange()
  1918. stepStart := *window.Start()
  1919. for window.End().After(stepStart) {
  1920. stepEnd := stepStart.Add(step)
  1921. stepWindow := kubecost.NewWindow(&stepStart, &stepEnd)
  1922. as, err := a.Model.ComputeAllocation(*stepWindow.Start(), *stepWindow.End(), resolution)
  1923. if err != nil {
  1924. WriteError(w, InternalServerError(err.Error()))
  1925. return
  1926. }
  1927. asr.Append(as)
  1928. stepStart = stepEnd
  1929. }
  1930. // Aggregate, if requested
  1931. if len(aggregateBy) > 0 {
  1932. err = asr.AggregateBy(aggregateBy, nil)
  1933. if err != nil {
  1934. WriteError(w, InternalServerError(err.Error()))
  1935. return
  1936. }
  1937. }
  1938. // Accumulate, if requested
  1939. if accumulate {
  1940. as, err := asr.Accumulate()
  1941. if err != nil {
  1942. WriteError(w, InternalServerError(err.Error()))
  1943. return
  1944. }
  1945. asr = kubecost.NewAllocationSetRange(as)
  1946. }
  1947. sasl := []*kubecost.SummaryAllocationSet{}
  1948. for _, as := range asr.Slice() {
  1949. sas := kubecost.NewSummaryAllocationSet(as, []kubecost.AllocationMatchFunc{}, []kubecost.AllocationMatchFunc{}, false, false)
  1950. sasl = append(sasl, sas)
  1951. }
  1952. sasr := kubecost.NewSummaryAllocationSetRange(sasl...)
  1953. w.Write(WrapData(sasr, nil))
  1954. }
  1955. // ComputeAllocationHandler computes an AllocationSetRange from the CostModel.
  1956. func (a *Accesses) ComputeAllocationHandler(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  1957. w.Header().Set("Content-Type", "application/json")
  1958. qp := httputil.NewQueryParams(r.URL.Query())
  1959. // Window is a required field describing the window of time over which to
  1960. // compute allocation data.
  1961. window, err := kubecost.ParseWindowWithOffset(qp.Get("window", ""), env.GetParsedUTCOffset())
  1962. if err != nil {
  1963. http.Error(w, fmt.Sprintf("Invalid 'window' parameter: %s", err), http.StatusBadRequest)
  1964. }
  1965. // Step is an optional parameter that defines the duration per-set, i.e.
  1966. // the window for an AllocationSet, of the AllocationSetRange to be
  1967. // computed. Defaults to the window size, making one set.
  1968. step := qp.GetDuration("step", window.Duration())
  1969. // Resolution is an optional parameter, defaulting to the configured ETL
  1970. // resolution.
  1971. resolution := qp.GetDuration("resolution", env.GetETLResolution())
  1972. // Aggregation is a required comma-separated list of fields by which to
  1973. // aggregate results. Some fields allow a sub-field, which is distinguished
  1974. // with a colon; e.g. "label:app".
  1975. // Examples: "namespace", "namespace,label:app"
  1976. aggregateBy, err := ParseAggregationProperties(qp, "aggregate")
  1977. if err != nil {
  1978. http.Error(w, fmt.Sprintf("Invalid 'aggregate' parameter: %s", err), http.StatusBadRequest)
  1979. }
  1980. // Accumulate is an optional parameter, defaulting to false, which if true
  1981. // sums each Set in the Range, producing one Set.
  1982. accumulate := qp.GetBool("accumulate", false)
  1983. // AccumulateBy is an optional parameter that accumulates an AllocationSetRange
  1984. // by the resolution of the given time duration.
  1985. // Defaults to 0. If a value is not passed then the parameter is not used.
  1986. accumulateBy := qp.GetDuration("accumulateBy", 0)
  1987. // Query for AllocationSets in increments of the given step duration,
  1988. // appending each to the AllocationSetRange.
  1989. asr := kubecost.NewAllocationSetRange()
  1990. stepStart := *window.Start()
  1991. for window.End().After(stepStart) {
  1992. stepEnd := stepStart.Add(step)
  1993. stepWindow := kubecost.NewWindow(&stepStart, &stepEnd)
  1994. as, err := a.Model.ComputeAllocation(*stepWindow.Start(), *stepWindow.End(), resolution)
  1995. if err != nil {
  1996. WriteError(w, InternalServerError(err.Error()))
  1997. return
  1998. }
  1999. asr.Append(as)
  2000. stepStart = stepEnd
  2001. }
  2002. // Aggregate, if requested
  2003. if len(aggregateBy) > 0 {
  2004. err = asr.AggregateBy(aggregateBy, nil)
  2005. if err != nil {
  2006. WriteError(w, InternalServerError(err.Error()))
  2007. return
  2008. }
  2009. }
  2010. // Accumulate, if requested
  2011. if accumulateBy != 0 {
  2012. asr, err = asr.AccumulateBy(accumulateBy)
  2013. if err != nil {
  2014. WriteError(w, InternalServerError(err.Error()))
  2015. return
  2016. }
  2017. } else if accumulate {
  2018. as, err := asr.Accumulate()
  2019. if err != nil {
  2020. WriteError(w, InternalServerError(err.Error()))
  2021. return
  2022. }
  2023. asr = kubecost.NewAllocationSetRange(as)
  2024. }
  2025. w.Write(WrapData(asr, nil))
  2026. }
// The below was transferred from a different package in order to maintain
// previous behavior. Ultimately, we should clean this up at some point.
// TODO move to util and/or standardize everything

// Error pairs an HTTP status code with a human-readable message body; it is
// the payload consumed by WriteError.
type Error struct {
	// StatusCode is the HTTP status to respond with; WriteError treats a
	// zero value as http.StatusInternalServerError.
	StatusCode int
	// Body is the message embedded in the JSON error response.
	Body string
}
  2034. func WriteError(w http.ResponseWriter, err Error) {
  2035. status := err.StatusCode
  2036. if status == 0 {
  2037. status = http.StatusInternalServerError
  2038. }
  2039. w.WriteHeader(status)
  2040. resp, _ := json.Marshal(&Response{
  2041. Code: status,
  2042. Message: fmt.Sprintf("Error: %s", err.Body),
  2043. })
  2044. w.Write(resp)
  2045. }
  2046. func BadRequest(message string) Error {
  2047. return Error{
  2048. StatusCode: http.StatusBadRequest,
  2049. Body: message,
  2050. }
  2051. }
  2052. func InternalServerError(message string) Error {
  2053. if message == "" {
  2054. message = "Internal Server Error"
  2055. }
  2056. return Error{
  2057. StatusCode: http.StatusInternalServerError,
  2058. Body: message,
  2059. }
  2060. }
  2061. func NotFound() Error {
  2062. return Error{
  2063. StatusCode: http.StatusNotFound,
  2064. Body: "Not Found",
  2065. }
  2066. }