aggregation.go 77 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177
717781779178017811782178317841785178617871788178917901791179217931794179517961797179817991800180118021803180418051806180718081809181018111812181318141815181618171818181918201821182218231824182518261827182818291830183118321833183418351836183718381839184018411842184318441845184618471848184918501851185218531854185518561857185818591860186118621863186418651866186718681869187018711872187318741875187618771878187918801881188218831884188518861887188818891890189118921893189418951896189718981899190019011902190319041905190619071908190919101911191219131914191519161917191819191920192119221923192419251926192719281929193019311932193319341935193619371938193919401941194219431944194519461947194819491950195119521953195419551956195719581959196019611962196319641965196619671968196919701971197219731974197519761977197819791980198119821983198419851986198719881989199019911992199319941995199619971998199920002001200220032004200520062007200820092010201120122013201420152016201720182019202020212022202320242025202620272028202920302031203220332034203520362037203820392040204120422043204420452046204720482049205020512052205320542055205620572058205920602061206220632064206520662067206820692070207120722073207420752076207720782079208020812082208320842085208620872088208920902091209220932094209520962097209820992100210121022103210421052106210721082109211021112112211321142115211621172118211921202121212221232124212521262127212821292130213121322133
  1. package costmodel
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. "math"
  6. "net/http"
  7. "regexp"
  8. "sort"
  9. "strconv"
  10. "strings"
  11. "time"
  12. "github.com/julienschmidt/httprouter"
  13. "github.com/kubecost/cost-model/pkg/cloud"
  14. "github.com/kubecost/cost-model/pkg/env"
  15. "github.com/kubecost/cost-model/pkg/errors"
  16. "github.com/kubecost/cost-model/pkg/kubecost"
  17. "github.com/kubecost/cost-model/pkg/log"
  18. "github.com/kubecost/cost-model/pkg/prom"
  19. "github.com/kubecost/cost-model/pkg/thanos"
  20. "github.com/kubecost/cost-model/pkg/util"
  21. "github.com/patrickmn/go-cache"
  22. prometheusClient "github.com/prometheus/client_golang/api"
  23. "k8s.io/klog"
  24. )
  const (
	// SplitTypeWeighted selects a proportional split of shared costs: each
	// aggregation receives shared cost in proportion to its own cost rather
	// than an equal share.
	SplitTypeWeighted = "weighted"

	// UnallocatedSubfield is the aggregation key used for data that lack the
	// chosen aggregator; e.g. when aggregating by label, data without any of
	// the requested labels are grouped under this key.
	UnallocatedSubfield = "__unallocated__"

	// clusterCostsCacheMinutes is presumably the lifetime, in minutes, of
	// ClusterCostsCache entries; it is not referenced in this part of the
	// file. TODO(review): confirm against its uses.
	clusterCostsCacheMinutes = 5.0
)
  35. // Aggregation describes aggregated cost data, containing cumulative cost and
  36. // allocation data per resource, vectors of rate data per resource, efficiency
  37. // data, and metadata describing the type of aggregation operation.
  38. type Aggregation struct {
  39. Aggregator string `json:"aggregation"`
  40. Subfields []string `json:"subfields,omitempty"`
  41. Environment string `json:"environment"`
  42. Cluster string `json:"cluster,omitempty"`
  43. Properties *kubecost.Properties `json:"-"`
  44. CPUAllocationHourlyAverage float64 `json:"cpuAllocationAverage"`
  45. CPUAllocationVectors []*util.Vector `json:"-"`
  46. CPUAllocationTotal float64 `json:"-"`
  47. CPUCost float64 `json:"cpuCost"`
  48. CPUCostVector []*util.Vector `json:"cpuCostVector,omitempty"`
  49. CPUEfficiency float64 `json:"cpuEfficiency"`
  50. CPURequestedVectors []*util.Vector `json:"-"`
  51. CPUUsedVectors []*util.Vector `json:"-"`
  52. Efficiency float64 `json:"efficiency"`
  53. GPUAllocationHourlyAverage float64 `json:"gpuAllocationAverage"`
  54. GPUAllocationVectors []*util.Vector `json:"-"`
  55. GPUCost float64 `json:"gpuCost"`
  56. GPUCostVector []*util.Vector `json:"gpuCostVector,omitempty"`
  57. GPUAllocationTotal float64 `json:"-"`
  58. RAMAllocationHourlyAverage float64 `json:"ramAllocationAverage"`
  59. RAMAllocationVectors []*util.Vector `json:"-"`
  60. RAMAllocationTotal float64 `json:"-"`
  61. RAMCost float64 `json:"ramCost"`
  62. RAMCostVector []*util.Vector `json:"ramCostVector,omitempty"`
  63. RAMEfficiency float64 `json:"ramEfficiency"`
  64. RAMRequestedVectors []*util.Vector `json:"-"`
  65. RAMUsedVectors []*util.Vector `json:"-"`
  66. PVAllocationHourlyAverage float64 `json:"pvAllocationAverage"`
  67. PVAllocationVectors []*util.Vector `json:"-"`
  68. PVAllocationTotal float64 `json:"-"`
  69. PVCost float64 `json:"pvCost"`
  70. PVCostVector []*util.Vector `json:"pvCostVector,omitempty"`
  71. NetworkCost float64 `json:"networkCost"`
  72. NetworkCostVector []*util.Vector `json:"networkCostVector,omitempty"`
  73. SharedCost float64 `json:"sharedCost"`
  74. TotalCost float64 `json:"totalCost"`
  75. TotalCostVector []*util.Vector `json:"totalCostVector,omitempty"`
  76. }
  77. // TotalHours determines the amount of hours the Aggregation covers, as a
  78. // function of the cost vectors and the resolution of those vectors' data
  79. func (a *Aggregation) TotalHours(resolutionHours float64) float64 {
  80. length := 1
  81. if length < len(a.CPUCostVector) {
  82. length = len(a.CPUCostVector)
  83. }
  84. if length < len(a.RAMCostVector) {
  85. length = len(a.RAMCostVector)
  86. }
  87. if length < len(a.PVCostVector) {
  88. length = len(a.PVCostVector)
  89. }
  90. if length < len(a.GPUCostVector) {
  91. length = len(a.GPUCostVector)
  92. }
  93. if length < len(a.NetworkCostVector) {
  94. length = len(a.NetworkCostVector)
  95. }
  96. return float64(length) * resolutionHours
  97. }
  98. // RateCoefficient computes the coefficient by which the total cost needs to be
  99. // multiplied in order to convert totals costs into per-rate costs.
  100. func (a *Aggregation) RateCoefficient(rateStr string, resolutionHours float64) float64 {
  101. // monthly rate = (730.0)*(total cost)/(total hours)
  102. // daily rate = (24.0)*(total cost)/(total hours)
  103. // hourly rate = (1.0)*(total cost)/(total hours)
  104. // default to hourly rate
  105. coeff := 1.0
  106. switch rateStr {
  107. case "daily":
  108. coeff = util.HoursPerDay
  109. case "monthly":
  110. coeff = util.HoursPerMonth
  111. }
  112. return coeff / a.TotalHours(resolutionHours)
  113. }
  // SharedResourceInfo identifies resources whose cost should be distributed
// across all aggregations rather than reported under their own key.
type SharedResourceInfo struct {
	// ShareResources enables sharing; when false nothing is treated as shared.
	ShareResources bool

	// SharedNamespace is the set of namespaces whose resources are shared.
	SharedNamespace map[string]bool

	// LabelSelectors maps a label name to the set of label values that mark a
	// resource as shared.
	LabelSelectors map[string]map[string]bool
}
  // SharedCostInfo describes an additional, externally supplied cost that is
