2
0

aggregation.go 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385
  1. package costmodel
  2. import (
  3. "fmt"
  4. "net/http"
  5. "strings"
  6. "time"
  7. "github.com/julienschmidt/httprouter"
  8. "github.com/opencost/opencost/pkg/errors"
  9. "github.com/opencost/opencost/core/pkg/log"
  10. "github.com/opencost/opencost/core/pkg/opencost"
  11. "github.com/opencost/opencost/core/pkg/util"
  12. "github.com/opencost/opencost/core/pkg/util/httputil"
  13. "github.com/opencost/opencost/core/pkg/util/json"
  14. "github.com/opencost/opencost/core/pkg/util/timeutil"
  15. "github.com/opencost/opencost/pkg/env"
  16. )
  17. const (
  18. // SplitTypeWeighted signals that shared costs should be shared
  19. // proportionally, rather than evenly
  20. SplitTypeWeighted = "weighted"
  21. // UnallocatedSubfield indicates an allocation datum that does not have the
  22. // chosen Aggregator; e.g. during aggregation by some label, there may be
  23. // cost data that do not have the given label.
  24. UnallocatedSubfield = "__unallocated__"
  25. )
  26. // ParseAggregationProperties attempts to parse and return aggregation properties
  27. // encoded under the given key. If none exist, or if parsing fails, an error
  28. // is returned with empty AllocationProperties.
  29. func ParseAggregationProperties(aggregations []string) ([]string, error) {
  30. aggregateBy := []string{}
  31. // In case of no aggregation option, aggregate to the container, with a key Cluster/Node/Namespace/Pod/Container
  32. if len(aggregations) == 0 {
  33. aggregateBy = []string{
  34. opencost.AllocationClusterProp,
  35. opencost.AllocationNodeProp,
  36. opencost.AllocationNamespaceProp,
  37. opencost.AllocationPodProp,
  38. opencost.AllocationContainerProp,
  39. }
  40. } else if len(aggregations) == 1 && aggregations[0] == "all" {
  41. aggregateBy = []string{}
  42. } else {
  43. for _, agg := range aggregations {
  44. aggregate := strings.TrimSpace(agg)
  45. if aggregate != "" {
  46. if prop, err := opencost.ParseProperty(aggregate); err == nil {
  47. aggregateBy = append(aggregateBy, string(prop))
  48. } else if strings.HasPrefix(aggregate, "label:") {
  49. aggregateBy = append(aggregateBy, aggregate)
  50. } else if strings.HasPrefix(aggregate, "annotation:") {
  51. aggregateBy = append(aggregateBy, aggregate)
  52. }
  53. }
  54. }
  55. }
  56. return aggregateBy, nil
  57. }
  58. func (a *Accesses) warmAggregateCostModelCache() {
  59. const clusterCostsCacheMinutes = 5.0
  60. // Only allow one concurrent cache-warming operation
  61. sem := util.NewSemaphore(1)
  62. // Set default values, pulling them from application settings where applicable, and warm the cache
  63. // for the given duration. Cache is intentionally set to expire (i.e. noExpireCache=false) so that
  64. // if the default parameters change, the old cached defaults with eventually expire. Thus, the
  65. // timing of the cache expiry/refresh is the only mechanism ensuring 100% cache warmth.
  66. warmFunc := func(duration, offset time.Duration, cacheEfficiencyData bool) error {
  67. fmtDuration, fmtOffset := timeutil.DurationOffsetStrings(duration, offset)
  68. durationHrs, _ := timeutil.FormatDurationStringDaysToHours(fmtDuration)
  69. windowStr := fmt.Sprintf("%s offset %s", fmtDuration, fmtOffset)
  70. window, err := opencost.ParseWindowUTC(windowStr)
  71. if err != nil {
  72. return fmt.Errorf("invalid window from window string: %s", windowStr)
  73. }
  74. key := fmt.Sprintf("%s:%s", durationHrs, fmtOffset)
  75. totals, err := a.ComputeClusterCosts(a.DataSource, a.CloudProvider, duration, offset, cacheEfficiencyData)
  76. if err != nil {
  77. log.Infof("Error building cluster costs cache %s", key)
  78. }
  79. maxMinutesWithData := 0.0
  80. for _, cluster := range totals {
  81. if cluster.DataMinutes > maxMinutesWithData {
  82. maxMinutesWithData = cluster.DataMinutes
  83. }
  84. }
  85. if len(totals) > 0 && maxMinutesWithData > clusterCostsCacheMinutes {
  86. a.ClusterCostsCache.Set(key, totals, a.GetCacheExpiration(window.Duration()))
  87. log.Infof("caching %s cluster costs for %s", fmtDuration, a.GetCacheExpiration(window.Duration()))
  88. } else {
  89. log.Warnf("not caching %s cluster costs: no data or less than %f minutes data ", fmtDuration, clusterCostsCacheMinutes)
  90. }
  91. return err
  92. }
  93. // 1 day
  94. go func(sem *util.Semaphore) {
  95. defer errors.HandlePanic()
  96. offset := time.Minute
  97. duration := 24 * time.Hour
  98. for {
  99. sem.Acquire()
  100. warmFunc(duration, offset, true)
  101. sem.Return()
  102. log.Infof("aggregation: warm cache: %s", timeutil.DurationString(duration))
  103. time.Sleep(a.GetCacheRefresh(duration))
  104. }
  105. }(sem)
  106. if !env.IsETLEnabled() {
  107. // 2 day
  108. go func(sem *util.Semaphore) {
  109. defer errors.HandlePanic()
  110. offset := time.Minute
  111. duration := 2 * 24 * time.Hour
  112. for {
  113. sem.Acquire()
  114. warmFunc(duration, offset, false)
  115. sem.Return()
  116. log.Infof("aggregation: warm cache: %s", timeutil.DurationString(duration))
  117. time.Sleep(a.GetCacheRefresh(duration))
  118. }
  119. }(sem)
  120. // 7 day
  121. go func(sem *util.Semaphore) {
  122. defer errors.HandlePanic()
  123. offset := time.Minute
  124. duration := 7 * 24 * time.Hour
  125. for {
  126. sem.Acquire()
  127. err := warmFunc(duration, offset, false)
  128. sem.Return()
  129. log.Infof("aggregation: warm cache: %s", timeutil.DurationString(duration))
  130. if err == nil {
  131. time.Sleep(a.GetCacheRefresh(duration))
  132. } else {
  133. time.Sleep(5 * time.Minute)
  134. }
  135. }
  136. }(sem)
  137. // 30 day
  138. go func(sem *util.Semaphore) {
  139. defer errors.HandlePanic()
  140. for {
  141. offset := time.Minute
  142. duration := 30 * 24 * time.Hour
  143. sem.Acquire()
  144. err := warmFunc(duration, offset, false)
  145. sem.Return()
  146. if err == nil {
  147. time.Sleep(a.GetCacheRefresh(duration))
  148. } else {
  149. time.Sleep(5 * time.Minute)
  150. }
  151. }
  152. }(sem)
  153. }
  154. }
  155. func (a *Accesses) ComputeAllocationHandlerSummary(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  156. w.Header().Set("Content-Type", "application/json")
  157. qp := httputil.NewQueryParams(r.URL.Query())
  158. // Window is a required field describing the window of time over which to
  159. // compute allocation data.
  160. window, err := opencost.ParseWindowWithOffset(qp.Get("window", ""), env.GetParsedUTCOffset())
  161. if err != nil {
  162. http.Error(w, fmt.Sprintf("Invalid 'window' parameter: %s", err), http.StatusBadRequest)
  163. }
  164. // Step is an optional parameter that defines the duration per-set, i.e.
  165. // the window for an AllocationSet, of the AllocationSetRange to be
  166. // computed. Defaults to the window size, making one set.
  167. step := qp.GetDuration("step", window.Duration())
  168. // Resolution is an optional parameter, defaulting to the configured ETL
  169. // resolution.
  170. resolution := qp.GetDuration("resolution", env.GetETLResolution())
  171. // Aggregation is a required comma-separated list of fields by which to
  172. // aggregate results. Some fields allow a sub-field, which is distinguished
  173. // with a colon; e.g. "label:app".
  174. // Examples: "namespace", "namespace,label:app"
  175. aggregations := qp.GetList("aggregate", ",")
  176. aggregateBy, err := ParseAggregationProperties(aggregations)
  177. if err != nil {
  178. http.Error(w, fmt.Sprintf("Invalid 'aggregate' parameter: %s", err), http.StatusBadRequest)
  179. }
  180. // Accumulate is an optional parameter, defaulting to false, which if true
  181. // sums each Set in the Range, producing one Set.
  182. accumulate := qp.GetBool("accumulate", false)
  183. // Query for AllocationSets in increments of the given step duration,
  184. // appending each to the AllocationSetRange.
  185. asr := opencost.NewAllocationSetRange()
  186. stepStart := *window.Start()
  187. for window.End().After(stepStart) {
  188. stepEnd := stepStart.Add(step)
  189. stepWindow := opencost.NewWindow(&stepStart, &stepEnd)
  190. as, err := a.Model.ComputeAllocation(*stepWindow.Start(), *stepWindow.End(), resolution)
  191. if err != nil {
  192. WriteError(w, InternalServerError(err.Error()))
  193. return
  194. }
  195. asr.Append(as)
  196. stepStart = stepEnd
  197. }
  198. // Aggregate, if requested
  199. if len(aggregateBy) > 0 {
  200. err = asr.AggregateBy(aggregateBy, nil)
  201. if err != nil {
  202. WriteError(w, InternalServerError(err.Error()))
  203. return
  204. }
  205. }
  206. // Accumulate, if requested
  207. if accumulate {
  208. asr, err = asr.Accumulate(opencost.AccumulateOptionAll)
  209. if err != nil {
  210. WriteError(w, InternalServerError(err.Error()))
  211. return
  212. }
  213. }
  214. sasl := []*opencost.SummaryAllocationSet{}
  215. for _, as := range asr.Slice() {
  216. sas := opencost.NewSummaryAllocationSet(as, nil, nil, false, false)
  217. sasl = append(sasl, sas)
  218. }
  219. sasr := opencost.NewSummaryAllocationSetRange(sasl...)
  220. w.Write(WrapData(sasr, nil))
  221. }
  222. // ComputeAllocationHandler computes an AllocationSetRange from the CostModel.
  223. func (a *Accesses) ComputeAllocationHandler(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
  224. w.Header().Set("Content-Type", "application/json")
  225. qp := httputil.NewQueryParams(r.URL.Query())
  226. // Window is a required field describing the window of time over which to
  227. // compute allocation data.
  228. window, err := opencost.ParseWindowWithOffset(qp.Get("window", ""), env.GetParsedUTCOffset())
  229. if err != nil {
  230. http.Error(w, fmt.Sprintf("Invalid 'window' parameter: %s", err), http.StatusBadRequest)
  231. }
  232. // Resolution is an optional parameter, defaulting to the configured ETL
  233. // resolution.
  234. resolution := qp.GetDuration("resolution", env.GetETLResolution())
  235. // Step is an optional parameter that defines the duration per-set, i.e.
  236. // the window for an AllocationSet, of the AllocationSetRange to be
  237. // computed. Defaults to the window size, making one set.
  238. step := qp.GetDuration("step", window.Duration())
  239. // Aggregation is an optional comma-separated list of fields by which to
  240. // aggregate results. Some fields allow a sub-field, which is distinguished
  241. // with a colon; e.g. "label:app".
  242. // Examples: "namespace", "namespace,label:app"
  243. aggregations := qp.GetList("aggregate", ",")
  244. aggregateBy, err := ParseAggregationProperties(aggregations)
  245. if err != nil {
  246. http.Error(w, fmt.Sprintf("Invalid 'aggregate' parameter: %s", err), http.StatusBadRequest)
  247. }
  248. // IncludeIdle, if true, uses Asset data to incorporate Idle Allocation
  249. includeIdle := qp.GetBool("includeIdle", false)
  250. // Accumulate is an optional parameter, defaulting to false, which if true
  251. // sums each Set in the Range, producing one Set.
  252. accumulate := qp.GetBool("accumulate", false)
  253. // Accumulate is an optional parameter that accumulates an AllocationSetRange
  254. // by the resolution of the given time duration.
  255. // Defaults to 0. If a value is not passed then the parameter is not used.
  256. accumulateBy := opencost.AccumulateOption(qp.Get("accumulateBy", ""))
  257. // if accumulateBy is not explicitly set, and accumulate is true, ensure result is accumulated
  258. if accumulateBy == opencost.AccumulateOptionNone && accumulate {
  259. accumulateBy = opencost.AccumulateOptionAll
  260. }
  261. // IdleByNode, if true, computes idle allocations at the node level.
  262. // Otherwise it is computed at the cluster level. (Not relevant if idle
  263. // is not included.)
  264. idleByNode := qp.GetBool("idleByNode", false)
  265. sharedLoadBalancer := qp.GetBool("sharelb", false)
  266. // IncludeProportionalAssetResourceCosts, if true,
  267. includeProportionalAssetResourceCosts := qp.GetBool("includeProportionalAssetResourceCosts", false)
  268. // include aggregated labels/annotations if true
  269. includeAggregatedMetadata := qp.GetBool("includeAggregatedMetadata", false)
  270. shareIdle := qp.GetBool("shareIdle", false)
  271. asr, err := a.Model.QueryAllocation(window, resolution, step, aggregateBy, includeIdle, idleByNode, includeProportionalAssetResourceCosts, includeAggregatedMetadata, sharedLoadBalancer, accumulateBy, shareIdle)
  272. if err != nil {
  273. if strings.Contains(strings.ToLower(err.Error()), "bad request") {
  274. WriteError(w, BadRequest(err.Error()))
  275. } else {
  276. WriteError(w, InternalServerError(err.Error()))
  277. }
  278. return
  279. }
  280. w.Write(WrapData(asr, nil))
  281. }
  282. // The below was transferred from a different package in order to maintain
  283. // previous behavior. Ultimately, we should clean this up at some point.
  284. // TODO move to util and/or standardize everything
  285. type Error struct {
  286. StatusCode int
  287. Body string
  288. }
  289. func WriteError(w http.ResponseWriter, err Error) {
  290. status := err.StatusCode
  291. if status == 0 {
  292. status = http.StatusInternalServerError
  293. }
  294. w.WriteHeader(status)
  295. resp, _ := json.Marshal(&Response{
  296. Code: status,
  297. Message: fmt.Sprintf("Error: %s", err.Body),
  298. })
  299. w.Write(resp)
  300. }
  301. func BadRequest(message string) Error {
  302. return Error{
  303. StatusCode: http.StatusBadRequest,
  304. Body: message,
  305. }
  306. }
  307. func InternalServerError(message string) Error {
  308. if message == "" {
  309. message = "Internal Server Error"
  310. }
  311. return Error{
  312. StatusCode: http.StatusInternalServerError,
  313. Body: message,
  314. }
  315. }
  316. func NotFound() Error {
  317. return Error{
  318. StatusCode: http.StatusNotFound,
  319. Body: "Not Found",
  320. }
  321. }