| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385 |
- package costmodel
- import (
- "fmt"
- "net/http"
- "strings"
- "time"
- "github.com/julienschmidt/httprouter"
- "github.com/opencost/opencost/pkg/errors"
- "github.com/opencost/opencost/core/pkg/log"
- "github.com/opencost/opencost/core/pkg/opencost"
- "github.com/opencost/opencost/core/pkg/util"
- "github.com/opencost/opencost/core/pkg/util/httputil"
- "github.com/opencost/opencost/core/pkg/util/json"
- "github.com/opencost/opencost/core/pkg/util/timeutil"
- "github.com/opencost/opencost/pkg/env"
- )
- const (
- // SplitTypeWeighted signals that shared costs should be shared
- // proportionally, rather than evenly
- SplitTypeWeighted = "weighted"
- // UnallocatedSubfield indicates an allocation datum that does not have the
- // chosen Aggregator; e.g. during aggregation by some label, there may be
- // cost data that do not have the given label.
- UnallocatedSubfield = "__unallocated__"
- )
- // ParseAggregationProperties attempts to parse and return aggregation properties
- // encoded under the given key. If none exist, or if parsing fails, an error
- // is returned with empty AllocationProperties.
- func ParseAggregationProperties(aggregations []string) ([]string, error) {
- aggregateBy := []string{}
- // In case of no aggregation option, aggregate to the container, with a key Cluster/Node/Namespace/Pod/Container
- if len(aggregations) == 0 {
- aggregateBy = []string{
- opencost.AllocationClusterProp,
- opencost.AllocationNodeProp,
- opencost.AllocationNamespaceProp,
- opencost.AllocationPodProp,
- opencost.AllocationContainerProp,
- }
- } else if len(aggregations) == 1 && aggregations[0] == "all" {
- aggregateBy = []string{}
- } else {
- for _, agg := range aggregations {
- aggregate := strings.TrimSpace(agg)
- if aggregate != "" {
- if prop, err := opencost.ParseProperty(aggregate); err == nil {
- aggregateBy = append(aggregateBy, string(prop))
- } else if strings.HasPrefix(aggregate, "label:") {
- aggregateBy = append(aggregateBy, aggregate)
- } else if strings.HasPrefix(aggregate, "annotation:") {
- aggregateBy = append(aggregateBy, aggregate)
- }
- }
- }
- }
- return aggregateBy, nil
- }
- func (a *Accesses) warmAggregateCostModelCache() {
- const clusterCostsCacheMinutes = 5.0
- // Only allow one concurrent cache-warming operation
- sem := util.NewSemaphore(1)
- // Set default values, pulling them from application settings where applicable, and warm the cache
- // for the given duration. Cache is intentionally set to expire (i.e. noExpireCache=false) so that
- // if the default parameters change, the old cached defaults with eventually expire. Thus, the
- // timing of the cache expiry/refresh is the only mechanism ensuring 100% cache warmth.
- warmFunc := func(duration, offset time.Duration, cacheEfficiencyData bool) error {
- fmtDuration, fmtOffset := timeutil.DurationOffsetStrings(duration, offset)
- durationHrs, _ := timeutil.FormatDurationStringDaysToHours(fmtDuration)
- windowStr := fmt.Sprintf("%s offset %s", fmtDuration, fmtOffset)
- window, err := opencost.ParseWindowUTC(windowStr)
- if err != nil {
- return fmt.Errorf("invalid window from window string: %s", windowStr)
- }
- key := fmt.Sprintf("%s:%s", durationHrs, fmtOffset)
- totals, err := a.ComputeClusterCosts(a.DataSource, a.CloudProvider, duration, offset, cacheEfficiencyData)
- if err != nil {
- log.Infof("Error building cluster costs cache %s", key)
- }
- maxMinutesWithData := 0.0
- for _, cluster := range totals {
- if cluster.DataMinutes > maxMinutesWithData {
- maxMinutesWithData = cluster.DataMinutes
- }
- }
- if len(totals) > 0 && maxMinutesWithData > clusterCostsCacheMinutes {
- a.ClusterCostsCache.Set(key, totals, a.GetCacheExpiration(window.Duration()))
- log.Infof("caching %s cluster costs for %s", fmtDuration, a.GetCacheExpiration(window.Duration()))
- } else {
- log.Warnf("not caching %s cluster costs: no data or less than %f minutes data ", fmtDuration, clusterCostsCacheMinutes)
- }
- return err
- }
- // 1 day
- go func(sem *util.Semaphore) {
- defer errors.HandlePanic()
- offset := time.Minute
- duration := 24 * time.Hour
- for {
- sem.Acquire()
- warmFunc(duration, offset, true)
- sem.Return()
- log.Infof("aggregation: warm cache: %s", timeutil.DurationString(duration))
- time.Sleep(a.GetCacheRefresh(duration))
- }
- }(sem)
- if !env.IsETLEnabled() {
- // 2 day
- go func(sem *util.Semaphore) {
- defer errors.HandlePanic()
- offset := time.Minute
- duration := 2 * 24 * time.Hour
- for {
- sem.Acquire()
- warmFunc(duration, offset, false)
- sem.Return()
- log.Infof("aggregation: warm cache: %s", timeutil.DurationString(duration))
- time.Sleep(a.GetCacheRefresh(duration))
- }
- }(sem)
- // 7 day
- go func(sem *util.Semaphore) {
- defer errors.HandlePanic()
- offset := time.Minute
- duration := 7 * 24 * time.Hour
- for {
- sem.Acquire()
- err := warmFunc(duration, offset, false)
- sem.Return()
- log.Infof("aggregation: warm cache: %s", timeutil.DurationString(duration))
- if err == nil {
- time.Sleep(a.GetCacheRefresh(duration))
- } else {
- time.Sleep(5 * time.Minute)
- }
- }
- }(sem)
- // 30 day
- go func(sem *util.Semaphore) {
- defer errors.HandlePanic()
- for {
- offset := time.Minute
- duration := 30 * 24 * time.Hour
- sem.Acquire()
- err := warmFunc(duration, offset, false)
- sem.Return()
- if err == nil {
- time.Sleep(a.GetCacheRefresh(duration))
- } else {
- time.Sleep(5 * time.Minute)
- }
- }
- }(sem)
- }
- }
- func (a *Accesses) ComputeAllocationHandlerSummary(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
- w.Header().Set("Content-Type", "application/json")
- qp := httputil.NewQueryParams(r.URL.Query())
- // Window is a required field describing the window of time over which to
- // compute allocation data.
- window, err := opencost.ParseWindowWithOffset(qp.Get("window", ""), env.GetParsedUTCOffset())
- if err != nil {
- http.Error(w, fmt.Sprintf("Invalid 'window' parameter: %s", err), http.StatusBadRequest)
- }
- // Step is an optional parameter that defines the duration per-set, i.e.
- // the window for an AllocationSet, of the AllocationSetRange to be
- // computed. Defaults to the window size, making one set.
- step := qp.GetDuration("step", window.Duration())
- // Resolution is an optional parameter, defaulting to the configured ETL
- // resolution.
- resolution := qp.GetDuration("resolution", env.GetETLResolution())
- // Aggregation is a required comma-separated list of fields by which to
- // aggregate results. Some fields allow a sub-field, which is distinguished
- // with a colon; e.g. "label:app".
- // Examples: "namespace", "namespace,label:app"
- aggregations := qp.GetList("aggregate", ",")
- aggregateBy, err := ParseAggregationProperties(aggregations)
- if err != nil {
- http.Error(w, fmt.Sprintf("Invalid 'aggregate' parameter: %s", err), http.StatusBadRequest)
- }
- // Accumulate is an optional parameter, defaulting to false, which if true
- // sums each Set in the Range, producing one Set.
- accumulate := qp.GetBool("accumulate", false)
- // Query for AllocationSets in increments of the given step duration,
- // appending each to the AllocationSetRange.
- asr := opencost.NewAllocationSetRange()
- stepStart := *window.Start()
- for window.End().After(stepStart) {
- stepEnd := stepStart.Add(step)
- stepWindow := opencost.NewWindow(&stepStart, &stepEnd)
- as, err := a.Model.ComputeAllocation(*stepWindow.Start(), *stepWindow.End(), resolution)
- if err != nil {
- WriteError(w, InternalServerError(err.Error()))
- return
- }
- asr.Append(as)
- stepStart = stepEnd
- }
- // Aggregate, if requested
- if len(aggregateBy) > 0 {
- err = asr.AggregateBy(aggregateBy, nil)
- if err != nil {
- WriteError(w, InternalServerError(err.Error()))
- return
- }
- }
- // Accumulate, if requested
- if accumulate {
- asr, err = asr.Accumulate(opencost.AccumulateOptionAll)
- if err != nil {
- WriteError(w, InternalServerError(err.Error()))
- return
- }
- }
- sasl := []*opencost.SummaryAllocationSet{}
- for _, as := range asr.Slice() {
- sas := opencost.NewSummaryAllocationSet(as, nil, nil, false, false)
- sasl = append(sasl, sas)
- }
- sasr := opencost.NewSummaryAllocationSetRange(sasl...)
- w.Write(WrapData(sasr, nil))
- }
- // ComputeAllocationHandler computes an AllocationSetRange from the CostModel.
- func (a *Accesses) ComputeAllocationHandler(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
- w.Header().Set("Content-Type", "application/json")
- qp := httputil.NewQueryParams(r.URL.Query())
- // Window is a required field describing the window of time over which to
- // compute allocation data.
- window, err := opencost.ParseWindowWithOffset(qp.Get("window", ""), env.GetParsedUTCOffset())
- if err != nil {
- http.Error(w, fmt.Sprintf("Invalid 'window' parameter: %s", err), http.StatusBadRequest)
- }
- // Resolution is an optional parameter, defaulting to the configured ETL
- // resolution.
- resolution := qp.GetDuration("resolution", env.GetETLResolution())
- // Step is an optional parameter that defines the duration per-set, i.e.
- // the window for an AllocationSet, of the AllocationSetRange to be
- // computed. Defaults to the window size, making one set.
- step := qp.GetDuration("step", window.Duration())
- // Aggregation is an optional comma-separated list of fields by which to
- // aggregate results. Some fields allow a sub-field, which is distinguished
- // with a colon; e.g. "label:app".
- // Examples: "namespace", "namespace,label:app"
- aggregations := qp.GetList("aggregate", ",")
- aggregateBy, err := ParseAggregationProperties(aggregations)
- if err != nil {
- http.Error(w, fmt.Sprintf("Invalid 'aggregate' parameter: %s", err), http.StatusBadRequest)
- }
- // IncludeIdle, if true, uses Asset data to incorporate Idle Allocation
- includeIdle := qp.GetBool("includeIdle", false)
- // Accumulate is an optional parameter, defaulting to false, which if true
- // sums each Set in the Range, producing one Set.
- accumulate := qp.GetBool("accumulate", false)
- // Accumulate is an optional parameter that accumulates an AllocationSetRange
- // by the resolution of the given time duration.
- // Defaults to 0. If a value is not passed then the parameter is not used.
- accumulateBy := opencost.AccumulateOption(qp.Get("accumulateBy", ""))
- // if accumulateBy is not explicitly set, and accumulate is true, ensure result is accumulated
- if accumulateBy == opencost.AccumulateOptionNone && accumulate {
- accumulateBy = opencost.AccumulateOptionAll
- }
- // IdleByNode, if true, computes idle allocations at the node level.
- // Otherwise it is computed at the cluster level. (Not relevant if idle
- // is not included.)
- idleByNode := qp.GetBool("idleByNode", false)
- sharedLoadBalancer := qp.GetBool("sharelb", false)
- // IncludeProportionalAssetResourceCosts, if true,
- includeProportionalAssetResourceCosts := qp.GetBool("includeProportionalAssetResourceCosts", false)
- // include aggregated labels/annotations if true
- includeAggregatedMetadata := qp.GetBool("includeAggregatedMetadata", false)
- shareIdle := qp.GetBool("shareIdle", false)
- asr, err := a.Model.QueryAllocation(window, resolution, step, aggregateBy, includeIdle, idleByNode, includeProportionalAssetResourceCosts, includeAggregatedMetadata, sharedLoadBalancer, accumulateBy, shareIdle)
- if err != nil {
- if strings.Contains(strings.ToLower(err.Error()), "bad request") {
- WriteError(w, BadRequest(err.Error()))
- } else {
- WriteError(w, InternalServerError(err.Error()))
- }
- return
- }
- w.Write(WrapData(asr, nil))
- }
- // The below was transferred from a different package in order to maintain
- // previous behavior. Ultimately, we should clean this up at some point.
- // TODO move to util and/or standardize everything
- type Error struct {
- StatusCode int
- Body string
- }
- func WriteError(w http.ResponseWriter, err Error) {
- status := err.StatusCode
- if status == 0 {
- status = http.StatusInternalServerError
- }
- w.WriteHeader(status)
- resp, _ := json.Marshal(&Response{
- Code: status,
- Message: fmt.Sprintf("Error: %s", err.Body),
- })
- w.Write(resp)
- }
- func BadRequest(message string) Error {
- return Error{
- StatusCode: http.StatusBadRequest,
- Body: message,
- }
- }
- func InternalServerError(message string) Error {
- if message == "" {
- message = "Internal Server Error"
- }
- return Error{
- StatusCode: http.StatusInternalServerError,
- Body: message,
- }
- }
- func NotFound() Error {
- return Error{
- StatusCode: http.StatusNotFound,
- Body: "Not Found",
- }
- }
|