// shared across aggregations.
type SharedCostInfo struct {
	// Name labels the shared cost.
	Name string

	// Cost is the amount distributed across aggregations.
	Cost float64

	// ShareType describes how the cost is shared; it is not read by
	// AggregateCostData.
	ShareType string
}
  124. func (s *SharedResourceInfo) IsSharedResource(costDatum *CostData) bool {
  125. // exists in a shared namespace
  126. if _, ok := s.SharedNamespace[costDatum.Namespace]; ok {
  127. return true
  128. }
  129. // has at least one shared label (OR, not AND in the case of multiple labels)
  130. for labelName, labelValues := range s.LabelSelectors {
  131. if val, ok := costDatum.Labels[labelName]; ok && labelValues[val] {
  132. return true
  133. }
  134. }
  135. return false
  136. }
  137. func NewSharedResourceInfo(shareResources bool, sharedNamespaces []string, labelNames []string, labelValues []string) *SharedResourceInfo {
  138. sr := &SharedResourceInfo{
  139. ShareResources: shareResources,
  140. SharedNamespace: make(map[string]bool),
  141. LabelSelectors: make(map[string]map[string]bool),
  142. }
  143. for _, ns := range sharedNamespaces {
  144. sr.SharedNamespace[strings.Trim(ns, " ")] = true
  145. }
  146. // Creating a map of label name to label value, but only if
  147. // the cardinality matches
  148. if len(labelNames) == len(labelValues) {
  149. for i := range labelNames {
  150. cleanedLname := prom.SanitizeLabelName(strings.Trim(labelNames[i], " "))
  151. if values, ok := sr.LabelSelectors[cleanedLname]; ok {
  152. values[strings.Trim(labelValues[i], " ")] = true
  153. } else {
  154. sr.LabelSelectors[cleanedLname] = map[string]bool{strings.Trim(labelValues[i], " "): true}
  155. }
  156. }
  157. }
  158. return sr
  159. }
  160. func GetTotalContainerCost(costData map[string]*CostData, rate string, cp cloud.Provider, discount float64, customDiscount float64, idleCoefficients map[string]float64) float64 {
  161. totalContainerCost := 0.0
  162. for _, costDatum := range costData {
  163. clusterID := costDatum.ClusterID
  164. cpuv, ramv, gpuv, pvvs, netv := getPriceVectors(cp, costDatum, rate, discount, customDiscount, idleCoefficients[clusterID])
  165. totalContainerCost += totalVectors(cpuv)
  166. totalContainerCost += totalVectors(ramv)
  167. totalContainerCost += totalVectors(gpuv)
  168. for _, pv := range pvvs {
  169. totalContainerCost += totalVectors(pv)
  170. }
  171. totalContainerCost += totalVectors(netv)
  172. }
  173. return totalContainerCost
  174. }
  175. func (a *Accesses) ComputeIdleCoefficient(costData map[string]*CostData, cli prometheusClient.Client, cp cloud.Provider, discount float64, customDiscount float64, windowString, offset string) (map[string]float64, error) {
  176. coefficients := make(map[string]float64)
  177. profileName := "ComputeIdleCoefficient: ComputeClusterCosts"
  178. profileStart := time.Now()
  179. var clusterCosts map[string]*ClusterCosts
  180. var err error
  181. key := fmt.Sprintf("%s:%s", windowString, offset)
  182. if data, valid := a.ClusterCostsCache.Get(key); valid {
  183. clusterCosts = data.(map[string]*ClusterCosts)
  184. } else {
  185. clusterCosts, err = a.ComputeClusterCosts(cli, cp, windowString, offset, false)
  186. if err != nil {
  187. return nil, err
  188. }
  189. }
  190. measureTime(profileStart, profileThreshold, profileName)
  191. for cid, costs := range clusterCosts {
  192. if costs.CPUCumulative == 0 && costs.RAMCumulative == 0 && costs.StorageCumulative == 0 {
  193. klog.V(1).Infof("[Warning] No ClusterCosts data for cluster '%s'. Is it emitting data?", cid)
  194. coefficients[cid] = 1.0
  195. continue
  196. }
  197. if costs.TotalCumulative == 0 {
  198. return nil, fmt.Errorf("TotalCumulative cluster cost for cluster '%s' returned 0 over window '%s' offset '%s'", cid, windowString, offset)
  199. }
  200. totalContainerCost := 0.0
  201. for _, costDatum := range costData {
  202. if costDatum.ClusterID == cid {
  203. cpuv, ramv, gpuv, pvvs, _ := getPriceVectors(cp, costDatum, "", discount, customDiscount, 1)
  204. totalContainerCost += totalVectors(cpuv)
  205. totalContainerCost += totalVectors(ramv)
  206. totalContainerCost += totalVectors(gpuv)
  207. for _, pv := range pvvs {
  208. totalContainerCost += totalVectors(pv)
  209. }
  210. }
  211. }
  212. coeff := totalContainerCost / costs.TotalCumulative
  213. coefficients[cid] = coeff
  214. }
  215. return coefficients, nil
  216. }
  217. // AggregationOptions provides optional parameters to AggregateCostData, allowing callers to perform more complex operations
  218. type AggregationOptions struct {
  219. Discount float64 // percent by which to discount CPU, RAM, and GPU cost
  220. CustomDiscount float64 // additional custom discount applied to all prices
  221. IdleCoefficients map[string]float64 // scales costs by amount of idle resources on a per-cluster basis
  222. IncludeEfficiency bool // set to true to receive efficiency/usage data
  223. IncludeTimeSeries bool // set to true to receive time series data
  224. Rate string // set to "hourly", "daily", or "monthly" to receive cost rate, rather than cumulative cost
  225. ResolutionHours float64
  226. SharedResourceInfo *SharedResourceInfo
  227. SharedCosts map[string]*SharedCostInfo
  228. FilteredContainerCount int
  229. FilteredEnvironments map[string]int
  230. SharedSplit string
  231. TotalContainerCost float64
  232. }
  233. // Helper method to test request/usgae values against allocation averages for efficiency scores. Generate a warning log if
  234. // clamp is required
  235. func clampAverage(requestsAvg float64, usedAverage float64, allocationAvg float64, resource string) (float64, float64) {
  236. rAvg := requestsAvg
  237. if rAvg > allocationAvg {
  238. klog.V(4).Infof("[Warning] Average %s Requested (%f) > Average %s Allocated (%f). Clamping.", resource, rAvg, resource, allocationAvg)
  239. rAvg = allocationAvg
  240. }
  241. uAvg := usedAverage
  242. if uAvg > allocationAvg {
  243. klog.V(4).Infof("[Warning]: Average %s Used (%f) > Average %s Allocated (%f). Clamping.", resource, uAvg, resource, allocationAvg)
  244. uAvg = allocationAvg
  245. }
  246. return rAvg, uAvg
  247. }
  248. // AggregateCostData aggregates raw cost data by field; e.g. namespace, cluster, service, or label. In the case of label, callers
  249. // must pass a slice of subfields indicating the labels by which to group. Provider is used to define custom resource pricing.
  250. // See AggregationOptions for optional parameters.
  251. func AggregateCostData(costData map[string]*CostData, field string, subfields []string, cp cloud.Provider, opts *AggregationOptions) map[string]*Aggregation {
  252. discount := opts.Discount
  253. customDiscount := opts.CustomDiscount
  254. idleCoefficients := opts.IdleCoefficients
  255. includeTimeSeries := opts.IncludeTimeSeries
  256. includeEfficiency := opts.IncludeEfficiency
  257. rate := opts.Rate
  258. sr := opts.SharedResourceInfo
  259. resolutionHours := 1.0
  260. if opts.ResolutionHours > 0.0 {
  261. resolutionHours = opts.ResolutionHours
  262. }
  263. if idleCoefficients == nil {
  264. idleCoefficients = make(map[string]float64)
  265. }
  266. // aggregations collects key-value pairs of resource group-to-aggregated data
  267. // e.g. namespace-to-data or label-value-to-data
  268. aggregations := make(map[string]*Aggregation)
  269. // sharedResourceCost is the running total cost of resources that should be reported
  270. // as shared across all other resources, rather than reported as a stand-alone category
  271. sharedResourceCost := 0.0
  272. for _, costDatum := range costData {
  273. idleCoefficient, ok := idleCoefficients[costDatum.ClusterID]
  274. if !ok {
  275. idleCoefficient = 1.0
  276. }
  277. if sr != nil && sr.ShareResources && sr.IsSharedResource(costDatum) {
  278. cpuv, ramv, gpuv, pvvs, netv := getPriceVectors(cp, costDatum, rate, discount, customDiscount, idleCoefficient)
  279. sharedResourceCost += totalVectors(cpuv)
  280. sharedResourceCost += totalVectors(ramv)
  281. sharedResourceCost += totalVectors(gpuv)
  282. sharedResourceCost += totalVectors(netv)
  283. for _, pv := range pvvs {
  284. sharedResourceCost += totalVectors(pv)
  285. }
  286. } else {
  287. if field == "cluster" {
  288. aggregateDatum(cp, aggregations, costDatum, field, subfields, rate, costDatum.ClusterID, discount, customDiscount, idleCoefficient, false)
  289. } else if field == "node" {
  290. aggregateDatum(cp, aggregations, costDatum, field, subfields, rate, costDatum.NodeName, discount, customDiscount, idleCoefficient, false)
  291. } else if field == "namespace" {
  292. aggregateDatum(cp, aggregations, costDatum, field, subfields, rate, costDatum.Namespace, discount, customDiscount, idleCoefficient, false)
  293. } else if field == "service" {
  294. if len(costDatum.Services) > 0 {
  295. aggregateDatum(cp, aggregations, costDatum, field, subfields, rate, costDatum.Namespace+"/"+costDatum.Services[0], discount, customDiscount, idleCoefficient, false)
  296. } else {
  297. aggregateDatum(cp, aggregations, costDatum, field, subfields, rate, UnallocatedSubfield, discount, customDiscount, idleCoefficient, false)
  298. }
  299. } else if field == "deployment" {
  300. if len(costDatum.Deployments) > 0 {
  301. aggregateDatum(cp, aggregations, costDatum, field, subfields, rate, costDatum.Namespace+"/"+costDatum.Deployments[0], discount, customDiscount, idleCoefficient, false)
  302. } else {
  303. aggregateDatum(cp, aggregations, costDatum, field, subfields, rate, UnallocatedSubfield, discount, customDiscount, idleCoefficient, false)
  304. }
  305. } else if field == "statefulset" {
  306. if len(costDatum.Statefulsets) > 0 {
  307. aggregateDatum(cp, aggregations, costDatum, field, subfields, rate, costDatum.Namespace+"/"+costDatum.Statefulsets[0], discount, customDiscount, idleCoefficient, false)
  308. } else {
  309. aggregateDatum(cp, aggregations, costDatum, field, subfields, rate, UnallocatedSubfield, discount, customDiscount, idleCoefficient, false)
  310. }
  311. } else if field == "daemonset" {
  312. if len(costDatum.Daemonsets) > 0 {
  313. aggregateDatum(cp, aggregations, costDatum, field, subfields, rate, costDatum.Namespace+"/"+costDatum.Daemonsets[0], discount, customDiscount, idleCoefficient, false)
  314. } else {
  315. aggregateDatum(cp, aggregations, costDatum, field, subfields, rate, UnallocatedSubfield, discount, customDiscount, idleCoefficient, false)
  316. }
  317. } else if field == "controller" {
  318. if controller, kind, hasController := costDatum.GetController(); hasController {
  319. key := fmt.Sprintf("%s/%s:%s", costDatum.Namespace, kind, controller)
  320. aggregateDatum(cp, aggregations, costDatum, field, subfields, rate, key, discount, customDiscount, idleCoefficient, false)
  321. } else {
  322. aggregateDatum(cp, aggregations, costDatum, field, subfields, rate, UnallocatedSubfield, discount, customDiscount, idleCoefficient, false)
  323. }
  324. } else if field == "label" {
  325. found := false
  326. if costDatum.Labels != nil {
  327. for _, sf := range subfields {
  328. if subfieldName, ok := costDatum.Labels[sf]; ok {
  329. aggregateDatum(cp, aggregations, costDatum, field, subfields, rate, subfieldName, discount, customDiscount, idleCoefficient, false)
  330. found = true
  331. break
  332. }
  333. }
  334. }
  335. if !found {
  336. aggregateDatum(cp, aggregations, costDatum, field, subfields, rate, UnallocatedSubfield, discount, customDiscount, idleCoefficient, false)
  337. }
  338. } else if field == "pod" {
  339. aggregateDatum(cp, aggregations, costDatum, field, subfields, rate, costDatum.Namespace+"/"+costDatum.PodName, discount, customDiscount, idleCoefficient, false)
  340. } else if field == "container" {
  341. key := fmt.Sprintf("%s/%s/%s/%s", costDatum.ClusterID, costDatum.Namespace, costDatum.PodName, costDatum.Name)
  342. aggregateDatum(cp, aggregations, costDatum, field, subfields, rate, key, discount, customDiscount, idleCoefficient, true)
  343. }
  344. }
  345. }
  346. for key, agg := range aggregations {
  347. sharedCoefficient := 1 / float64(len(opts.FilteredEnvironments)+len(aggregations))
  348. agg.CPUCost = totalVectors(agg.CPUCostVector)
  349. agg.RAMCost = totalVectors(agg.RAMCostVector)
  350. agg.GPUCost = totalVectors(agg.GPUCostVector)
  351. agg.PVCost = totalVectors(agg.PVCostVector)
  352. agg.NetworkCost = totalVectors(agg.NetworkCostVector)
  353. if opts.SharedSplit == SplitTypeWeighted {
  354. d := opts.TotalContainerCost - sharedResourceCost
  355. if d == 0 {
  356. klog.V(1).Infof("[Warning] Total container cost '%f' and shared resource cost '%f are the same'. Setting sharedCoefficient to 1", opts.TotalContainerCost, sharedResourceCost)
  357. sharedCoefficient = 1.0
  358. } else {
  359. sharedCoefficient = (agg.CPUCost + agg.RAMCost + agg.GPUCost + agg.PVCost + agg.NetworkCost) / d
  360. }
  361. }
  362. agg.SharedCost = sharedResourceCost * sharedCoefficient
  363. for _, v := range opts.SharedCosts {
  364. agg.SharedCost += v.Cost * sharedCoefficient
  365. }
  366. if rate != "" {
  367. rateCoeff := agg.RateCoefficient(rate, resolutionHours)
  368. agg.CPUCost *= rateCoeff
  369. agg.RAMCost *= rateCoeff
  370. agg.GPUCost *= rateCoeff
  371. agg.PVCost *= rateCoeff
  372. agg.NetworkCost *= rateCoeff
  373. agg.SharedCost *= rateCoeff
  374. }
  375. agg.TotalCost = agg.CPUCost + agg.RAMCost + agg.GPUCost + agg.PVCost + agg.NetworkCost + agg.SharedCost
  376. // Evicted and Completed Pods can still show up here, but have 0 cost.
  377. // Filter these by default. Any reason to keep them?
  378. if agg.TotalCost == 0 {
  379. delete(aggregations, key)
  380. continue
  381. }
  382. // CPU, RAM, and PV allocation are cumulative per-datum, whereas GPU is rate per-datum
  383. agg.CPUAllocationHourlyAverage = totalVectors(agg.CPUAllocationVectors) / agg.TotalHours(resolutionHours)
  384. agg.RAMAllocationHourlyAverage = totalVectors(agg.RAMAllocationVectors) / agg.TotalHours(resolutionHours)
  385. agg.GPUAllocationHourlyAverage = averageVectors(agg.GPUAllocationVectors)
  386. agg.PVAllocationHourlyAverage = totalVectors(agg.PVAllocationVectors) / agg.TotalHours(resolutionHours)
  387. // TODO niko/etl does this check out for GPU data? Do we need to rewrite GPU queries to be
  388. // culumative?
  389. agg.CPUAllocationTotal = totalVectors(agg.CPUAllocationVectors)
  390. agg.GPUAllocationTotal = totalVectors(agg.GPUAllocationVectors)
  391. agg.PVAllocationTotal = totalVectors(agg.PVAllocationVectors)
  392. agg.RAMAllocationTotal = totalVectors(agg.RAMAllocationVectors)
  393. if includeEfficiency {
  394. // Default both RAM and CPU to 0% efficiency so that a 0-requested, 0-allocated, 0-used situation
  395. // returns 0% efficiency, which should be a red-flag.
  396. //
  397. // If non-zero numbers are available, then efficiency is defined as:
  398. // idlePercentage = (requested - used) / allocated
  399. // efficiency = (1.0 - idlePercentage)
  400. //
  401. // It is possible to score > 100% efficiency, which is meant to be interpreted as a red flag.
  402. // It is not possible to score < 0% efficiency.
  403. agg.CPUEfficiency = 0.0
  404. CPUIdle := 0.0
  405. if agg.CPUAllocationHourlyAverage > 0.0 {
  406. avgCPURequested := averageVectors(agg.CPURequestedVectors)
  407. avgCPUUsed := averageVectors(agg.CPUUsedVectors)
  408. // Clamp averages, log range violations
  409. avgCPURequested, avgCPUUsed = clampAverage(avgCPURequested, avgCPUUsed, agg.CPUAllocationHourlyAverage, "CPU")
  410. CPUIdle = ((avgCPURequested - avgCPUUsed) / agg.CPUAllocationHourlyAverage)
  411. agg.CPUEfficiency = 1.0 - CPUIdle
  412. }
  413. agg.RAMEfficiency = 0.0
  414. RAMIdle := 0.0
  415. if agg.RAMAllocationHourlyAverage > 0.0 {
  416. avgRAMRequested := averageVectors(agg.RAMRequestedVectors)
  417. avgRAMUsed := averageVectors(agg.RAMUsedVectors)
  418. // Clamp averages, log range violations
  419. avgRAMRequested, avgRAMUsed = clampAverage(avgRAMRequested, avgRAMUsed, agg.RAMAllocationHourlyAverage, "RAM")
  420. RAMIdle = ((avgRAMRequested - avgRAMUsed) / agg.RAMAllocationHourlyAverage)
  421. agg.RAMEfficiency = 1.0 - RAMIdle
  422. }
  423. // Score total efficiency by the sum of CPU and RAM efficiency, weighted by their
  424. // respective total costs.
  425. agg.Efficiency = 0.0
  426. if (agg.CPUCost + agg.RAMCost) > 0 {
  427. agg.Efficiency = ((agg.CPUCost * agg.CPUEfficiency) + (agg.RAMCost * agg.RAMEfficiency)) / (agg.CPUCost + agg.RAMCost)
  428. }
  429. }
  430. // convert RAM from bytes to GiB
  431. agg.RAMAllocationHourlyAverage = agg.RAMAllocationHourlyAverage / 1024 / 1024 / 1024
  432. // convert storage from bytes to GiB
  433. agg.PVAllocationHourlyAverage = agg.PVAllocationHourlyAverage / 1024 / 1024 / 1024
  434. // remove time series data if it is not explicitly requested
  435. if !includeTimeSeries {
  436. agg.CPUCostVector = nil
  437. agg.RAMCostVector = nil
  438. agg.GPUCostVector = nil
  439. agg.PVCostVector = nil
  440. agg.NetworkCostVector = nil
  441. agg.TotalCostVector = nil
  442. } else { // otherwise compute a totalcostvector
  443. v1 := addVectors(agg.CPUCostVector, agg.RAMCostVector)
  444. v2 := addVectors(v1, agg.GPUCostVector)
  445. v3 := addVectors(v2, agg.PVCostVector)
  446. v4 := addVectors(v3, agg.NetworkCostVector)
  447. agg.TotalCostVector = v4
  448. }
  449. // Typesafety checks
  450. if math.IsNaN(agg.CPUAllocationHourlyAverage) || math.IsInf(agg.CPUAllocationHourlyAverage, 0) {
  451. klog.V(1).Infof("[Warning] CPUAllocationHourlyAverage is %f for '%s: %s/%s'", agg.CPUAllocationHourlyAverage, agg.Cluster, agg.Aggregator, agg.Environment)
  452. agg.CPUAllocationHourlyAverage = 0
  453. }
  454. if math.IsNaN(agg.CPUCost) || math.IsInf(agg.CPUCost, 0) {
  455. klog.V(1).Infof("[Warning] CPUCost is %f for '%s: %s/%s'", agg.CPUCost, agg.Cluster, agg.Aggregator, agg.Environment)
  456. agg.CPUCost = 0
  457. }
  458. if math.IsNaN(agg.CPUEfficiency) || math.IsInf(agg.CPUEfficiency, 0) {
  459. klog.V(1).Infof("[Warning] CPUEfficiency is %f for '%s: %s/%s'", agg.CPUEfficiency, agg.Cluster, agg.Aggregator, agg.Environment)
  460. agg.CPUEfficiency = 0
  461. }
  462. if math.IsNaN(agg.Efficiency) || math.IsInf(agg.Efficiency, 0) {
  463. klog.V(1).Infof("[Warning] Efficiency is %f for '%s: %s/%s'", agg.Efficiency, agg.Cluster, agg.Aggregator, agg.Environment)
  464. agg.Efficiency = 0
  465. }
  466. if math.IsNaN(agg.GPUAllocationHourlyAverage) || math.IsInf(agg.GPUAllocationHourlyAverage, 0) {
  467. klog.V(1).Infof("[Warning] GPUAllocationHourlyAverage is %f for '%s: %s/%s'", agg.GPUAllocationHourlyAverage, agg.Cluster, agg.Aggregator, agg.Environment)
  468. agg.GPUAllocationHourlyAverage = 0
  469. }
  470. if math.IsNaN(agg.GPUCost) || math.IsInf(agg.GPUCost, 0) {
  471. klog.V(1).Infof("[Warning] GPUCost is %f for '%s: %s/%s'", agg.GPUCost, agg.Cluster, agg.Aggregator, agg.Environment)
  472. agg.GPUCost = 0
  473. }
  474. if math.IsNaN(agg.RAMAllocationHourlyAverage) || math.IsInf(agg.RAMAllocationHourlyAverage, 0) {
  475. klog.V(1).Infof("[Warning] RAMAllocationHourlyAverage is %f for '%s: %s/%s'", agg.RAMAllocationHourlyAverage, agg.Cluster, agg.Aggregator, agg.Environment)
  476. agg.RAMAllocationHourlyAverage = 0
  477. }
  478. if math.IsNaN(agg.RAMCost) || math.IsInf(agg.RAMCost, 0) {
  479. klog.V(1).Infof("[Warning] RAMCost is %f for '%s: %s/%s'", agg.RAMCost, agg.Cluster, agg.Aggregator, agg.Environment)
  480. agg.RAMCost = 0
  481. }
  482. if math.IsNaN(agg.RAMEfficiency) || math.IsInf(agg.RAMEfficiency, 0) {
  483. klog.V(1).Infof("[Warning] RAMEfficiency is %f for '%s: %s/%s'", agg.RAMEfficiency, agg.Cluster, agg.Aggregator, agg.Environment)
  484. agg.RAMEfficiency = 0
  485. }
  486. if math.IsNaN(agg.PVAllocationHourlyAverage) || math.IsInf(agg.PVAllocationHourlyAverage, 0) {
  487. klog.V(1).Infof("[Warning] PVAllocationHourlyAverage is %f for '%s: %s/%s'", agg.PVAllocationHourlyAverage, agg.Cluster, agg.Aggregator, agg.Environment)
  488. agg.PVAllocationHourlyAverage = 0
  489. }
  490. if math.IsNaN(agg.PVCost) || math.IsInf(agg.PVCost, 0) {
  491. klog.V(1).Infof("[Warning] PVCost is %f for '%s: %s/%s'", agg.PVCost, agg.Cluster, agg.Aggregator, agg.Environment)
  492. agg.PVCost = 0
  493. }
  494. if math.IsNaN(agg.NetworkCost) || math.IsInf(agg.NetworkCost, 0) {
  495. klog.V(1).Infof("[Warning] NetworkCost is %f for '%s: %s/%s'", agg.NetworkCost, agg.Cluster, agg.Aggregator, agg.Environment)
  496. agg.NetworkCost = 0
  497. }
  498. if math.IsNaN(agg.SharedCost) || math.IsInf(agg.SharedCost, 0) {
  499. klog.V(1).Infof("[Warning] SharedCost is %f for '%s: %s/%s'", agg.SharedCost, agg.Cluster, agg.Aggregator, agg.Environment)
  500. agg.SharedCost = 0
  501. }
  502. if math.IsNaN(agg.TotalCost) || math.IsInf(agg.TotalCost, 0) {
  503. klog.V(1).Infof("[Warning] TotalCost is %f for '%s: %s/%s'", agg.TotalCost, agg.Cluster, agg.Aggregator, agg.Environment)
  504. agg.TotalCost = 0
  505. }
  506. }
  507. return aggregations
  508. }
  509. func aggregateDatum(cp cloud.Provider, aggregations map[string]*Aggregation, costDatum *CostData, field string, subfields []string, rate string, key string, discount float64, customDiscount float64, idleCoefficient float64, includeProperties bool) {
  510. // add new entry to aggregation results if a new key is encountered
  511. if _, ok := aggregations[key]; !ok {
  512. agg := &Aggregation{
  513. Aggregator: field,
  514. Environment: key,
  515. }
  516. if len(subfields) > 0 {
  517. agg.Subfields = subfields
  518. }
  519. if includeProperties {
  520. props := &kubecost.Properties{}
  521. props.SetCluster(costDatum.ClusterID)
  522. props.SetNode(costDatum.NodeName)
  523. if controller, kind, hasController := costDatum.GetController(); hasController {
  524. props.SetController(controller)
  525. props.SetControllerKind(kind)
  526. }
  527. props.SetLabels(costDatum.Labels)
  528. props.SetNamespace(costDatum.Namespace)
  529. props.SetPod(costDatum.PodName)
  530. props.SetServices(costDatum.Services)
  531. props.SetContainer(costDatum.Name)
  532. agg.Properties = props
  533. }
  534. aggregations[key] = agg
  535. }
  536. mergeVectors(cp, costDatum, aggregations[key], rate, discount, customDiscount, idleCoefficient)
  537. }
  538. func mergeVectors(cp cloud.Provider, costDatum *CostData, aggregation *Aggregation, rate string, discount float64, customDiscount float64, idleCoefficient float64) {
  539. aggregation.CPUAllocationVectors = addVectors(costDatum.CPUAllocation, aggregation.CPUAllocationVectors)
  540. aggregation.CPURequestedVectors = addVectors(costDatum.CPUReq, aggregation.CPURequestedVectors)
  541. aggregation.CPUUsedVectors = addVectors(costDatum.CPUUsed, aggregation.CPUUsedVectors)
  542. aggregation.RAMAllocationVectors = addVectors(costDatum.RAMAllocation, aggregation.RAMAllocationVectors)
  543. aggregation.RAMRequestedVectors = addVectors(costDatum.RAMReq, aggregation.RAMRequestedVectors)
  544. aggregation.RAMUsedVectors = addVectors(costDatum.RAMUsed, aggregation.RAMUsedVectors)
  545. aggregation.GPUAllocationVectors = addVectors(costDatum.GPUReq, aggregation.GPUAllocationVectors)
  546. for _, pvcd := range costDatum.PVCData {
  547. aggregation.PVAllocationVectors = addVectors(pvcd.Values, aggregation.PVAllocationVectors)
  548. }
  549. cpuv, ramv, gpuv, pvvs, netv := getPriceVectors(cp, costDatum, rate, discount, customDiscount, idleCoefficient)
  550. aggregation.CPUCostVector = addVectors(cpuv, aggregation.CPUCostVector)
  551. aggregation.RAMCostVector = addVectors(ramv, aggregation.RAMCostVector)
  552. aggregation.GPUCostVector = addVectors(gpuv, aggregation.GPUCostVector)
  553. aggregation.NetworkCostVector = addVectors(netv, aggregation.NetworkCostVector)
  554. for _, vectorList := range pvvs {
  555. aggregation.PVCostVector = addVectors(aggregation.PVCostVector, vectorList)
  556. }
  557. }
  558. // Returns the blended discounts applied to the node as a result of global discounts and reserved instance
  559. // discounts
  560. func getDiscounts(costDatum *CostData, cpuCost float64, ramCost float64, discount float64) (float64, float64) {
  561. if costDatum.NodeData == nil {
  562. return discount, discount
  563. }
  564. if costDatum.NodeData.IsSpot() {
  565. return 0, 0
  566. }
  567. reserved := costDatum.NodeData.Reserved
  568. // blended discounts
  569. blendedCPUDiscount := discount
  570. blendedRAMDiscount := discount
  571. if reserved != nil && reserved.CPUCost > 0 && reserved.RAMCost > 0 {
  572. reservedCPUDiscount := 0.0
  573. if cpuCost == 0 {
  574. klog.V(1).Infof("[Warning] No cpu cost found for cluster '%s' node '%s'", costDatum.ClusterID, costDatum.NodeName)
  575. } else {
  576. reservedCPUDiscount = 1.0 - (reserved.CPUCost / cpuCost)
  577. }
  578. reservedRAMDiscount := 0.0
  579. if ramCost == 0 {
  580. klog.V(1).Infof("[Warning] No ram cost found for cluster '%s' node '%s'", costDatum.ClusterID, costDatum.NodeName)
  581. } else {
  582. reservedRAMDiscount = 1.0 - (reserved.RAMCost / ramCost)
  583. }
  584. // AWS passes the # of reserved CPU and RAM as -1 to represent "All"
  585. if reserved.ReservedCPU < 0 && reserved.ReservedRAM < 0 {
  586. blendedCPUDiscount = reservedCPUDiscount
  587. blendedRAMDiscount = reservedRAMDiscount
  588. } else {
  589. nodeCPU, ierr := strconv.ParseInt(costDatum.NodeData.VCPU, 10, 64)
  590. nodeRAM, ferr := strconv.ParseFloat(costDatum.NodeData.RAMBytes, 64)
  591. if ierr == nil && ferr == nil {
  592. nodeRAMGB := nodeRAM / 1024 / 1024 / 1024
  593. reservedRAMGB := float64(reserved.ReservedRAM) / 1024 / 1024 / 1024
  594. nonReservedCPU := nodeCPU - reserved.ReservedCPU
  595. nonReservedRAM := nodeRAMGB - reservedRAMGB
  596. if nonReservedCPU == 0 {
  597. blendedCPUDiscount = reservedCPUDiscount
  598. } else {
  599. if nodeCPU == 0 {
  600. klog.V(1).Infof("[Warning] No ram found for cluster '%s' node '%s'", costDatum.ClusterID, costDatum.NodeName)
  601. } else {
  602. blendedCPUDiscount = (float64(reserved.ReservedCPU) * reservedCPUDiscount) + (float64(nonReservedCPU)*discount)/float64(nodeCPU)
  603. }
  604. }
  605. if nonReservedRAM == 0 {
  606. blendedRAMDiscount = reservedRAMDiscount
  607. } else {
  608. if nodeRAMGB == 0 {
  609. klog.V(1).Infof("[Warning] No ram found for cluster '%s' node '%s'", costDatum.ClusterID, costDatum.NodeName)
  610. } else {
  611. blendedRAMDiscount = (reservedRAMGB * reservedRAMDiscount) + (nonReservedRAM*discount)/nodeRAMGB
  612. }
  613. }
  614. }
  615. }
  616. }
  617. return blendedCPUDiscount, blendedRAMDiscount
  618. }
  619. func parseVectorPricing(cfg *cloud.CustomPricing, costDatum *CostData, cpuCostStr, ramCostStr, gpuCostStr, pvCostStr string) (float64, float64, float64, float64, bool) {
  620. usesCustom := false
  621. cpuCost, err := strconv.ParseFloat(cpuCostStr, 64)
  622. if err != nil || math.IsNaN(cpuCost) || math.IsInf(cpuCost, 0) || cpuCost == 0 {
  623. cpuCost, err = strconv.ParseFloat(cfg.CPU, 64)
  624. usesCustom = true
  625. if err != nil || math.IsNaN(cpuCost) || math.IsInf(cpuCost, 0) {
  626. cpuCost = 0
  627. }
  628. }
  629. ramCost, err := strconv.ParseFloat(ramCostStr, 64)
  630. if err != nil || math.IsNaN(ramCost) || math.IsInf(ramCost, 0) || ramCost == 0 {
  631. ramCost, err = strconv.ParseFloat(cfg.RAM, 64)
  632. usesCustom = true
  633. if err != nil || math.IsNaN(ramCost) || math.IsInf(ramCost, 0) {
  634. ramCost = 0
  635. }
  636. }
  637. gpuCost, err := strconv.ParseFloat(gpuCostStr, 64)
  638. if err != nil || math.IsNaN(gpuCost) || math.IsInf(gpuCost, 0) {
  639. gpuCost, err = strconv.ParseFloat(cfg.GPU, 64)
  640. if err != nil || math.IsNaN(gpuCost) || math.IsInf(gpuCost, 0) {
  641. gpuCost = 0
  642. }
  643. }
  644. pvCost, err := strconv.ParseFloat(pvCostStr, 64)
  645. if err != nil || math.IsNaN(cpuCost) || math.IsInf(cpuCost, 0) {
  646. pvCost, err = strconv.ParseFloat(cfg.Storage, 64)
  647. if err != nil || math.IsNaN(pvCost) || math.IsInf(pvCost, 0) {
  648. pvCost = 0
  649. }
  650. }
  651. return cpuCost, ramCost, gpuCost, pvCost, usesCustom
  652. }
  653. func getPriceVectors(cp cloud.Provider, costDatum *CostData, rate string, discount float64, customDiscount float64, idleCoefficient float64) ([]*util.Vector, []*util.Vector, []*util.Vector, [][]*util.Vector, []*util.Vector) {
  654. var cpuCost float64
  655. var ramCost float64
  656. var gpuCost float64
  657. var pvCost float64
  658. var usesCustom bool
  659. // If custom pricing is enabled and can be retrieved, replace
  660. // default cost values with custom values
  661. customPricing, err := cp.GetConfig()
  662. if err != nil {
  663. klog.Errorf("failed to load custom pricing: %s", err)
  664. }
  665. if cloud.CustomPricesEnabled(cp) && err == nil {
  666. var cpuCostStr string
  667. var ramCostStr string
  668. var gpuCostStr string
  669. var pvCostStr string
  670. if costDatum.NodeData.IsSpot() {
  671. cpuCostStr = customPricing.SpotCPU
  672. ramCostStr = customPricing.SpotRAM
  673. gpuCostStr = customPricing.SpotGPU
  674. } else {
  675. cpuCostStr = customPricing.CPU
  676. ramCostStr = customPricing.RAM
  677. gpuCostStr = customPricing.GPU
  678. }
  679. pvCostStr = customPricing.Storage
  680. cpuCost, ramCost, gpuCost, pvCost, usesCustom = parseVectorPricing(customPricing, costDatum, cpuCostStr, ramCostStr, gpuCostStr, pvCostStr)
  681. } else if costDatum.NodeData == nil && err == nil {
  682. cpuCostStr := customPricing.CPU
  683. ramCostStr := customPricing.RAM
  684. gpuCostStr := customPricing.GPU
  685. pvCostStr := customPricing.Storage
  686. cpuCost, ramCost, gpuCost, pvCost, usesCustom = parseVectorPricing(customPricing, costDatum, cpuCostStr, ramCostStr, gpuCostStr, pvCostStr)
  687. } else {
  688. cpuCostStr := costDatum.NodeData.VCPUCost
  689. ramCostStr := costDatum.NodeData.RAMCost
  690. gpuCostStr := costDatum.NodeData.GPUCost
  691. pvCostStr := costDatum.NodeData.StorageCost
  692. cpuCost, ramCost, gpuCost, pvCost, usesCustom = parseVectorPricing(customPricing, costDatum, cpuCostStr, ramCostStr, gpuCostStr, pvCostStr)
  693. }
  694. if usesCustom {
  695. log.DedupedWarningf(5, "No pricing data found for node `%s` , using custom pricing", costDatum.NodeName)
  696. }
  697. cpuDiscount, ramDiscount := getDiscounts(costDatum, cpuCost, ramCost, discount)
  698. klog.V(4).Infof("Node Name: %s", costDatum.NodeName)
  699. klog.V(4).Infof("Blended CPU Discount: %f", cpuDiscount)
  700. klog.V(4).Infof("Blended RAM Discount: %f", ramDiscount)
  701. // TODO should we try to apply the rate coefficient here or leave it as a totals-only metric?
  702. rateCoeff := 1.0
  703. if idleCoefficient == 0 {
  704. idleCoefficient = 1.0
  705. }
  706. cpuv := make([]*util.Vector, 0, len(costDatum.CPUAllocation))
  707. for _, val := range costDatum.CPUAllocation {
  708. cpuv = append(cpuv, &util.Vector{
  709. Timestamp: math.Round(val.Timestamp/10) * 10,
  710. Value: (val.Value * cpuCost * (1 - cpuDiscount) * (1 - customDiscount) / idleCoefficient) * rateCoeff,
  711. })
  712. }
  713. ramv := make([]*util.Vector, 0, len(costDatum.RAMAllocation))
  714. for _, val := range costDatum.RAMAllocation {
  715. ramv = append(ramv, &util.Vector{
  716. Timestamp: math.Round(val.Timestamp/10) * 10,
  717. Value: ((val.Value / 1024 / 1024 / 1024) * ramCost * (1 - ramDiscount) * (1 - customDiscount) / idleCoefficient) * rateCoeff,
  718. })
  719. }
  720. gpuv := make([]*util.Vector, 0, len(costDatum.GPUReq))
  721. for _, val := range costDatum.GPUReq {
  722. gpuv = append(gpuv, &util.Vector{
  723. Timestamp: math.Round(val.Timestamp/10) * 10,
  724. Value: (val.Value * gpuCost * (1 - discount) * (1 - customDiscount) / idleCoefficient) * rateCoeff,
  725. })
  726. }
  727. pvvs := make([][]*util.Vector, 0, len(costDatum.PVCData))
  728. for _, pvcData := range costDatum.PVCData {
  729. pvv := make([]*util.Vector, 0, len(pvcData.Values))
  730. if pvcData.Volume != nil {
  731. cost, _ := strconv.ParseFloat(pvcData.Volume.Cost, 64)
  732. // override with custom pricing if enabled
  733. if cloud.CustomPricesEnabled(cp) {
  734. cost = pvCost
  735. }
  736. for _, val := range pvcData.Values {
  737. pvv = append(pvv, &util.Vector{
  738. Timestamp: math.Round(val.Timestamp/10) * 10,
  739. Value: ((val.Value / 1024 / 1024 / 1024) * cost * (1 - customDiscount) / idleCoefficient) * rateCoeff,
  740. })
  741. }
  742. pvvs = append(pvvs, pvv)
  743. }
  744. }
  745. netv := make([]*util.Vector, 0, len(costDatum.NetworkData))
  746. for _, val := range costDatum.NetworkData {
  747. netv = append(netv, &util.Vector{
  748. Timestamp: math.Round(val.Timestamp/10) * 10,
  749. Value: val.Value,
  750. })
  751. }
  752. return cpuv, ramv, gpuv, pvvs, netv
  753. }
  754. func averageVectors(vectors []*util.Vector) float64 {
  755. if len(vectors) == 0 {
  756. return 0.0
  757. }
  758. return totalVectors(vectors) / float64(len(vectors))
  759. }
  760. func totalVectors(vectors []*util.Vector) float64 {
  761. total := 0.0
  762. for _, vector := range vectors {
  763. total += vector.Value
  764. }
  765. return total
  766. }
  767. // addVectors adds two slices of Vectors. Vector timestamps are rounded to the
  768. // nearest ten seconds to allow matching of Vectors within a delta allowance.
  769. // Matching Vectors are summed, while unmatched Vectors are passed through.
  770. // e.g. [(t=1, 1), (t=2, 2)] + [(t=2, 2), (t=3, 3)] = [(t=1, 1), (t=2, 4), (t=3, 3)]
  771. func addVectors(xvs []*util.Vector, yvs []*util.Vector) []*util.Vector {
  772. sumOp := func(result *util.Vector, x *float64, y *float64) bool {
  773. if x != nil && y != nil {
  774. result.Value = *x + *y
  775. } else if y != nil {
  776. result.Value = *y
  777. } else if x != nil {
  778. result.Value = *x
  779. }
  780. return true
  781. }
  782. return util.ApplyVectorOp(xvs, yvs, sumOp)
  783. }
  784. // minCostDataLength sets the minimum number of time series data required to
  785. // cache both raw and aggregated cost data
  786. const minCostDataLength = 2
  787. // EmptyDataError describes an error caused by empty cost data for some
  788. // defined interval
  789. type EmptyDataError struct {
  790. err error
  791. window kubecost.Window
  792. }
  793. // Error implements the error interface
  794. func (ede *EmptyDataError) Error() string {
  795. err := fmt.Sprintf("empty data for range: %s", ede.window)
  796. if ede.err != nil {
  797. err += fmt.Sprintf(": %s", ede.err)
  798. }
  799. return err
  800. }
  801. func costDataTimeSeriesLength(costData map[string]*CostData) int {
  802. l := 0
  803. for _, cd := range costData {
  804. if l < len(cd.RAMAllocation) {
  805. l = len(cd.RAMAllocation)
  806. }
  807. if l < len(cd.CPUAllocation) {
  808. l = len(cd.CPUAllocation)
  809. }
  810. }
  811. return l
  812. }
  813. // ScaleHourlyCostData converts per-hour cost data to per-resolution data. If the target resolution is higher (i.e. < 1.0h)
  814. // then we can do simple multiplication by the fraction-of-an-hour and retain accuracy. If the target resolution is
  815. // lower (i.e. > 1.0h) then we sum groups of hourly data by resolution to maintain fidelity.
  816. // e.g. (100 hours of per-hour hourly data, resolutionHours=10) => 10 data points, grouped and summed by 10-hour window
  817. // e.g. (20 minutes of per-minute hourly data, resolutionHours=1/60) => 20 data points, scaled down by a factor of 60
  818. func ScaleHourlyCostData(data map[string]*CostData, resolutionHours float64) map[string]*CostData {
  819. scaled := map[string]*CostData{}
  820. for key, datum := range data {
  821. datum.RAMReq = scaleVectorSeries(datum.RAMReq, resolutionHours)
  822. datum.RAMUsed = scaleVectorSeries(datum.RAMUsed, resolutionHours)
  823. datum.RAMAllocation = scaleVectorSeries(datum.RAMAllocation, resolutionHours)
  824. datum.CPUReq = scaleVectorSeries(datum.CPUReq, resolutionHours)
  825. datum.CPUUsed = scaleVectorSeries(datum.CPUUsed, resolutionHours)
  826. datum.CPUAllocation = scaleVectorSeries(datum.CPUAllocation, resolutionHours)
  827. datum.GPUReq = scaleVectorSeries(datum.GPUReq, resolutionHours)
  828. datum.NetworkData = scaleVectorSeries(datum.NetworkData, resolutionHours)
  829. for _, pvcDatum := range datum.PVCData {
  830. pvcDatum.Values = scaleVectorSeries(pvcDatum.Values, resolutionHours)
  831. }
  832. scaled[key] = datum
  833. }
  834. return scaled
  835. }
  836. func scaleVectorSeries(vs []*util.Vector, resolutionHours float64) []*util.Vector {
  837. // if scaling to a lower resolution, compress the hourly data for maximum accuracy
  838. if resolutionHours > 1.0 {
  839. return compressVectorSeries(vs, resolutionHours)
  840. }
  841. // if scaling to a higher resolution, simply scale each value down by the fraction of an hour
  842. for _, v := range vs {
  843. v.Value *= resolutionHours
  844. }
  845. return vs
  846. }
  847. func compressVectorSeries(vs []*util.Vector, resolutionHours float64) []*util.Vector {
  848. if len(vs) == 0 {
  849. return vs
  850. }
  851. compressed := []*util.Vector{}
  852. threshold := float64(60 * 60 * resolutionHours)
  853. var acc *util.Vector
  854. for i, v := range vs {
  855. if acc == nil {
  856. // start a new accumulation from current datum
  857. acc = &util.Vector{
  858. Value: vs[i].Value,
  859. Timestamp: vs[i].Timestamp,
  860. }
  861. continue
  862. }
  863. if v.Timestamp-acc.Timestamp < threshold {
  864. // v should be accumulated in current datum
  865. acc.Value += v.Value
  866. } else {
  867. // v falls outside current datum's threshold; append and start a new one
  868. compressed = append(compressed, acc)
  869. acc = &util.Vector{
  870. Value: vs[i].Value,
  871. Timestamp: vs[i].Timestamp,
  872. }
  873. }
  874. }
  875. // append any remaining, incomplete accumulation
  876. if acc != nil {
  877. compressed = append(compressed, acc)
  878. }
  879. return compressed
  880. }
