aggregation.go 34 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999
  1. package costmodel
  2. import (
  3. "fmt"
  4. "math"
  5. "net/http"
  6. "sort"
  7. "strconv"
  8. "strings"
  9. "time"
  10. "github.com/julienschmidt/httprouter"
  11. "github.com/opencost/opencost/pkg/cloud/provider"
  12. "github.com/opencost/opencost/pkg/errors"
  13. "github.com/opencost/opencost/core/pkg/log"
  14. "github.com/opencost/opencost/core/pkg/opencost"
  15. "github.com/opencost/opencost/core/pkg/util"
  16. "github.com/opencost/opencost/core/pkg/util/httputil"
  17. "github.com/opencost/opencost/core/pkg/util/json"
  18. "github.com/opencost/opencost/core/pkg/util/promutil"
  19. "github.com/opencost/opencost/core/pkg/util/timeutil"
  20. "github.com/opencost/opencost/pkg/cloud/models"
  21. "github.com/opencost/opencost/pkg/env"
  22. )
  // SplitTypeWeighted signals that shared costs should be shared
  // proportionally, rather than evenly.
  const SplitTypeWeighted = "weighted"

  // UnallocatedSubfield indicates an allocation datum that does not have the
  // chosen Aggregator; e.g. during aggregation by some label, there may be
  // cost data that do not have the given label.
  const UnallocatedSubfield = "__unallocated__"
  32. // Aggregation describes aggregated cost data, containing cumulative cost and
  33. // allocation data per resource, vectors of rate data per resource, efficiency
  34. // data, and metadata describing the type of aggregation operation.
  35. type Aggregation struct {
  36. Aggregator string `json:"aggregation"`
  37. Subfields []string `json:"subfields,omitempty"`
  38. Environment string `json:"environment"`
  39. Cluster string `json:"cluster,omitempty"`
  40. Properties *opencost.AllocationProperties `json:"-"`
  41. Start time.Time `json:"-"`
  42. End time.Time `json:"-"`
  43. CPUAllocationHourlyAverage float64 `json:"cpuAllocationAverage"`
  44. CPUAllocationVectors []*util.Vector `json:"-"`
  45. CPUAllocationTotal float64 `json:"-"`
  46. CPUCost float64 `json:"cpuCost"`
  47. CPUCostVector []*util.Vector `json:"cpuCostVector,omitempty"`
  48. CPUEfficiency float64 `json:"cpuEfficiency"`
  49. CPURequestedVectors []*util.Vector `json:"-"`
  50. CPUUsedVectors []*util.Vector `json:"-"`
  51. Efficiency float64 `json:"efficiency"`
  52. GPUAllocationHourlyAverage float64 `json:"gpuAllocationAverage"`
  53. GPUAllocationVectors []*util.Vector `json:"-"`
  54. GPUCost float64 `json:"gpuCost"`
  55. GPUCostVector []*util.Vector `json:"gpuCostVector,omitempty"`
  56. GPUAllocationTotal float64 `json:"-"`
  57. RAMAllocationHourlyAverage float64 `json:"ramAllocationAverage"`
  58. RAMAllocationVectors []*util.Vector `json:"-"`
  59. RAMAllocationTotal float64 `json:"-"`
  60. RAMCost float64 `json:"ramCost"`
  61. RAMCostVector []*util.Vector `json:"ramCostVector,omitempty"`
  62. RAMEfficiency float64 `json:"ramEfficiency"`
  63. RAMRequestedVectors []*util.Vector `json:"-"`
  64. RAMUsedVectors []*util.Vector `json:"-"`
  65. PVAllocationHourlyAverage float64 `json:"pvAllocationAverage"`
  66. PVAllocationVectors []*util.Vector `json:"-"`
  67. PVAllocationTotal float64 `json:"-"`
  68. PVCost float64 `json:"pvCost"`
  69. PVCostVector []*util.Vector `json:"pvCostVector,omitempty"`
  70. NetworkCost float64 `json:"networkCost"`
  71. NetworkCostVector []*util.Vector `json:"networkCostVector,omitempty"`
  72. SharedCost float64 `json:"sharedCost"`
  73. TotalCost float64 `json:"totalCost"`
  74. TotalCostVector []*util.Vector `json:"totalCostVector,omitempty"`
  75. }
  76. // TotalHours determines the amount of hours the Aggregation covers, as a
  77. // function of the cost vectors and the resolution of those vectors' data
  78. func (a *Aggregation) TotalHours(resolutionHours float64) float64 {
  79. length := 1
  80. if length < len(a.CPUCostVector) {
  81. length = len(a.CPUCostVector)
  82. }
  83. if length < len(a.RAMCostVector) {
  84. length = len(a.RAMCostVector)
  85. }
  86. if length < len(a.PVCostVector) {
  87. length = len(a.PVCostVector)
  88. }
  89. if length < len(a.GPUCostVector) {
  90. length = len(a.GPUCostVector)
  91. }
  92. if length < len(a.NetworkCostVector) {
  93. length = len(a.NetworkCostVector)
  94. }
  95. return float64(length) * resolutionHours
  96. }
  97. // RateCoefficient computes the coefficient by which the total cost needs to be
  98. // multiplied in order to convert totals costs into per-rate costs.
  99. func (a *Aggregation) RateCoefficient(rateStr string, resolutionHours float64) float64 {
  100. // monthly rate = (730.0)*(total cost)/(total hours)
  101. // daily rate = (24.0)*(total cost)/(total hours)
  102. // hourly rate = (1.0)*(total cost)/(total hours)
  103. // default to hourly rate
  104. coeff := 1.0
  105. switch rateStr {
  106. case "daily":
  107. coeff = timeutil.HoursPerDay
  108. case "monthly":
  109. coeff = timeutil.HoursPerMonth
  110. }
  111. return coeff / a.TotalHours(resolutionHours)
  112. }
  // SharedResourceInfo describes which resources are treated as shared, either
  // by namespace or by label.
  type SharedResourceInfo struct {
      // ShareResources is the flag passed to NewSharedResourceInfo.
      ShareResources bool

      // SharedNamespace is keyed by the names of the shared namespaces.
      SharedNamespace map[string]bool

      // LabelSelectors maps a label name to the set of values of that label
      // that mark a resource as shared.
      LabelSelectors map[string]map[string]bool
  }
  // SharedCostInfo describes a single named shared cost.
  type SharedCostInfo struct {
      // Name identifies the shared cost.
      Name string

      // Cost is the amount of the shared cost.
      Cost float64

      // ShareType names how the cost is to be split.
      // NOTE(review): presumably compared against values such as
      // SplitTypeWeighted; confirm against the callers.
      ShareType string
  }
  123. func (s *SharedResourceInfo) IsSharedResource(costDatum *CostData) bool {
  124. // exists in a shared namespace
  125. if _, ok := s.SharedNamespace[costDatum.Namespace]; ok {
  126. return true
  127. }
  128. // has at least one shared label (OR, not AND in the case of multiple labels)
  129. for labelName, labelValues := range s.LabelSelectors {
  130. if val, ok := costDatum.Labels[labelName]; ok && labelValues[val] {
  131. return true
  132. }
  133. }
  134. return false
  135. }
  136. func NewSharedResourceInfo(shareResources bool, sharedNamespaces []string, labelNames []string, labelValues []string) *SharedResourceInfo {
  137. sr := &SharedResourceInfo{
  138. ShareResources: shareResources,
  139. SharedNamespace: make(map[string]bool),
  140. LabelSelectors: make(map[string]map[string]bool),
  141. }
  142. for _, ns := range sharedNamespaces {
  143. sr.SharedNamespace[strings.Trim(ns, " ")] = true
  144. }
  145. // Creating a map of label name to label value, but only if
  146. // the cardinality matches
  147. if len(labelNames) == len(labelValues) {
  148. for i := range labelNames {
  149. cleanedLname := promutil.SanitizeLabelName(strings.Trim(labelNames[i], " "))
  150. if values, ok := sr.LabelSelectors[cleanedLname]; ok {
  151. values[strings.Trim(labelValues[i], " ")] = true
  152. } else {
  153. sr.LabelSelectors[cleanedLname] = map[string]bool{strings.Trim(labelValues[i], " "): true}
  154. }
  155. }
  156. }
  157. return sr
  158. }
  159. func GetTotalContainerCost(costData map[string]*CostData, rate string, cp models.Provider, discount float64, customDiscount float64, idleCoefficients map[string]float64) float64 {
  160. totalContainerCost := 0.0
  161. for _, costDatum := range costData {
  162. clusterID := costDatum.ClusterID
  163. cpuv, ramv, gpuv, pvvs, netv := getPriceVectors(cp, costDatum, discount, customDiscount, idleCoefficients[clusterID])
  164. totalContainerCost += totalVectors(cpuv)
  165. totalContainerCost += totalVectors(ramv)
  166. totalContainerCost += totalVectors(gpuv)
  167. for _, pv := range pvvs {
  168. totalContainerCost += totalVectors(pv)
  169. }
  170. totalContainerCost += totalVectors(netv)
  171. }
  172. return totalContainerCost
  173. }
  174. func (a *Accesses) ComputeIdleCoefficient(costData map[string]*CostData, discount float64, customDiscount float64, window, offset time.Duration) (map[string]float64, error) {
  175. coefficients := make(map[string]float64)
  176. profileName := "ComputeIdleCoefficient: ComputeClusterCosts"
  177. profileStart := time.Now()
  178. var clusterCosts map[string]*ClusterCosts
  179. var err error
  180. fmtWindow, fmtOffset := timeutil.DurationOffsetStrings(window, offset)
  181. key := fmt.Sprintf("%s:%s", fmtWindow, fmtOffset)
  182. if data, valid := a.ClusterCostsCache.Get(key); valid {
  183. clusterCosts = data.(map[string]*ClusterCosts)
  184. } else {
  185. clusterCosts, err = a.ComputeClusterCosts(a.DataSource, a.CloudProvider, window, offset, false)
  186. if err != nil {
  187. return nil, err
  188. }
  189. }
  190. measureTime(profileStart, profileThreshold, profileName)
  191. for cid, costs := range clusterCosts {
  192. if costs.CPUCumulative == 0 && costs.RAMCumulative == 0 && costs.StorageCumulative == 0 {
  193. log.Warnf("No ClusterCosts data for cluster '%s'. Is it emitting data?", cid)
  194. coefficients[cid] = 1.0
  195. continue
  196. }
  197. if costs.TotalCumulative == 0 {
  198. return nil, fmt.Errorf("TotalCumulative cluster cost for cluster '%s' returned 0 over window '%s' offset '%s'", cid, fmtWindow, fmtOffset)
  199. }
  200. totalContainerCost := 0.0
  201. for _, costDatum := range costData {
  202. if costDatum.ClusterID == cid {
  203. cpuv, ramv, gpuv, pvvs, _ := getPriceVectors(a.CloudProvider, costDatum, discount, customDiscount, 1)
  204. totalContainerCost += totalVectors(cpuv)
  205. totalContainerCost += totalVectors(ramv)
  206. totalContainerCost += totalVectors(gpuv)
  207. for _, pv := range pvvs {
  208. totalContainerCost += totalVectors(pv)
  209. }
  210. }
  211. }
  212. coeff := totalContainerCost / costs.TotalCumulative
  213. coefficients[cid] = coeff
  214. }
  215. return coefficients, nil
  216. }
  217. // AggregationOptions provides optional parameters to AggregateCostData, allowing callers to perform more complex operations
  218. type AggregationOptions struct {
  219. Discount float64 // percent by which to discount CPU, RAM, and GPU cost
  220. CustomDiscount float64 // additional custom discount applied to all prices
  221. IdleCoefficients map[string]float64 // scales costs by amount of idle resources on a per-cluster basis
  222. IncludeEfficiency bool // set to true to receive efficiency/usage data
  223. IncludeTimeSeries bool // set to true to receive time series data
  224. Rate string // set to "hourly", "daily", or "monthly" to receive cost rate, rather than cumulative cost
  225. ResolutionHours float64
  226. SharedResourceInfo *SharedResourceInfo
  227. SharedCosts map[string]*SharedCostInfo
  228. FilteredContainerCount int
  229. FilteredEnvironments map[string]int
  230. SharedSplit string
  231. TotalContainerCost float64
  232. }
  233. // Returns the blended discounts applied to the node as a result of global discounts and reserved instance
  234. // discounts
  235. func getDiscounts(costDatum *CostData, cpuCost float64, ramCost float64, discount float64) (float64, float64) {
  236. if costDatum.NodeData == nil {
  237. return discount, discount
  238. }
  239. if costDatum.NodeData.IsSpot() {
  240. return 0, 0
  241. }
  242. reserved := costDatum.NodeData.Reserved
  243. // blended discounts
  244. blendedCPUDiscount := discount
  245. blendedRAMDiscount := discount
  246. if reserved != nil && reserved.CPUCost > 0 && reserved.RAMCost > 0 {
  247. reservedCPUDiscount := 0.0
  248. if cpuCost == 0 {
  249. log.Warnf("No cpu cost found for cluster '%s' node '%s'", costDatum.ClusterID, costDatum.NodeName)
  250. } else {
  251. reservedCPUDiscount = 1.0 - (reserved.CPUCost / cpuCost)
  252. }
  253. reservedRAMDiscount := 0.0
  254. if ramCost == 0 {
  255. log.Warnf("No ram cost found for cluster '%s' node '%s'", costDatum.ClusterID, costDatum.NodeName)
  256. } else {
  257. reservedRAMDiscount = 1.0 - (reserved.RAMCost / ramCost)
  258. }
  259. // AWS passes the # of reserved CPU and RAM as -1 to represent "All"
  260. if reserved.ReservedCPU < 0 && reserved.ReservedRAM < 0 {
  261. blendedCPUDiscount = reservedCPUDiscount
  262. blendedRAMDiscount = reservedRAMDiscount
  263. } else {
  264. nodeCPU, ierr := strconv.ParseInt(costDatum.NodeData.VCPU, 10, 64)
  265. nodeRAM, ferr := strconv.ParseFloat(costDatum.NodeData.RAMBytes, 64)
  266. if ierr == nil && ferr == nil {
  267. nodeRAMGB := nodeRAM / 1024 / 1024 / 1024
  268. reservedRAMGB := float64(reserved.ReservedRAM) / 1024 / 1024 / 1024
  269. nonReservedCPU := nodeCPU - reserved.ReservedCPU
  270. nonReservedRAM := nodeRAMGB - reservedRAMGB
  271. if nonReservedCPU == 0 {
  272. blendedCPUDiscount = reservedCPUDiscount
  273. } else {
  274. if nodeCPU == 0 {
  275. log.Warnf("No ram found for cluster '%s' node '%s'", costDatum.ClusterID, costDatum.NodeName)
  276. } else {
  277. blendedCPUDiscount = (float64(reserved.ReservedCPU) * reservedCPUDiscount) + (float64(nonReservedCPU)*discount)/float64(nodeCPU)
  278. }
  279. }
  280. if nonReservedRAM == 0 {
  281. blendedRAMDiscount = reservedRAMDiscount
  282. } else {
  283. if nodeRAMGB == 0 {
  284. log.Warnf("No ram found for cluster '%s' node '%s'", costDatum.ClusterID, costDatum.NodeName)
  285. } else {
  286. blendedRAMDiscount = (reservedRAMGB * reservedRAMDiscount) + (nonReservedRAM*discount)/nodeRAMGB
  287. }
  288. }
  289. }
  290. }
  291. }
  292. return blendedCPUDiscount, blendedRAMDiscount
  293. }
  294. func parseVectorPricing(cfg *models.CustomPricing, cpuCostStr, ramCostStr, gpuCostStr, pvCostStr string) (float64, float64, float64, float64, bool) {
  295. usesCustom := false
  296. cpuCost, err := strconv.ParseFloat(cpuCostStr, 64)
  297. if err != nil || math.IsNaN(cpuCost) || math.IsInf(cpuCost, 0) || cpuCost == 0 {
  298. cpuCost, err = strconv.ParseFloat(cfg.CPU, 64)
  299. usesCustom = true
  300. if err != nil || math.IsNaN(cpuCost) || math.IsInf(cpuCost, 0) {
  301. cpuCost = 0
  302. }
  303. }
  304. ramCost, err := strconv.ParseFloat(ramCostStr, 64)
  305. if err != nil || math.IsNaN(ramCost) || math.IsInf(ramCost, 0) || ramCost == 0 {
  306. ramCost, err = strconv.ParseFloat(cfg.RAM, 64)
  307. usesCustom = true
  308. if err != nil || math.IsNaN(ramCost) || math.IsInf(ramCost, 0) {
  309. ramCost = 0
  310. }
  311. }
  312. gpuCost, err := strconv.ParseFloat(gpuCostStr, 64)
  313. if err != nil || math.IsNaN(gpuCost) || math.IsInf(gpuCost, 0) {
  314. gpuCost, err = strconv.ParseFloat(cfg.GPU, 64)
  315. if err != nil || math.IsNaN(gpuCost) || math.IsInf(gpuCost, 0) {
  316. gpuCost = 0
  317. }
  318. }
  319. pvCost, err := strconv.ParseFloat(pvCostStr, 64)
  320. if err != nil || math.IsNaN(cpuCost) || math.IsInf(cpuCost, 0) {
  321. pvCost, err = strconv.ParseFloat(cfg.Storage, 64)
  322. if err != nil || math.IsNaN(pvCost) || math.IsInf(pvCost, 0) {
  323. pvCost = 0
  324. }
  325. }
  326. return cpuCost, ramCost, gpuCost, pvCost, usesCustom
  327. }
  328. func getPriceVectors(cp models.Provider, costDatum *CostData, discount float64, customDiscount float64, idleCoefficient float64) ([]*util.Vector, []*util.Vector, []*util.Vector, [][]*util.Vector, []*util.Vector) {
  329. var cpuCost float64
  330. var ramCost float64
  331. var gpuCost float64
  332. var pvCost float64
  333. var usesCustom bool
  334. // If custom pricing is enabled and can be retrieved, replace
  335. // default cost values with custom values
  336. customPricing, err := cp.GetConfig()
  337. if err != nil {
  338. log.Errorf("failed to load custom pricing: %s", err)
  339. }
  340. if provider.CustomPricesEnabled(cp) && err == nil {
  341. var cpuCostStr string
  342. var ramCostStr string
  343. var gpuCostStr string
  344. var pvCostStr string
  345. if costDatum.NodeData.IsSpot() {
  346. cpuCostStr = customPricing.SpotCPU
  347. ramCostStr = customPricing.SpotRAM
  348. gpuCostStr = customPricing.SpotGPU
  349. } else {
  350. cpuCostStr = customPricing.CPU
  351. ramCostStr = customPricing.RAM
  352. gpuCostStr = customPricing.GPU
  353. }
  354. pvCostStr = customPricing.Storage
  355. cpuCost, ramCost, gpuCost, pvCost, usesCustom = parseVectorPricing(customPricing, cpuCostStr, ramCostStr, gpuCostStr, pvCostStr)
  356. } else if costDatum.NodeData == nil && err == nil {
  357. cpuCostStr := customPricing.CPU
  358. ramCostStr := customPricing.RAM
  359. gpuCostStr := customPricing.GPU
  360. pvCostStr := customPricing.Storage
  361. cpuCost, ramCost, gpuCost, pvCost, usesCustom = parseVectorPricing(customPricing, cpuCostStr, ramCostStr, gpuCostStr, pvCostStr)
  362. } else {
  363. cpuCostStr := costDatum.NodeData.VCPUCost
  364. ramCostStr := costDatum.NodeData.RAMCost
  365. gpuCostStr := costDatum.NodeData.GPUCost
  366. pvCostStr := costDatum.NodeData.StorageCost
  367. cpuCost, ramCost, gpuCost, pvCost, usesCustom = parseVectorPricing(customPricing, cpuCostStr, ramCostStr, gpuCostStr, pvCostStr)
  368. }
  369. if usesCustom {
  370. log.DedupedWarningf(5, "No pricing data found for node `%s` , using custom pricing", costDatum.NodeName)
  371. }
  372. cpuDiscount, ramDiscount := getDiscounts(costDatum, cpuCost, ramCost, discount)
  373. log.Debugf("Node Name: %s", costDatum.NodeName)
  374. log.Debugf("Blended CPU Discount: %f", cpuDiscount)
  375. log.Debugf("Blended RAM Discount: %f", ramDiscount)
  376. // TODO should we try to apply the rate coefficient here or leave it as a totals-only metric?
  377. rateCoeff := 1.0
  378. if idleCoefficient == 0 {
  379. idleCoefficient = 1.0
  380. }
  381. cpuv := make([]*util.Vector, 0, len(costDatum.CPUAllocation))
  382. for _, val := range costDatum.CPUAllocation {
  383. cpuv = append(cpuv, &util.Vector{
  384. Timestamp: math.Round(val.Timestamp/10) * 10,
  385. Value: (val.Value * cpuCost * (1 - cpuDiscount) * (1 - customDiscount) / idleCoefficient) * rateCoeff,
  386. })
  387. }
  388. ramv := make([]*util.Vector, 0, len(costDatum.RAMAllocation))
  389. for _, val := range costDatum.RAMAllocation {
  390. ramv = append(ramv, &util.Vector{
  391. Timestamp: math.Round(val.Timestamp/10) * 10,
  392. Value: ((val.Value / 1024 / 1024 / 1024) * ramCost * (1 - ramDiscount) * (1 - customDiscount) / idleCoefficient) * rateCoeff,
  393. })
  394. }
  395. gpuv := make([]*util.Vector, 0, len(costDatum.GPUReq))
  396. for _, val := range costDatum.GPUReq {
  397. gpuv = append(gpuv, &util.Vector{
  398. Timestamp: math.Round(val.Timestamp/10) * 10,
  399. Value: (val.Value * gpuCost * (1 - discount) * (1 - customDiscount) / idleCoefficient) * rateCoeff,
  400. })
  401. }
  402. pvvs := make([][]*util.Vector, 0, len(costDatum.PVCData))
  403. for _, pvcData := range costDatum.PVCData {
  404. pvv := make([]*util.Vector, 0, len(pvcData.Values))
  405. if pvcData.Volume != nil {
  406. cost, _ := strconv.ParseFloat(pvcData.Volume.Cost, 64)
  407. // override with custom pricing if enabled
  408. if provider.CustomPricesEnabled(cp) {
  409. cost = pvCost
  410. }
  411. for _, val := range pvcData.Values {
  412. pvv = append(pvv, &util.Vector{
  413. Timestamp: math.Round(val.Timestamp/10) * 10,
  414. Value: ((val.Value / 1024 / 1024 / 1024) * cost * (1 - customDiscount) / idleCoefficient) * rateCoeff,
  415. })
  416. }
  417. pvvs = append(pvvs, pvv)
  418. }
  419. }
  420. netv := make([]*util.Vector, 0, len(costDatum.NetworkData))
  421. for _, val := range costDatum.NetworkData {
  422. netv = append(netv, &util.Vector{
  423. Timestamp: math.Round(val.Timestamp/10) * 10,
  424. Value: val.Value,
  425. })
  426. }
  427. return cpuv, ramv, gpuv, pvvs, netv
  428. }
  429. func totalVectors(vectors []*util.Vector) float64 {
  430. total := 0.0
  431. for _, vector := range vectors {
  432. total += vector.Value
  433. }
  434. return total
  435. }
  436. // EmptyDataError describes an error caused by empty cost data for some
  437. // defined interval
  438. type EmptyDataError struct {
  439. err error
  440. window opencost.Window
  441. }
  442. // Error implements the error interface
  443. func (ede *EmptyDataError) Error() string {
  444. err := fmt.Sprintf("empty data for range: %s", ede.window)
  445. if ede.err != nil {
  446. err += fmt.Sprintf(": %s", ede.err)
  447. }
  448. return err
  449. }
  450. // ScaleHourlyCostData converts per-hour cost data to per-resolution data. If the target resolution is higher (i.e. < 1.0h)
  451. // then we can do simple multiplication by the fraction-of-an-hour and retain accuracy. If the target resolution is
  452. // lower (i.e. > 1.0h) then we sum groups of hourly data by resolution to maintain fidelity.
  453. // e.g. (100 hours of per-hour hourly data, resolutionHours=10) => 10 data points, grouped and summed by 10-hour window
  454. // e.g. (20 minutes of per-minute hourly data, resolutionHours=1/60) => 20 data points, scaled down by a factor of 60
  455. func ScaleHourlyCostData(data map[string]*CostData, resolutionHours float64) map[string]*CostData {
  456. scaled := map[string]*CostData{}
  457. for key, datum := range data {
  458. datum.RAMReq = scaleVectorSeries(datum.RAMReq, resolutionHours)
  459. datum.RAMUsed = scaleVectorSeries(datum.RAMUsed, resolutionHours)
  460. datum.RAMAllocation = scaleVectorSeries(datum.RAMAllocation, resolutionHours)
  461. datum.CPUReq = scaleVectorSeries(datum.CPUReq, resolutionHours)
  462. datum.CPUUsed = scaleVectorSeries(datum.CPUUsed, resolutionHours)
  463. datum.CPUAllocation = scaleVectorSeries(datum.CPUAllocation, resolutionHours)
  464. datum.GPUReq = scaleVectorSeries(datum.GPUReq, resolutionHours)
  465. datum.NetworkData = scaleVectorSeries(datum.NetworkData, resolutionHours)
  466. for _, pvcDatum := range datum.PVCData {
  467. pvcDatum.Values = scaleVectorSeries(pvcDatum.Values, resolutionHours)
  468. }
  469. scaled[key] = datum
  470. }
  471. return scaled
  472. }
  473. func scaleVectorSeries(vs []*util.Vector, resolutionHours float64) []*util.Vector {
  474. // if scaling to a lower resolution, compress the hourly data for maximum accuracy
  475. if resolutionHours > 1.0 {
  476. return compressVectorSeries(vs, resolutionHours)
  477. }
  478. // if scaling to a higher resolution, simply scale each value down by the fraction of an hour
  479. for _, v := range vs {
  480. v.Value *= resolutionHours
  481. }
  482. return vs
  483. }
  484. func compressVectorSeries(vs []*util.Vector, resolutionHours float64) []*util.Vector {
  485. if len(vs) == 0 {
  486. return vs
  487. }
  488. compressed := []*util.Vector{}
  489. threshold := float64(60 * 60 * resolutionHours)
  490. var acc *util.Vector
  491. for i, v := range vs {
  492. if acc == nil {
  493. // start a new accumulation from current datum
  494. acc = &util.Vector{
  495. Value: vs[i].Value,
  496. Timestamp: vs[i].Timestamp,
  497. }
  498. continue
  499. }
  500. if v.Timestamp-acc.Timestamp < threshold {
  501. // v should be accumulated in current datum
  502. acc.Value += v.Value
  503. } else {
  504. // v falls outside current datum's threshold; append and start a new one
  505. compressed = append(compressed, acc)
  506. acc = &util.Vector{
  507. Value: vs[i].Value,
  508. Timestamp: vs[i].Timestamp,
  509. }
  510. }
  511. }
  512. // append any remaining, incomplete accumulation
  513. if acc != nil {
  514. compressed = append(compressed, acc)
  515. }
  516. return compressed
  517. }
  518. // ScaleAggregationTimeSeries reverses the scaling done by ScaleHourlyCostData, returning
  519. // the aggregation's time series to hourly data.
  520. func ScaleAggregationTimeSeries(aggregation *Aggregation, resolutionHours float64) {
  521. for _, v := range aggregation.CPUCostVector {
  522. v.Value /= resolutionHours
  523. }
  524. for _, v := range aggregation.GPUCostVector {
  525. v.Value /= resolutionHours
  526. }
  527. for _, v := range aggregation.RAMCostVector {
  528. v.Value /= resolutionHours
  529. }
  530. for _, v := range aggregation.PVCostVector {
  531. v.Value /= resolutionHours
  532. }
  533. for _, v := range aggregation.NetworkCostVector {
  534. v.Value /= resolutionHours
  535. }
  536. for _, v := range aggregation.TotalCostVector {
  537. v.Value /= resolutionHours
  538. }
  539. }
  540. // String returns a string representation of the encapsulated shared resources, which
  541. // can be used to uniquely identify a set of shared resources. Sorting sets of shared
  542. // resources ensures that strings representing permutations of the same combination match.
  543. func (s *SharedResourceInfo) String() string {
  544. if s == nil {
  545. return ""
  546. }
  547. nss := []string{}
  548. for ns := range s.SharedNamespace {
  549. nss = append(nss, ns)
  550. }
  551. sort.Strings(nss)
  552. nsStr := strings.Join(nss, ",")
  553. labels := []string{}
  554. for lbl, vals := range s.LabelSelectors {
  555. for val := range vals {
  556. if lbl != "" && val != "" {
  557. labels = append(labels, fmt.Sprintf("%s=%s", lbl, val))
  558. }
  559. }
  560. }
  561. sort.Strings(labels)
  562. labelStr := strings.Join(labels, ",")
  563. return fmt.Sprintf("%s:%s", nsStr, labelStr)
  564. }
  565. // ParseAggregationProperties attempts to parse and return aggregation properties
  566. // encoded under the given key. If none exist, or if parsing fails, an error
  567. // is returned with empty AllocationProperties.
  568. func ParseAggregationProperties(aggregations []string) ([]string, error) {
  569. aggregateBy := []string{}
  570. // In case of no aggregation option, aggregate to the container, with a key Cluster/Node/Namespace/Pod/Container
  571. if len(aggregations) == 0 {
  572. aggregateBy = []string{
  573. opencost.AllocationClusterProp,
  574. opencost.AllocationNodeProp,
  575. opencost.AllocationNamespaceProp,
  576. opencost.AllocationPodProp,
  577. opencost.AllocationContainerProp,
  578. }
  579. } else if len(aggregations) == 1 && aggregations[0] == "all" {
  580. aggregateBy = []string{}
  581. } else {
  582. for _, agg := range aggregations {
  583. aggregate := strings.TrimSpace(agg)
  584. if aggregate != "" {
  585. if prop, err := opencost.ParseProperty(aggregate); err == nil {
  586. aggregateBy = append(aggregateBy, string(prop))
  587. } else if strings.HasPrefix(aggregate, "label:") {
  588. aggregateBy = append(aggregateBy, aggregate)
  589. } else if strings.HasPrefix(aggregate, "annotation:") {
  590. aggregateBy = append(aggregateBy, aggregate)
  591. }
  592. }
  593. }
  594. }
  595. return aggregateBy, nil
  596. }
  597. func (a *Accesses) warmAggregateCostModelCache() {
  598. const clusterCostsCacheMinutes = 5.0
  599. // Only allow one concurrent cache-warming operation
  600. sem := util.NewSemaphore(1)
  601. // Set default values, pulling them from application settings where applicable, and warm the cache
  602. // for the given duration. Cache is intentionally set to expire (i.e. noExpireCache=false) so that
  603. // if the default parameters change, the old cached defaults with eventually expire. Thus, the
  604. // timing of the cache expiry/refresh is the only mechanism ensuring 100% cache warmth.
  605. warmFunc := func(duration, offset time.Duration, cacheEfficiencyData bool) error {
  606. fmtDuration, fmtOffset := timeutil.DurationOffsetStrings(duration, offset)
  607. durationHrs, _ := timeutil.FormatDurationStringDaysToHours(fmtDuration)
  608. windowStr := fmt.Sprintf("%s offset %s", fmtDuration, fmtOffset)
  609. window, err := opencost.ParseWindowUTC(windowStr)
  610. if err != nil {
  611. return fmt.Errorf("invalid window from window string: %s", windowStr)
  612. }
  613. key := fmt.Sprintf("%s:%s", durationHrs, fmtOffset)
  614. totals, err := a.ComputeClusterCosts(a.DataSource, a.CloudProvider, duration, offset, cacheEfficiencyData)
  615. if err != nil {
  616. log.Infof("Error building cluster costs cache %s", key)
  617. }
  618. maxMinutesWithData := 0.0
  619. for _, cluster := range totals {
  620. if cluster.DataMinutes > maxMinutesWithData {
  621. maxMinutesWithData = cluster.DataMinutes
  622. }
  623. }
  624. if len(totals) > 0 && maxMinutesWithData > clusterCostsCacheMinutes {
  625. a.ClusterCostsCache.Set(key, totals, a.GetCacheExpiration(window.Duration()))
  626. log.Infof("caching %s cluster costs for %s", fmtDuration, a.GetCacheExpiration(window.Duration()))
  627. } else {
  628. log.Warnf("not caching %s cluster costs: no data or less than %f minutes data ", fmtDuration, clusterCostsCacheMinutes)
  629. }
  630. return err
  631. }
  632. // 1 day
  633. go func(sem *util.Semaphore) {
  634. defer errors.HandlePanic()
  635. offset := time.Minute
  636. duration := 24 * time.Hour
  637. for {
  638. sem.Acquire()
  639. warmFunc(duration, offset, true)
  640. sem.Return()
  641. log.Infof("aggregation: warm cache: %s", timeutil.DurationString(duration))
  642. time.Sleep(a.GetCacheRefresh(duration))
  643. }
  644. }(sem)
  645. if !env.IsETLEnabled() {
  646. // 2 day
  647. go func(sem *util.Semaphore) {
  648. defer errors.HandlePanic()
  649. offset := time.Minute
  650. duration := 2 * 24 * time.Hour
  651. for {
  652. sem.Acquire()
  653. warmFunc(duration, offset, false)
  654. sem.Return()
  655. log.Infof("aggregation: warm cache: %s", timeutil.DurationString(duration))
  656. time.Sleep(a.GetCacheRefresh(duration))
  657. }
  658. }(sem)
  659. // 7 day
  660. go func(sem *util.Semaphore) {
  661. defer errors.HandlePanic()
  662. offset := time.Minute
  663. duration := 7 * 24 * time.Hour
  664. for {
  665. sem.Acquire()
  666. err := warmFunc(duration, offset, false)
  667. sem.Return()
  668. log.Infof("aggregation: warm cache: %s", timeutil.DurationString(duration))
  669. if err == nil {
  670. time.Sleep(a.GetCacheRefresh(duration))
  671. } else {
  672. time.Sleep(5 * time.Minute)
  673. }
  674. }
  675. }(sem)
  676. // 30 day
  677. go func(sem *util.Semaphore) {
  678. defer errors.HandlePanic()
  679. for {
  680. offset := time.Minute
  681. duration := 30 * 24 * time.Hour
  682. sem.Acquire()
  683. err := warmFunc(duration, offset, false)
  684. sem.Return()
  685. if err == nil {
  686. time.Sleep(a.GetCacheRefresh(duration))
  687. } else {
  688. time.Sleep(5 * time.Minute)
  689. }
  690. }
  691. }(sem)
  692. }
  693. }
  694. func (a *Accesses) ComputeAllocationHandlerSummary(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  695. w.Header().Set("Content-Type", "application/json")
  696. qp := httputil.NewQueryParams(r.URL.Query())
  697. // Window is a required field describing the window of time over which to
  698. // compute allocation data.
  699. window, err := opencost.ParseWindowWithOffset(qp.Get("window", ""), env.GetParsedUTCOffset())
  700. if err != nil {
  701. http.Error(w, fmt.Sprintf("Invalid 'window' parameter: %s", err), http.StatusBadRequest)
  702. }
  703. // Step is an optional parameter that defines the duration per-set, i.e.
  704. // the window for an AllocationSet, of the AllocationSetRange to be
  705. // computed. Defaults to the window size, making one set.
  706. step := qp.GetDuration("step", window.Duration())
  707. // Resolution is an optional parameter, defaulting to the configured ETL
  708. // resolution.
  709. resolution := qp.GetDuration("resolution", env.GetETLResolution())
  710. // Aggregation is a required comma-separated list of fields by which to
  711. // aggregate results. Some fields allow a sub-field, which is distinguished
  712. // with a colon; e.g. "label:app".
  713. // Examples: "namespace", "namespace,label:app"
  714. aggregations := qp.GetList("aggregate", ",")
  715. aggregateBy, err := ParseAggregationProperties(aggregations)
  716. if err != nil {
  717. http.Error(w, fmt.Sprintf("Invalid 'aggregate' parameter: %s", err), http.StatusBadRequest)
  718. }
  719. // Accumulate is an optional parameter, defaulting to false, which if true
  720. // sums each Set in the Range, producing one Set.
  721. accumulate := qp.GetBool("accumulate", false)
  722. // Query for AllocationSets in increments of the given step duration,
  723. // appending each to the AllocationSetRange.
  724. asr := opencost.NewAllocationSetRange()
  725. stepStart := *window.Start()
  726. for window.End().After(stepStart) {
  727. stepEnd := stepStart.Add(step)
  728. stepWindow := opencost.NewWindow(&stepStart, &stepEnd)
  729. as, err := a.Model.ComputeAllocation(*stepWindow.Start(), *stepWindow.End(), resolution)
  730. if err != nil {
  731. WriteError(w, InternalServerError(err.Error()))
  732. return
  733. }
  734. asr.Append(as)
  735. stepStart = stepEnd
  736. }
  737. // Aggregate, if requested
  738. if len(aggregateBy) > 0 {
  739. err = asr.AggregateBy(aggregateBy, nil)
  740. if err != nil {
  741. WriteError(w, InternalServerError(err.Error()))
  742. return
  743. }
  744. }
  745. // Accumulate, if requested
  746. if accumulate {
  747. asr, err = asr.Accumulate(opencost.AccumulateOptionAll)
  748. if err != nil {
  749. WriteError(w, InternalServerError(err.Error()))
  750. return
  751. }
  752. }
  753. sasl := []*opencost.SummaryAllocationSet{}
  754. for _, as := range asr.Slice() {
  755. sas := opencost.NewSummaryAllocationSet(as, nil, nil, false, false)
  756. sasl = append(sasl, sas)
  757. }
  758. sasr := opencost.NewSummaryAllocationSetRange(sasl...)
  759. w.Write(WrapData(sasr, nil))
  760. }
  761. // ComputeAllocationHandler computes an AllocationSetRange from the CostModel.
  762. func (a *Accesses) ComputeAllocationHandler(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  763. w.Header().Set("Content-Type", "application/json")
  764. qp := httputil.NewQueryParams(r.URL.Query())
  765. // Window is a required field describing the window of time over which to
  766. // compute allocation data.
  767. window, err := opencost.ParseWindowWithOffset(qp.Get("window", ""), env.GetParsedUTCOffset())
  768. if err != nil {
  769. http.Error(w, fmt.Sprintf("Invalid 'window' parameter: %s", err), http.StatusBadRequest)
  770. }
  771. // Resolution is an optional parameter, defaulting to the configured ETL
  772. // resolution.
  773. resolution := qp.GetDuration("resolution", env.GetETLResolution())
  774. // Step is an optional parameter that defines the duration per-set, i.e.
  775. // the window for an AllocationSet, of the AllocationSetRange to be
  776. // computed. Defaults to the window size, making one set.
  777. step := qp.GetDuration("step", window.Duration())
  778. // Aggregation is an optional comma-separated list of fields by which to
  779. // aggregate results. Some fields allow a sub-field, which is distinguished
  780. // with a colon; e.g. "label:app".
  781. // Examples: "namespace", "namespace,label:app"
  782. aggregations := qp.GetList("aggregate", ",")
  783. aggregateBy, err := ParseAggregationProperties(aggregations)
  784. if err != nil {
  785. http.Error(w, fmt.Sprintf("Invalid 'aggregate' parameter: %s", err), http.StatusBadRequest)
  786. }
  787. // IncludeIdle, if true, uses Asset data to incorporate Idle Allocation
  788. includeIdle := qp.GetBool("includeIdle", false)
  789. // Accumulate is an optional parameter, defaulting to false, which if true
  790. // sums each Set in the Range, producing one Set.
  791. accumulate := qp.GetBool("accumulate", false)
  792. // Accumulate is an optional parameter that accumulates an AllocationSetRange
  793. // by the resolution of the given time duration.
  794. // Defaults to 0. If a value is not passed then the parameter is not used.
  795. accumulateBy := opencost.AccumulateOption(qp.Get("accumulateBy", ""))
  796. // if accumulateBy is not explicitly set, and accumulate is true, ensure result is accumulated
  797. if accumulateBy == opencost.AccumulateOptionNone && accumulate {
  798. accumulateBy = opencost.AccumulateOptionAll
  799. }
  800. // IdleByNode, if true, computes idle allocations at the node level.
  801. // Otherwise it is computed at the cluster level. (Not relevant if idle
  802. // is not included.)
  803. idleByNode := qp.GetBool("idleByNode", false)
  804. sharedLoadBalancer := qp.GetBool("sharelb", false)
  805. // IncludeProportionalAssetResourceCosts, if true,
  806. includeProportionalAssetResourceCosts := qp.GetBool("includeProportionalAssetResourceCosts", false)
  807. // include aggregated labels/annotations if true
  808. includeAggregatedMetadata := qp.GetBool("includeAggregatedMetadata", false)
  809. shareIdle := qp.GetBool("shareIdle", false)
  810. asr, err := a.Model.QueryAllocation(window, resolution, step, aggregateBy, includeIdle, idleByNode, includeProportionalAssetResourceCosts, includeAggregatedMetadata, sharedLoadBalancer, accumulateBy, shareIdle)
  811. if err != nil {
  812. if strings.Contains(strings.ToLower(err.Error()), "bad request") {
  813. WriteError(w, BadRequest(err.Error()))
  814. } else {
  815. WriteError(w, InternalServerError(err.Error()))
  816. }
  817. return
  818. }
  819. w.Write(WrapData(asr, nil))
  820. }
  821. // The below was transferred from a different package in order to maintain
  822. // previous behavior. Ultimately, we should clean this up at some point.
  823. // TODO move to util and/or standardize everything
  824. type Error struct {
  825. StatusCode int
  826. Body string
  827. }
  828. func WriteError(w http.ResponseWriter, err Error) {
  829. status := err.StatusCode
  830. if status == 0 {
  831. status = http.StatusInternalServerError
  832. }
  833. w.WriteHeader(status)
  834. resp, _ := json.Marshal(&Response{
  835. Code: status,
  836. Message: fmt.Sprintf("Error: %s", err.Body),
  837. })
  838. w.Write(resp)
  839. }
  840. func BadRequest(message string) Error {
  841. return Error{
  842. StatusCode: http.StatusBadRequest,
  843. Body: message,
  844. }
  845. }
  846. func InternalServerError(message string) Error {
  847. if message == "" {
  848. message = "Internal Server Error"
  849. }
  850. return Error{
  851. StatusCode: http.StatusInternalServerError,
  852. Body: message,
  853. }
  854. }
  855. func NotFound() Error {
  856. return Error{
  857. StatusCode: http.StatusNotFound,
  858. Body: "Not Found",
  859. }
  860. }