// AggregateQueryOpts holds the optional parameters of
// ComputeAggregateCostModel. When nil is passed, the values returned by
// DefaultAggregateQueryOpts are used.
type AggregateQueryOpts struct {
	// Rate is the requested cost rate; its handling is outside this view —
	// TODO confirm semantics.
	Rate string
	// Filters maps filter names to comma-separated values. The recognised
	// names are "podprefix", "namespace", "node", "cluster", "labels" and
	// "annotations". Values for namespace, node, cluster, labels and
	// annotations may end in "*" for prefix matching.
	Filters map[string]string
	// SharedResources, when non-nil, identifies cost data that is treated as
	// shared and is retained even if the filters would exclude it.
	SharedResources *SharedResourceInfo
	// ShareSplit selects how shared costs are split (defaults to
	// SplitTypeWeighted) — TODO confirm the set of allowed values.
	ShareSplit string
	// AllocateIdle presumably requests that idle cost be allocated — TODO confirm.
	AllocateIdle bool
	// IncludeTimeSeries presumably keeps per-timestamp series in the result
	// (defaults to true) — TODO confirm.
	IncludeTimeSeries bool
	// IncludeEfficiency presumably includes efficiency metrics (defaults to
	// true) — TODO confirm.
	IncludeEfficiency bool
	// DisableCache skips reading the aggregate and cost-data caches.
	DisableCache bool
	// ClearCache flushes both caches before they are consulted.
	ClearCache bool
	// NoCache also skips reading the aggregate and cost-data caches; any
	// difference from DisableCache lies outside this view — TODO confirm.
	NoCache bool
	// NoExpireCache sets the cache expiry to cache.NoExpiration.
	NoExpireCache bool
	// RemoteEnabled is part of the cost-data cache key; it defaults to
	// env.IsRemoteEnabled().
	RemoteEnabled bool
	// DisableSharedOverhead presumably disables shared overhead costs — TODO confirm.
	DisableSharedOverhead bool
	// UseETLAdapter is not read in this part of the file — TODO document.
	UseETLAdapter bool
}
  897. func DefaultAggregateQueryOpts() *AggregateQueryOpts {
  898. return &AggregateQueryOpts{
  899. Rate: "",
  900. Filters: map[string]string{},
  901. SharedResources: nil,
  902. ShareSplit: SplitTypeWeighted,
  903. AllocateIdle: false,
  904. IncludeTimeSeries: true,
  905. IncludeEfficiency: true,
  906. DisableCache: false,
  907. ClearCache: false,
  908. NoCache: false,
  909. NoExpireCache: false,
  910. RemoteEnabled: env.IsRemoteEnabled(),
  911. DisableSharedOverhead: false,
  912. UseETLAdapter: false,
  913. }
  914. }
  915. // ComputeAggregateCostModel computes cost data for the given window, then aggregates it by the given fields.
  916. // Data is cached on two levels: the aggregation is cached as well as the underlying cost data.
  917. func (a *Accesses) ComputeAggregateCostModel(promClient prometheusClient.Client, window kubecost.Window, field string, subfields []string, opts *AggregateQueryOpts) (map[string]*Aggregation, string, error) {
  918. // Window is the range of the query, i.e. (start, end)
  919. // It must be closed, i.e. neither start nor end can be nil
  920. if window.IsOpen() {
  921. return nil, "", fmt.Errorf("illegal window: %s", window)
  922. }
  923. // Resolution is the duration of each datum in the cost model range query,
  924. // which corresponds to both the step size given to Prometheus query_range
  925. // and to the window passed to the range queries.
  926. // i.e. by default, we support 1h resolution for queries of windows defined
  927. // in terms of days or integer multiples of hours (e.g. 1d, 12h)
  928. resolution := time.Hour
  929. // Determine resolution by size of duration and divisibility of window.
  930. // By default, resolution is 1hr. If the window is smaller than 1hr, then
  931. // resolution goes down to 1m. If the window is not a multiple of 1hr, then
  932. // resolution goes down to 1m. If the window is greater than 1d, then
  933. // resolution gets scaled up to improve performance by reducing the amount
  934. // of data being computed.
  935. durMins := int64(math.Trunc(window.Minutes()))
  936. if durMins < 24*60 { // less than 1d
  937. // TODO should we have additional options for going by
  938. // e.g. 30m? 10m? 5m?
  939. if durMins%60 != 0 || durMins < 3*60 { // not divisible by 1h or less than 3h
  940. resolution = time.Minute
  941. }
  942. } else { // greater than 1d
  943. if durMins >= 7*24*60 { // greater than (or equal to) 7 days
  944. resolution = 24.0 * time.Hour
  945. } else if durMins >= 2*24*60 { // greater than (or equal to) 2 days
  946. resolution = 2.0 * time.Hour
  947. }
  948. }
  949. // Parse options
  950. if opts == nil {
  951. opts = DefaultAggregateQueryOpts()
  952. }
  953. rate := opts.Rate
  954. filters := opts.Filters
  955. sri := opts.SharedResources
  956. shared := opts.ShareSplit
  957. allocateIdle := opts.AllocateIdle
  958. includeTimeSeries := opts.IncludeTimeSeries
  959. includeEfficiency := opts.IncludeEfficiency
  960. disableCache := opts.DisableCache
  961. clearCache := opts.ClearCache
  962. noCache := opts.NoCache
  963. noExpireCache := opts.NoExpireCache
  964. remoteEnabled := opts.RemoteEnabled
  965. disableSharedOverhead := opts.DisableSharedOverhead
  966. // retainFuncs override filterFuncs. Make sure shared resources do not
  967. // get filtered out.
  968. retainFuncs := []FilterFunc{}
  969. retainFuncs = append(retainFuncs, func(cd *CostData) (bool, string) {
  970. if sri != nil {
  971. return sri.IsSharedResource(cd), ""
  972. }
  973. return false, ""
  974. })
  975. // Parse cost data filters into FilterFuncs
  976. filterFuncs := []FilterFunc{}
  977. aggregateEnvironment := func(costDatum *CostData) string {
  978. if field == "cluster" {
  979. return costDatum.ClusterID
  980. } else if field == "node" {
  981. return costDatum.NodeName
  982. } else if field == "namespace" {
  983. return costDatum.Namespace
  984. } else if field == "service" {
  985. if len(costDatum.Services) > 0 {
  986. return costDatum.Namespace + "/" + costDatum.Services[0]
  987. }
  988. } else if field == "deployment" {
  989. if len(costDatum.Deployments) > 0 {
  990. return costDatum.Namespace + "/" + costDatum.Deployments[0]
  991. }
  992. } else if field == "daemonset" {
  993. if len(costDatum.Daemonsets) > 0 {
  994. return costDatum.Namespace + "/" + costDatum.Daemonsets[0]
  995. }
  996. } else if field == "statefulset" {
  997. if len(costDatum.Statefulsets) > 0 {
  998. return costDatum.Namespace + "/" + costDatum.Statefulsets[0]
  999. }
  1000. } else if field == "label" {
  1001. if costDatum.Labels != nil {
  1002. for _, sf := range subfields {
  1003. if subfieldName, ok := costDatum.Labels[sf]; ok {
  1004. return fmt.Sprintf("%s=%s", sf, subfieldName)
  1005. }
  1006. }
  1007. }
  1008. } else if field == "pod" {
  1009. return costDatum.Namespace + "/" + costDatum.PodName
  1010. } else if field == "container" {
  1011. return costDatum.Namespace + "/" + costDatum.PodName + "/" + costDatum.Name
  1012. }
  1013. return ""
  1014. }
  1015. if filters["podprefix"] != "" {
  1016. pps := []string{}
  1017. for _, fp := range strings.Split(filters["podprefix"], ",") {
  1018. if fp != "" {
  1019. cleanedFilter := strings.TrimSpace(fp)
  1020. pps = append(pps, cleanedFilter)
  1021. }
  1022. }
  1023. filterFuncs = append(filterFuncs, func(cd *CostData) (bool, string) {
  1024. aggEnv := aggregateEnvironment(cd)
  1025. for _, pp := range pps {
  1026. cleanedFilter := strings.TrimSpace(pp)
  1027. if strings.HasPrefix(cd.PodName, cleanedFilter) {
  1028. return true, aggEnv
  1029. }
  1030. }
  1031. return false, aggEnv
  1032. })
  1033. }
  1034. if filters["namespace"] != "" {
  1035. // namespaces may be comma-separated, e.g. kubecost,default
  1036. // multiple namespaces are evaluated as an OR relationship
  1037. nss := strings.Split(filters["namespace"], ",")
  1038. filterFuncs = append(filterFuncs, func(cd *CostData) (bool, string) {
  1039. aggEnv := aggregateEnvironment(cd)
  1040. for _, ns := range nss {
  1041. nsTrim := strings.TrimSpace(ns)
  1042. if cd.Namespace == nsTrim {
  1043. return true, aggEnv
  1044. } else if strings.HasSuffix(nsTrim, "*") { // trigger wildcard prefix filtering
  1045. nsTrimAsterisk := strings.TrimSuffix(nsTrim, "*")
  1046. if strings.HasPrefix(cd.Namespace, nsTrimAsterisk) {
  1047. return true, aggEnv
  1048. }
  1049. }
  1050. }
  1051. return false, aggEnv
  1052. })
  1053. }
  1054. if filters["node"] != "" {
  1055. // nodes may be comma-separated, e.g. aws-node-1,aws-node-2
  1056. // multiple nodes are evaluated as an OR relationship
  1057. nodes := strings.Split(filters["node"], ",")
  1058. filterFuncs = append(filterFuncs, func(cd *CostData) (bool, string) {
  1059. aggEnv := aggregateEnvironment(cd)
  1060. for _, node := range nodes {
  1061. nodeTrim := strings.TrimSpace(node)
  1062. if cd.NodeName == nodeTrim {
  1063. return true, aggEnv
  1064. } else if strings.HasSuffix(nodeTrim, "*") { // trigger wildcard prefix filtering
  1065. nodeTrimAsterisk := strings.TrimSuffix(nodeTrim, "*")
  1066. if strings.HasPrefix(cd.NodeName, nodeTrimAsterisk) {
  1067. return true, aggEnv
  1068. }
  1069. }
  1070. }
  1071. return false, aggEnv
  1072. })
  1073. }
  1074. if filters["cluster"] != "" {
  1075. // clusters may be comma-separated, e.g. cluster-one,cluster-two
  1076. // multiple clusters are evaluated as an OR relationship
  1077. cs := strings.Split(filters["cluster"], ",")
  1078. filterFuncs = append(filterFuncs, func(cd *CostData) (bool, string) {
  1079. aggEnv := aggregateEnvironment(cd)
  1080. for _, c := range cs {
  1081. cTrim := strings.TrimSpace(c)
  1082. id, name := cd.ClusterID, cd.ClusterName
  1083. if id == cTrim || name == cTrim {
  1084. return true, aggEnv
  1085. } else if strings.HasSuffix(cTrim, "*") { // trigger wildcard prefix filtering
  1086. cTrimAsterisk := strings.TrimSuffix(cTrim, "*")
  1087. if strings.HasPrefix(id, cTrimAsterisk) || strings.HasPrefix(name, cTrimAsterisk) {
  1088. return true, aggEnv
  1089. }
  1090. }
  1091. }
  1092. return false, aggEnv
  1093. })
  1094. }
  1095. if filters["labels"] != "" {
  1096. // labels are expected to be comma-separated and to take the form key=value
  1097. // e.g. app=cost-analyzer,app.kubernetes.io/instance=kubecost
  1098. // each different label will be applied as an AND
  1099. // multiple values for a single label will be evaluated as an OR
  1100. labelValues := map[string][]string{}
  1101. ls := strings.Split(filters["labels"], ",")
  1102. for _, l := range ls {
  1103. lTrim := strings.TrimSpace(l)
  1104. label := strings.Split(lTrim, "=")
  1105. if len(label) == 2 {
  1106. ln := prom.SanitizeLabelName(strings.TrimSpace(label[0]))
  1107. lv := strings.TrimSpace(label[1])
  1108. labelValues[ln] = append(labelValues[ln], lv)
  1109. } else {
  1110. // label is not of the form name=value, so log it and move on
  1111. log.Warningf("ComputeAggregateCostModel: skipping illegal label filter: %s", l)
  1112. }
  1113. }
  1114. // Generate FilterFunc for each set of label filters by invoking a function instead of accessing
  1115. // values by closure to prevent reference-type looping bug.
  1116. // (see https://github.com/golang/go/wiki/CommonMistakes#using-reference-to-loop-iterator-variable)
  1117. for label, values := range labelValues {
  1118. ff := (func(l string, vs []string) FilterFunc {
  1119. return func(cd *CostData) (bool, string) {
  1120. ae := aggregateEnvironment(cd)
  1121. for _, v := range vs {
  1122. if v == "__unallocated__" { // Special case. __unallocated__ means return all pods without the attached label
  1123. if _, ok := cd.Labels[l]; !ok {
  1124. return true, ae
  1125. }
  1126. }
  1127. if cd.Labels[l] == v {
  1128. return true, ae
  1129. } else if strings.HasSuffix(v, "*") { // trigger wildcard prefix filtering
  1130. vTrim := strings.TrimSuffix(v, "*")
  1131. if strings.HasPrefix(cd.Labels[l], vTrim) {
  1132. return true, ae
  1133. }
  1134. }
  1135. }
  1136. return false, ae
  1137. }
  1138. })(label, values)
  1139. filterFuncs = append(filterFuncs, ff)
  1140. }
  1141. }
  1142. if filters["annotations"] != "" {
  1143. // annotations are expected to be comma-separated and to take the form key=value
  1144. // e.g. app=cost-analyzer,app.kubernetes.io/instance=kubecost
  1145. // each different annotation will be applied as an AND
  1146. // multiple values for a single annotation will be evaluated as an OR
  1147. annotationValues := map[string][]string{}
  1148. as := strings.Split(filters["annotations"], ",")
  1149. for _, annot := range as {
  1150. aTrim := strings.TrimSpace(annot)
  1151. annotation := strings.Split(aTrim, "=")
  1152. if len(annotation) == 2 {
  1153. an := prom.SanitizeLabelName(strings.TrimSpace(annotation[0]))
  1154. av := strings.TrimSpace(annotation[1])
  1155. annotationValues[an] = append(annotationValues[an], av)
  1156. } else {
  1157. // annotation is not of the form name=value, so log it and move on
  1158. log.Warningf("ComputeAggregateCostModel: skipping illegal annotation filter: %s", annot)
  1159. }
  1160. }
  1161. // Generate FilterFunc for each set of annotation filters by invoking a function instead of accessing
  1162. // values by closure to prevent reference-type looping bug.
  1163. // (see https://github.com/golang/go/wiki/CommonMistakes#using-reference-to-loop-iterator-variable)
  1164. for annotation, values := range annotationValues {
  1165. ff := (func(l string, vs []string) FilterFunc {
  1166. return func(cd *CostData) (bool, string) {
  1167. ae := aggregateEnvironment(cd)
  1168. for _, v := range vs {
  1169. if v == "__unallocated__" { // Special case. __unallocated__ means return all pods without the attached label
  1170. if _, ok := cd.Annotations[l]; !ok {
  1171. return true, ae
  1172. }
  1173. }
  1174. if cd.Annotations[l] == v {
  1175. return true, ae
  1176. } else if strings.HasSuffix(v, "*") { // trigger wildcard prefix filtering
  1177. vTrim := strings.TrimSuffix(v, "*")
  1178. if strings.HasPrefix(cd.Annotations[l], vTrim) {
  1179. return true, ae
  1180. }
  1181. }
  1182. }
  1183. return false, ae
  1184. }
  1185. })(annotation, values)
  1186. filterFuncs = append(filterFuncs, ff)
  1187. }
  1188. }
  1189. // clear cache prior to checking the cache so that a clearCache=true
  1190. // request always returns a freshly computed value
  1191. if clearCache {
  1192. a.AggregateCache.Flush()
  1193. a.CostDataCache.Flush()
  1194. }
  1195. cacheExpiry := a.GetCacheExpiration(window.Duration())
  1196. if noExpireCache {
  1197. cacheExpiry = cache.NoExpiration
  1198. }
  1199. // parametrize cache key by all request parameters
  1200. aggKey := GenerateAggKey(window, field, subfields, opts)
  1201. thanosOffset := time.Now().Add(-thanos.OffsetDuration())
  1202. if a.ThanosClient != nil && window.End().After(thanosOffset) {
  1203. log.Infof("ComputeAggregateCostModel: setting end time backwards to first present data")
  1204. // Apply offsets to both end and start times to maintain correct time range
  1205. deltaDuration := window.End().Sub(thanosOffset)
  1206. s := window.Start().Add(-1 * deltaDuration)
  1207. e := time.Now().Add(-thanos.OffsetDuration())
  1208. window.Set(&s, &e)
  1209. }
  1210. dur, off := window.DurationOffsetStrings()
  1211. key := fmt.Sprintf(`%s:%s:%fh:%t`, dur, off, resolution.Hours(), remoteEnabled)
  1212. // report message about which of the two caches hit. by default report a miss
  1213. cacheMessage := fmt.Sprintf("ComputeAggregateCostModel: L1 cache miss: %s L2 cache miss: %s", aggKey, key)
  1214. // check the cache for aggregated response; if cache is hit and not disabled, return response
  1215. if value, found := a.AggregateCache.Get(aggKey); found && !disableCache && !noCache {
  1216. result, ok := value.(map[string]*Aggregation)
  1217. if !ok {
  1218. // disable cache and recompute if type cast fails
  1219. log.Errorf("ComputeAggregateCostModel: caching error: failed to cast aggregate data to struct: %s", aggKey)
  1220. return a.ComputeAggregateCostModel(promClient, window, field, subfields, opts)
  1221. }
  1222. return result, fmt.Sprintf("aggregate cache hit: %s", aggKey), nil
  1223. }
  1224. if window.Hours() >= 1.0 {
  1225. // exclude the last window of the time frame to match Prometheus definitions of range, offset, and resolution
  1226. start := window.Start().Add(resolution)
  1227. window.Set(&start, window.End())
  1228. } else {
  1229. // don't cache requests for durations of less than one hour
  1230. disableCache = true
  1231. }
  1232. // attempt to retrieve cost data from cache
  1233. var costData map[string]*CostData
  1234. var err error
  1235. cacheData, found := a.CostDataCache.Get(key)
  1236. if found && !disableCache && !noCache {
  1237. ok := false
  1238. costData, ok = cacheData.(map[string]*CostData)
  1239. cacheMessage = fmt.Sprintf("ComputeAggregateCostModel: L1 cache miss: %s, L2 cost data cache hit: %s", aggKey, key)
  1240. if !ok {
  1241. log.Errorf("ComputeAggregateCostModel: caching error: failed to cast cost data to struct: %s", key)
  1242. }
  1243. } else {
  1244. log.Infof("ComputeAggregateCostModel: missed cache: %s (found %t, disableCache %t, noCache %t)", key, found, disableCache, noCache)
  1245. costData, err = a.Model.ComputeCostDataRange(promClient, a.CloudProvider, window, resolution, "", "", remoteEnabled)
  1246. if err != nil {
  1247. if prom.IsErrorCollection(err) {
  1248. return nil, "", err
  1249. }
  1250. if pce, ok := err.(prom.CommError); ok {
  1251. return nil, "", pce
  1252. }
  1253. if strings.Contains(err.Error(), "data is empty") {
  1254. return nil, "", &EmptyDataError{err: err, window: window}
  1255. }
  1256. return nil, "", err
  1257. }
  1258. // compute length of the time series in the cost data and only compute
  1259. // aggregates and cache if the length is sufficiently high
  1260. costDataLen := costDataTimeSeriesLength(costData)
  1261. if costDataLen == 0 {
  1262. return nil, "", &EmptyDataError{window: window}
  1263. }
  1264. if costDataLen >= minCostDataLength && !noCache {
  1265. log.Infof("ComputeAggregateCostModel: setting L2 cache: %s", key)
  1266. a.CostDataCache.Set(key, costData, cacheExpiry)
  1267. }
  1268. }
  1269. c, err := a.CloudProvider.GetConfig()
  1270. if err != nil {
  1271. return nil, "", err
  1272. }
  1273. discount, err := ParsePercentString(c.Discount)
  1274. if err != nil {
  1275. return nil, "", err
  1276. }
  1277. customDiscount, err := ParsePercentString(c.NegotiatedDiscount)
  1278. if err != nil {
  1279. return nil, "", err
  1280. }
  1281. sc := make(map[string]*SharedCostInfo)
  1282. if !disableSharedOverhead {
  1283. for key, val := range c.SharedCosts {
  1284. cost, err := strconv.ParseFloat(val, 64)
  1285. durationCoefficient := window.Hours() / util.HoursPerMonth
  1286. if err != nil {
  1287. return nil, "", fmt.Errorf("unable to parse shared cost %s: %s", val, err)
  1288. }
  1289. sc[key] = &SharedCostInfo{
  1290. Name: key,
  1291. Cost: cost * durationCoefficient,
  1292. }
  1293. }
  1294. }
  1295. idleCoefficients := make(map[string]float64)
  1296. if allocateIdle {
  1297. dur, off, err := window.DurationOffset()
  1298. if err != nil {
  1299. log.Errorf("ComputeAggregateCostModel: error computing idle coefficient: illegal window: %s (%s)", window, err)
  1300. return nil, "", err
  1301. }
  1302. if a.ThanosClient != nil && off < thanos.OffsetDuration() {
  1303. // Determine difference between the Thanos offset and the requested
  1304. // offset; e.g. off=1h, thanosOffsetDuration=3h => diff=2h
  1305. diff := thanos.OffsetDuration() - off
  1306. // Reduce duration by difference and increase offset by difference
  1307. // e.g. 24h offset 0h => 21h offset 3h
  1308. dur = dur - diff
  1309. off = thanos.OffsetDuration()
  1310. log.Infof("ComputeAggregateCostModel: setting duration, offset to %s, %s due to Thanos", dur, off)
  1311. // Idle computation cannot be fulfilled for some windows, specifically
  1312. // those with sum(duration, offset) < Thanos offset, because there is
  1313. // no data within that window.
  1314. if dur <= 0 {
  1315. return nil, "", fmt.Errorf("requested idle coefficients from Thanos for illegal duration, offset: %s, %s (original window %s)", dur, off, window)
  1316. }
  1317. }
  1318. // Convert to Prometheus-compatible strings
  1319. durStr, offStr := util.DurationOffsetStrings(dur, off)
  1320. idleCoefficients, err = a.ComputeIdleCoefficient(costData, promClient, a.CloudProvider, discount, customDiscount, durStr, offStr)
  1321. if err != nil {
  1322. log.Errorf("ComputeAggregateCostModel: error computing idle coefficient: duration=%s, offset=%s, err=%s", durStr, offStr, err)
  1323. return nil, "", err
  1324. }
  1325. }
  1326. totalContainerCost := 0.0
  1327. if shared == SplitTypeWeighted {
  1328. totalContainerCost = GetTotalContainerCost(costData, rate, a.CloudProvider, discount, customDiscount, idleCoefficients)
  1329. }
  1330. // filter cost data by namespace and cluster after caching for maximal cache hits
  1331. costData, filteredContainerCount, filteredEnvironments := FilterCostData(costData, retainFuncs, filterFuncs)
  1332. // aggregate cost model data by given fields and cache the result for the default expiration
  1333. aggOpts := &AggregationOptions{
  1334. Discount: discount,
  1335. CustomDiscount: customDiscount,
  1336. IdleCoefficients: idleCoefficients,
  1337. IncludeEfficiency: includeEfficiency,
  1338. IncludeTimeSeries: includeTimeSeries,
  1339. Rate: rate,
  1340. ResolutionHours: resolution.Hours(),
  1341. SharedResourceInfo: sri,
  1342. SharedCosts: sc,
  1343. FilteredContainerCount: filteredContainerCount,
  1344. FilteredEnvironments: filteredEnvironments,
  1345. TotalContainerCost: totalContainerCost,
  1346. SharedSplit: shared,
  1347. }
  1348. result := AggregateCostData(costData, field, subfields, a.CloudProvider, aggOpts)
  1349. // If sending time series data back, switch scale back to hourly data. At this point,
  1350. // resolutionHours may have converted our hourly data to more- or less-than hourly data.
  1351. if includeTimeSeries {
  1352. for _, aggs := range result {
  1353. ScaleAggregationTimeSeries(aggs, resolution.Hours())
  1354. }
  1355. }
  1356. // compute length of the time series in the cost data and only cache
  1357. // aggregation results if the length is sufficiently high
  1358. costDataLen := costDataTimeSeriesLength(costData)
  1359. if costDataLen >= minCostDataLength && window.Hours() > 1.0 && !noCache {
  1360. // Set the result map (rather than a pointer to it) because map is a reference type
  1361. log.Infof("ComputeAggregateCostModel: setting aggregate cache: %s", aggKey)
  1362. a.AggregateCache.Set(aggKey, result, cacheExpiry)
  1363. } else {
  1364. log.Infof("ComputeAggregateCostModel: not setting aggregate cache: %s (not enough data: %t; duration less than 1h: %t; noCache: %t)", key, costDataLen < minCostDataLength, window.Hours() < 1, noCache)
  1365. }
  1366. return result, cacheMessage, nil
  1367. }
  1368. // ScaleAggregationTimeSeries reverses the scaling done by ScaleHourlyCostData, returning
  1369. // the aggregation's time series to hourly data.
  1370. func ScaleAggregationTimeSeries(aggregation *Aggregation, resolutionHours float64) {
  1371. for _, v := range aggregation.CPUCostVector {
  1372. v.Value /= resolutionHours
  1373. }
  1374. for _, v := range aggregation.GPUCostVector {
  1375. v.Value /= resolutionHours
  1376. }
  1377. for _, v := range aggregation.RAMCostVector {
  1378. v.Value /= resolutionHours
  1379. }
  1380. for _, v := range aggregation.PVCostVector {
  1381. v.Value /= resolutionHours
  1382. }
  1383. for _, v := range aggregation.NetworkCostVector {
  1384. v.Value /= resolutionHours
  1385. }
  1386. for _, v := range aggregation.TotalCostVector {
  1387. v.Value /= resolutionHours
  1388. }
  1389. return
  1390. }
  1391. // String returns a string representation of the encapsulated shared resources, which
  1392. // can be used to uniquely identify a set of shared resources. Sorting sets of shared
  1393. // resources ensures that strings representing permutations of the same combination match.
  1394. func (s *SharedResourceInfo) String() string {
  1395. if s == nil {
  1396. return ""
  1397. }
  1398. nss := []string{}
  1399. for ns := range s.SharedNamespace {
  1400. nss = append(nss, ns)
  1401. }
  1402. sort.Strings(nss)
  1403. nsStr := strings.Join(nss, ",")
  1404. labels := []string{}
  1405. for lbl, vals := range s.LabelSelectors {
  1406. for val := range vals {
  1407. if lbl != "" && val != "" {
  1408. labels = append(labels, fmt.Sprintf("%s=%s", lbl, val))
  1409. }
  1410. }
  1411. }
  1412. sort.Strings(labels)
  1413. labelStr := strings.Join(labels, ",")
  1414. return fmt.Sprintf("%s:%s", nsStr, labelStr)
  1415. }
  1416. type aggKeyParams struct {
  1417. duration string
  1418. offset string
  1419. filters map[string]string
  1420. field string
  1421. subfields []string
  1422. rate string
  1423. sri *SharedResourceInfo
  1424. shareType string
  1425. idle bool
  1426. timeSeries bool
  1427. efficiency bool
  1428. }
  1429. // GenerateAggKey generates a parameter-unique key for caching the aggregate cost model
  1430. func GenerateAggKey(window kubecost.Window, field string, subfields []string, opts *AggregateQueryOpts) string {
  1431. if opts == nil {
  1432. opts = DefaultAggregateQueryOpts()
  1433. }
  1434. // Covert to duration, offset so that cache hits occur, even when timestamps have
  1435. // shifted slightly.
  1436. duration, offset := window.DurationOffsetStrings()
  1437. // parse, trim, and sort podprefix filters
  1438. podPrefixFilters := []string{}
  1439. if ppfs, ok := opts.Filters["podprefix"]; ok && ppfs != "" {
  1440. for _, psf := range strings.Split(ppfs, ",") {
  1441. podPrefixFilters = append(podPrefixFilters, strings.TrimSpace(psf))
  1442. }
  1443. }
  1444. sort.Strings(podPrefixFilters)
  1445. podPrefixFiltersStr := strings.Join(podPrefixFilters, ",")
  1446. // parse, trim, and sort namespace filters
  1447. nsFilters := []string{}
  1448. if nsfs, ok := opts.Filters["namespace"]; ok && nsfs != "" {
  1449. for _, nsf := range strings.Split(nsfs, ",") {
  1450. nsFilters = append(nsFilters, strings.TrimSpace(nsf))
  1451. }
  1452. }
  1453. sort.Strings(nsFilters)
  1454. nsFilterStr := strings.Join(nsFilters, ",")
  1455. // parse, trim, and sort node filters
  1456. nodeFilters := []string{}
  1457. if nodefs, ok := opts.Filters["node"]; ok && nodefs != "" {
  1458. for _, nodef := range strings.Split(nodefs, ",") {
  1459. nodeFilters = append(nodeFilters, strings.TrimSpace(nodef))
  1460. }
  1461. }
  1462. sort.Strings(nodeFilters)
  1463. nodeFilterStr := strings.Join(nodeFilters, ",")
  1464. // parse, trim, and sort cluster filters
  1465. cFilters := []string{}
  1466. if cfs, ok := opts.Filters["cluster"]; ok && cfs != "" {
  1467. for _, cf := range strings.Split(cfs, ",") {
  1468. cFilters = append(cFilters, strings.TrimSpace(cf))
  1469. }
  1470. }
  1471. sort.Strings(cFilters)
  1472. cFilterStr := strings.Join(cFilters, ",")
  1473. // parse, trim, and sort label filters
  1474. lFilters := []string{}
  1475. if lfs, ok := opts.Filters["labels"]; ok && lfs != "" {
  1476. for _, lf := range strings.Split(lfs, ",") {
  1477. // trim whitespace from the label name and the label value
  1478. // of each label name/value pair, then reconstruct
  1479. // e.g. "tier = frontend, app = kubecost" == "app=kubecost,tier=frontend"
  1480. lfa := strings.Split(lf, "=")
  1481. if len(lfa) == 2 {
  1482. lfn := strings.TrimSpace(lfa[0])
  1483. lfv := strings.TrimSpace(lfa[1])
  1484. lFilters = append(lFilters, fmt.Sprintf("%s=%s", lfn, lfv))
  1485. } else {
  1486. // label is not of the form name=value, so log it and move on
  1487. klog.V(2).Infof("[Warning] GenerateAggKey: skipping illegal label filter: %s", lf)
  1488. }
  1489. }
  1490. }
  1491. sort.Strings(lFilters)
  1492. lFilterStr := strings.Join(lFilters, ",")
  1493. // parse, trim, and sort annotation filters
  1494. aFilters := []string{}
  1495. if afs, ok := opts.Filters["annotations"]; ok && afs != "" {
  1496. for _, af := range strings.Split(afs, ",") {
  1497. // trim whitespace from the annotation name and the annotation value
  1498. // of each annotation name/value pair, then reconstruct
  1499. // e.g. "tier = frontend, app = kubecost" == "app=kubecost,tier=frontend"
  1500. afa := strings.Split(af, "=")
  1501. if len(afa) == 2 {
  1502. afn := strings.TrimSpace(afa[0])
  1503. afv := strings.TrimSpace(afa[1])
  1504. aFilters = append(aFilters, fmt.Sprintf("%s=%s", afn, afv))
  1505. } else {
  1506. // annotation is not of the form name=value, so log it and move on
  1507. klog.V(2).Infof("[Warning] GenerateAggKey: skipping illegal annotation filter: %s", af)
  1508. }
  1509. }
  1510. }
  1511. sort.Strings(aFilters)
  1512. aFilterStr := strings.Join(aFilters, ",")
  1513. filterStr := fmt.Sprintf("%s:%s:%s:%s:%s:%s", nsFilterStr, nodeFilterStr, cFilterStr, lFilterStr, aFilterStr, podPrefixFiltersStr)
  1514. sort.Strings(subfields)
  1515. fieldStr := fmt.Sprintf("%s:%s", field, strings.Join(subfields, ","))
  1516. return fmt.Sprintf("%s:%s:%s:%s:%s:%s:%s:%t:%t:%t", duration, offset, filterStr, fieldStr, opts.Rate,
  1517. opts.SharedResources, opts.ShareSplit, opts.AllocateIdle, opts.IncludeTimeSeries,
  1518. opts.IncludeEfficiency)
  1519. }
  1520. // Aggregator is capable of computing the aggregated cost model. This is
  1521. // a brutal interface, which should be cleaned up, but it's necessary for
  1522. // being able to swap in an ETL-backed implementation.
  1523. type Aggregator interface {
  1524. ComputeAggregateCostModel(promClient prometheusClient.Client, window kubecost.Window, field string, subfields []string, opts *AggregateQueryOpts) (map[string]*Aggregation, string, error)
  1525. }
  1526. func (a *Accesses) warmAggregateCostModelCache() {
  1527. // Only allow one concurrent cache-warming operation
  1528. sem := util.NewSemaphore(1)
  1529. // Set default values, pulling them from application settings where applicable, and warm the cache
  1530. // for the given duration. Cache is intentionally set to expire (i.e. noExpireCache=false) so that
  1531. // if the default parameters change, the old cached defaults with eventually expire. Thus, the
  1532. // timing of the cache expiry/refresh is the only mechanism ensuring 100% cache warmth.
  1533. warmFunc := func(duration, durationHrs, offset string, cacheEfficiencyData bool) (error, error) {
  1534. promClient := a.GetPrometheusClient(true)
  1535. windowStr := fmt.Sprintf("%s offset %s", duration, offset)
  1536. window, err := kubecost.ParseWindowUTC(windowStr)
  1537. if err != nil {
  1538. return nil, fmt.Errorf("invalid window from window string: %s", windowStr)
  1539. }
  1540. field := "namespace"
  1541. subfields := []string{}
  1542. aggOpts := DefaultAggregateQueryOpts()
  1543. aggOpts.Rate = ""
  1544. aggOpts.Filters = map[string]string{}
  1545. aggOpts.IncludeTimeSeries = false
  1546. aggOpts.IncludeEfficiency = true
  1547. aggOpts.DisableCache = true
  1548. aggOpts.ClearCache = false
  1549. aggOpts.NoCache = false
  1550. aggOpts.NoExpireCache = false
  1551. aggOpts.ShareSplit = SplitTypeWeighted
  1552. aggOpts.RemoteEnabled = env.IsRemoteEnabled()
  1553. aggOpts.AllocateIdle = cloud.AllocateIdleByDefault(a.CloudProvider)
  1554. sharedNamespaces := cloud.SharedNamespaces(a.CloudProvider)
  1555. sharedLabelNames, sharedLabelValues := cloud.SharedLabels(a.CloudProvider)
  1556. if len(sharedNamespaces) > 0 || len(sharedLabelNames) > 0 {
  1557. aggOpts.SharedResources = NewSharedResourceInfo(true, sharedNamespaces, sharedLabelNames, sharedLabelValues)
  1558. }
  1559. aggKey := GenerateAggKey(window, field, subfields, aggOpts)
  1560. log.Infof("aggregation: cache warming defaults: %s", aggKey)
  1561. key := fmt.Sprintf("%s:%s", durationHrs, offset)
  1562. _, _, aggErr := a.ComputeAggregateCostModel(promClient, window, field, subfields, aggOpts)
  1563. if aggErr != nil {
  1564. log.Infof("Error building cache %s: %s", window, aggErr)
  1565. }
  1566. if a.ThanosClient != nil {
  1567. offset = thanos.Offset()
  1568. log.Infof("Setting offset to %s", offset)
  1569. }
  1570. totals, err := a.ComputeClusterCosts(promClient, a.CloudProvider, durationHrs, offset, cacheEfficiencyData)
  1571. if err != nil {
  1572. log.Infof("Error building cluster costs cache %s", key)
  1573. }
  1574. maxMinutesWithData := 0.0
  1575. for _, cluster := range totals {
  1576. if cluster.DataMinutes > maxMinutesWithData {
  1577. maxMinutesWithData = cluster.DataMinutes
  1578. }
  1579. }
  1580. if len(totals) > 0 && maxMinutesWithData > clusterCostsCacheMinutes {
  1581. a.ClusterCostsCache.Set(key, totals, a.GetCacheExpiration(window.Duration()))
  1582. log.Infof("caching %s cluster costs for %s", duration, a.GetCacheExpiration(window.Duration()))
  1583. } else {
  1584. log.Warningf("not caching %s cluster costs: no data or less than %f minutes data ", duration, clusterCostsCacheMinutes)
  1585. }
  1586. return aggErr, err
  1587. }
  1588. // 1 day
  1589. go func(sem *util.Semaphore) {
  1590. defer errors.HandlePanic()
  1591. duration := "1d"
  1592. offset := "1m"
  1593. durHrs := "24h"
  1594. dur := 24 * time.Hour
  1595. for {
  1596. sem.Acquire()
  1597. warmFunc(duration, durHrs, offset, true)
  1598. sem.Return()
  1599. log.Infof("aggregation: warm cache: %s", duration)
  1600. time.Sleep(a.GetCacheRefresh(dur))
  1601. }
  1602. }(sem)
  1603. // 2 day
  1604. go func(sem *util.Semaphore) {
  1605. defer errors.HandlePanic()
  1606. duration := "2d"
  1607. offset := "1m"
  1608. durHrs := "48h"
  1609. dur := 2 * 24 * time.Hour
  1610. for {
  1611. sem.Acquire()
  1612. warmFunc(duration, durHrs, offset, false)
  1613. sem.Return()
  1614. log.Infof("aggregation: warm cache: %s", duration)
  1615. time.Sleep(a.GetCacheRefresh(dur))
  1616. }
  1617. }(sem)
  1618. if !env.IsETLEnabled() {
  1619. // 7 day
  1620. go func(sem *util.Semaphore) {
  1621. defer errors.HandlePanic()
  1622. duration := "7d"
  1623. offset := "1m"
  1624. durHrs := "168h"
  1625. dur := 7 * 24 * time.Hour
  1626. for {
  1627. sem.Acquire()
  1628. aggErr, err := warmFunc(duration, durHrs, offset, false)
  1629. sem.Return()
  1630. log.Infof("aggregation: warm cache: %s", duration)
  1631. if aggErr == nil && err == nil {
  1632. time.Sleep(a.GetCacheRefresh(dur))
  1633. } else {
  1634. time.Sleep(5 * time.Minute)
  1635. }
  1636. }
  1637. }(sem)
  1638. // 30 day
  1639. go func(sem *util.Semaphore) {
  1640. defer errors.HandlePanic()
  1641. for {
  1642. duration := "30d"
  1643. offset := "1m"
  1644. durHrs := "720h"
  1645. dur := 30 * 24 * time.Hour
  1646. sem.Acquire()
  1647. aggErr, err := warmFunc(duration, durHrs, offset, false)
  1648. sem.Return()
  1649. if aggErr == nil && err == nil {
  1650. time.Sleep(a.GetCacheRefresh(dur))
  1651. } else {
  1652. time.Sleep(5 * time.Minute)
  1653. }
  1654. }
  1655. }(sem)
  1656. }
  1657. }
  1658. // AggregateCostModelHandler handles requests to the aggregated cost model API. See
  1659. // ComputeAggregateCostModel for details.
  1660. func (a *Accesses) AggregateCostModelHandler(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  1661. w.Header().Set("Content-Type", "application/json")
  1662. windowStr := r.URL.Query().Get("window")
  1663. // Convert UTC-RFC3339 pairs to configured UTC offset
  1664. // e.g. with UTC offset of -0600, 2020-07-01T00:00:00Z becomes
  1665. // 2020-07-01T06:00:00Z == 2020-07-01T00:00:00-0600
  1666. // TODO niko/etl fix the frontend because this is confusing if you're
  1667. // actually asking for UTC time (...Z) and we swap that "Z" out for the
  1668. // configured UTC offset without asking
  1669. rfc3339 := `\d\d\d\d-\d\d-\d\dT\d\d:\d\d:\d\dZ`
  1670. regex := regexp.MustCompile(fmt.Sprintf(`(%s),(%s)`, rfc3339, rfc3339))
  1671. match := regex.FindStringSubmatch(windowStr)
  1672. if match != nil {
  1673. start, _ := time.Parse(time.RFC3339, match[1])
  1674. start = start.Add(-env.GetParsedUTCOffset()).In(time.UTC)
  1675. end, _ := time.Parse(time.RFC3339, match[2])
  1676. end = end.Add(-env.GetParsedUTCOffset()).In(time.UTC)
  1677. windowStr = fmt.Sprintf("%sZ,%sZ", start.Format("2006-01-02T15:04:05"), end.Format("2006-01-02T15:04:05Z"))
  1678. }
  1679. // determine duration and offset from query parameters
  1680. window, err := kubecost.ParseWindowWithOffset(windowStr, env.GetParsedUTCOffset())
  1681. if err != nil || window.Start() == nil {
  1682. WriteError(w, BadRequest(fmt.Sprintf("invalid window: %s", err)))
  1683. return
  1684. }
  1685. durRegex := regexp.MustCompile(`^(\d+)(m|h|d|s)$`)
  1686. isDurationStr := durRegex.MatchString(windowStr)
  1687. // legacy offset option should override window offset
  1688. if r.URL.Query().Get("offset") != "" {
  1689. offset := r.URL.Query().Get("offset")
  1690. // Shift window by offset, but only when manually set with separate
  1691. // parameter and window was provided as a duration string. Otherwise,
  1692. // do not alter the (duration, offset) from ParseWindowWithOffset.
  1693. if offset != "1m" && isDurationStr {
  1694. match := durRegex.FindStringSubmatch(offset)
  1695. if match != nil && len(match) == 3 {
  1696. dur := time.Minute
  1697. if match[2] == "h" {
  1698. dur = time.Hour
  1699. }
  1700. if match[2] == "d" {
  1701. dur = 24 * time.Hour
  1702. }
  1703. if match[2] == "s" {
  1704. dur = time.Second
  1705. }
  1706. num, _ := strconv.ParseInt(match[1], 10, 64)
  1707. window = window.Shift(-time.Duration(num) * dur)
  1708. }
  1709. }
  1710. }
  1711. opts := DefaultAggregateQueryOpts()
  1712. // parse remaining query parameters
  1713. namespace := r.URL.Query().Get("namespace")
  1714. cluster := r.URL.Query().Get("cluster")
  1715. labels := r.URL.Query().Get("labels")
  1716. annotations := r.URL.Query().Get("annotations")
  1717. podprefix := r.URL.Query().Get("podprefix")
  1718. field := r.URL.Query().Get("aggregation")
  1719. sharedNamespaces := r.URL.Query().Get("sharedNamespaces")
  1720. sharedLabelNames := r.URL.Query().Get("sharedLabelNames")
  1721. sharedLabelValues := r.URL.Query().Get("sharedLabelValues")
  1722. remote := r.URL.Query().Get("remote") != "false"
  1723. subfieldStr := r.URL.Query().Get("aggregationSubfield")
  1724. subfields := []string{}
  1725. if len(subfieldStr) > 0 {
  1726. s := strings.Split(r.URL.Query().Get("aggregationSubfield"), ",")
  1727. for _, rawLabel := range s {
  1728. subfields = append(subfields, prom.SanitizeLabelName(rawLabel))
  1729. }
  1730. }
  1731. idleFlag := r.URL.Query().Get("allocateIdle")
  1732. if idleFlag == "default" {
  1733. c, _ := a.CloudProvider.GetConfig()
  1734. opts.AllocateIdle = (c.DefaultIdle == "true")
  1735. } else {
  1736. opts.AllocateIdle = (idleFlag == "true")
  1737. }
  1738. opts.Rate = r.URL.Query().Get("rate")
  1739. opts.ShareSplit = r.URL.Query().Get("sharedSplit")
  1740. // timeSeries == true maintains the time series dimension of the data,
  1741. // which by default gets summed over the entire interval
  1742. opts.IncludeTimeSeries = r.URL.Query().Get("timeSeries") == "true"
  1743. // efficiency has been deprecated in favor of a default to always send efficiency
  1744. opts.IncludeEfficiency = true
  1745. // TODO niko/caching rename "recomputeCache"
  1746. // disableCache, if set to "true", tells this function to recompute and
  1747. // cache the requested data
  1748. opts.DisableCache = r.URL.Query().Get("disableCache") == "true"
  1749. // clearCache, if set to "true", tells this function to flush the cache,
  1750. // then recompute and cache the requested data
  1751. opts.ClearCache = r.URL.Query().Get("clearCache") == "true"
  1752. // noCache avoids the cache altogether, both reading from and writing to
  1753. opts.NoCache = r.URL.Query().Get("noCache") == "true"
  1754. // noExpireCache should only be used by cache warming to set non-expiring caches
  1755. opts.NoExpireCache = false
  1756. // etl triggers ETL adapter
  1757. opts.UseETLAdapter = r.URL.Query().Get("etl") == "true"
  1758. // aggregation field is required
  1759. if field == "" {
  1760. WriteError(w, BadRequest("Missing aggregation field parameter"))
  1761. return
  1762. }
  1763. // aggregation subfield is required when aggregation field is "label"
  1764. if field == "label" && len(subfields) == 0 {
  1765. WriteError(w, BadRequest("Missing aggregation field parameter"))
  1766. return
  1767. }
  1768. // enforce one of four available rate options
  1769. if opts.Rate != "" && opts.Rate != "hourly" && opts.Rate != "daily" && opts.Rate != "monthly" {
  1770. WriteError(w, BadRequest("Missing aggregation field parameter"))
  1771. return
  1772. }
  1773. // parse cost data filters
  1774. // namespace and cluster are exact-string-matches
  1775. // labels are expected to be comma-separated and to take the form key=value
  1776. // e.g. app=cost-analyzer,app.kubernetes.io/instance=kubecost
  1777. opts.Filters = map[string]string{
  1778. "namespace": namespace,
  1779. "cluster": cluster,
  1780. "labels": labels,
  1781. "annotations": annotations,
  1782. "podprefix": podprefix,
  1783. }
  1784. // parse shared resources
  1785. sn := []string{}
  1786. sln := []string{}
  1787. slv := []string{}
  1788. if sharedNamespaces != "" {
  1789. sn = strings.Split(sharedNamespaces, ",")
  1790. }
  1791. if sharedLabelNames != "" {
  1792. sln = strings.Split(sharedLabelNames, ",")
  1793. slv = strings.Split(sharedLabelValues, ",")
  1794. if len(sln) != len(slv) || slv[0] == "" {
  1795. WriteError(w, BadRequest("Supply exactly one shared label value per shared label name"))
  1796. return
  1797. }
  1798. }
  1799. if len(sn) > 0 || len(sln) > 0 {
  1800. opts.SharedResources = NewSharedResourceInfo(true, sn, sln, slv)
  1801. }
  1802. // enable remote if it is available and not disabled
  1803. opts.RemoteEnabled = remote && env.IsRemoteEnabled()
  1804. promClient := a.GetPrometheusClient(remote)
  1805. var data map[string]*Aggregation
  1806. var message string
  1807. data, message, err = a.AggAPI.ComputeAggregateCostModel(promClient, window, field, subfields, opts)
  1808. // Find any warnings in http request context
  1809. warning, _ := util.GetWarning(r)
  1810. if err != nil {
  1811. if emptyErr, ok := err.(*EmptyDataError); ok {
  1812. if warning == "" {
  1813. w.Write(WrapData(map[string]interface{}{}, emptyErr))
  1814. } else {
  1815. w.Write(WrapDataWithWarning(map[string]interface{}{}, emptyErr, warning))
  1816. }
  1817. return
  1818. }
  1819. if boundaryErr, ok := err.(*kubecost.BoundaryError); ok {
  1820. if window.Start() != nil && window.Start().After(time.Now().Add(-90*24*time.Hour)) {
  1821. // Asking for data within a 90 day period: it will be available
  1822. // after the pipeline builds
  1823. msg := "Data will be available after ETL is built"
  1824. rex := regexp.MustCompile(`(\d+\.*\d*)%`)
  1825. match := rex.FindStringSubmatch(boundaryErr.Message)
  1826. if len(match) > 1 {
  1827. completionPct, err := strconv.ParseFloat(match[1], 64)
  1828. if err == nil {
  1829. msg = fmt.Sprintf("%s (%.1f%% complete)", msg, completionPct)
  1830. }
  1831. }
  1832. WriteError(w, InternalServerError(msg))
  1833. } else {
  1834. // Boundary error outside of 90 day period; may not be available
  1835. WriteError(w, InternalServerError(boundaryErr.Error()))
  1836. }
  1837. return
  1838. }
  1839. errStr := fmt.Sprintf("error computing aggregate cost model: %s", err)
  1840. WriteError(w, InternalServerError(errStr))
  1841. return
  1842. }
  1843. if warning == "" {
  1844. w.Write(WrapDataWithMessage(data, nil, message))
  1845. } else {
  1846. w.Write(WrapDataWithMessageAndWarning(data, nil, message, warning))
  1847. }
  1848. }
  // The definitions that follow were transferred from a different package to
// preserve previous behavior; they should eventually be consolidated.
// TODO move to util and/or standardize everything

// Error describes an HTTP error response: the status code to send and the
// message to report. Note that it does not implement the built-in error
// interface.
type Error struct {
	StatusCode int    // HTTP status code; WriteError sends 0 as 500
	Body       string // human-readable error message
}
  1856. func WriteError(w http.ResponseWriter, err Error) {
  1857. status := err.StatusCode
  1858. if status == 0 {
  1859. status = http.StatusInternalServerError
  1860. }
  1861. w.WriteHeader(status)
  1862. resp, _ := json.Marshal(&Response{
  1863. Code: status,
  1864. Message: fmt.Sprintf("Error: %s", err.Body),
  1865. })
  1866. w.Write(resp)
  1867. }
  1868. func BadRequest(message string) Error {
  1869. return Error{
  1870. StatusCode: http.StatusBadRequest,
  1871. Body: message,
  1872. }
  1873. }
  1874. func InternalServerError(message string) Error {
  1875. if message == "" {
  1876. message = "Internal Server Error"
  1877. }
  1878. return Error{
  1879. StatusCode: http.StatusInternalServerError,
  1880. Body: message,
  1881. }
  1882. }
  1883. func NotFound() Error {
  1884. return Error{
  1885. StatusCode: http.StatusNotFound,
  1886. Body: "Not Found",
  1887. }
  1888. }