|
|
@@ -30,6 +30,7 @@ import (
|
|
|
const (
|
|
|
prometheusServerEndpointEnvVar = "PROMETHEUS_SERVER_ENDPOINT"
|
|
|
prometheusTroubleshootingEp = "http://docs.kubecost.com/custom-prom#troubleshoot"
|
|
|
+ RFC3339Milli = "2006-01-02T15:04:05.000Z"
|
|
|
)
|
|
|
|
|
|
var (
|
|
|
@@ -61,7 +62,9 @@ type Accesses struct {
|
|
|
ServiceSelectorRecorder *prometheus.GaugeVec
|
|
|
DeploymentSelectorRecorder *prometheus.GaugeVec
|
|
|
Model *CostModel
|
|
|
- Cache *cache.Cache
|
|
|
+ AggregateCache *cache.Cache
|
|
|
+ CostDataCache *cache.Cache
|
|
|
+ OutOfClusterCache *cache.Cache
|
|
|
}
|
|
|
|
|
|
type DataEnvelope struct {
|
|
|
@@ -71,6 +74,45 @@ type DataEnvelope struct {
|
|
|
Message string `json:"message,omitempty"`
|
|
|
}
|
|
|
|
|
|
+// filterCostData allows through only CostData that matches the given filters for namespace and clusterId
|
|
|
+func filterCostData(data map[string]*CostData, namespace, clusterId string) map[string]*CostData {
|
|
|
+ result := make(map[string]*CostData)
|
|
|
+ for key, datum := range data {
|
|
|
+ if costDataPassesFilters(datum, namespace, clusterId) {
|
|
|
+ result[key] = datum
|
|
|
+ }
|
|
|
+ }
|
|
|
+ return result
|
|
|
+}
|
|
|
+
|
|
|
+func filterFields(fields string, data map[string]*CostData) map[string]CostData {
|
|
|
+ fs := strings.Split(fields, ",")
|
|
|
+ fmap := make(map[string]bool)
|
|
|
+ for _, f := range fs {
|
|
|
+ fieldNameLower := strings.ToLower(f) // convert to go struct name by uppercasing first letter
|
|
|
+ klog.V(1).Infof("to delete: %s", fieldNameLower)
|
|
|
+ fmap[fieldNameLower] = true
|
|
|
+ }
|
|
|
+ filteredData := make(map[string]CostData)
|
|
|
+ for cname, costdata := range data {
|
|
|
+ s := reflect.TypeOf(*costdata)
|
|
|
+ val := reflect.ValueOf(*costdata)
|
|
|
+ costdata2 := CostData{}
|
|
|
+ cd2 := reflect.New(reflect.Indirect(reflect.ValueOf(costdata2)).Type()).Elem()
|
|
|
+ n := s.NumField()
|
|
|
+ for i := 0; i < n; i++ {
|
|
|
+ field := s.Field(i)
|
|
|
+ value := val.Field(i)
|
|
|
+ value2 := cd2.Field(i)
|
|
|
+ if _, ok := fmap[strings.ToLower(field.Name)]; !ok {
|
|
|
+ value2.Set(reflect.Value(value))
|
|
|
+ }
|
|
|
+ }
|
|
|
+ filteredData[cname] = cd2.Interface().(CostData)
|
|
|
+ }
|
|
|
+ return filteredData
|
|
|
+}
|
|
|
+
|
|
|
func normalizeTimeParam(param string) (string, error) {
|
|
|
// convert days to hours
|
|
|
if param[len(param)-1:] == "d" {
|
|
|
@@ -86,6 +128,65 @@ func normalizeTimeParam(param string) (string, error) {
|
|
|
return param, nil
|
|
|
}
|
|
|
|
|
|
// parseDuration converts a Prometheus-style duration string, i.e. an integer
// amount followed by a single unit character, into a time.Duration.
//
// Supported units are "s" (seconds), "m" (minutes), "h" (hours) and "d"
// (24-hour days). An error is returned, with a nil Duration, when the string
// is empty, the unit is unknown, the amount is not an integer, or the
// resulting duration does not fit in a time.Duration.
func parseDuration(duration string) (*time.Duration, error) {
	// Guard against the empty string: slicing off the unit below would
	// otherwise panic with an out-of-range index.
	if duration == "" {
		return nil, fmt.Errorf("error parsing duration: empty string did not match expected format [0-9]+(s|m|h|d)")
	}

	var unit time.Duration
	switch duration[len(duration)-1:] {
	case "s":
		unit = time.Second
	case "m":
		unit = time.Minute
	case "h":
		unit = time.Hour
	case "d":
		unit = 24 * time.Hour
	default:
		return nil, fmt.Errorf("error parsing duration: %s did not match expected format [0-9]+(s|m|h|d)", duration)
	}

	amount, err := strconv.ParseInt(duration[:len(duration)-1], 10, 64)
	if err != nil {
		return nil, fmt.Errorf("error parsing duration: %s did not match expected format [0-9]+(s|m|h|d)", duration)
	}

	dur := time.Duration(amount) * unit
	// Multiplication wraps silently on int64 overflow; dividing back detects it.
	if dur/unit != time.Duration(amount) {
		return nil, fmt.Errorf("error parsing duration: %s overflows the representable range", duration)
	}
	return &dur, nil
}
|
|
|
+
|
|
|
+// parseTimeRange returns a start and end time, respectively, which are converted from
|
|
|
+// a duration and offset, defined as strings with Prometheus-style syntax.
|
|
|
+func parseTimeRange(duration, offset string) (*time.Time, *time.Time, error) {
|
|
|
+ // endTime defaults to the current time, unless an offset is explicity declared,
|
|
|
+ // in which case it shifts endTime back by given duration
|
|
|
+ endTime := time.Now()
|
|
|
+ if offset != "" {
|
|
|
+ o, err := time.ParseDuration(offset)
|
|
|
+ if err != nil {
|
|
|
+ return nil, nil, fmt.Errorf("error parsing offset (%s): %s", offset, err)
|
|
|
+ }
|
|
|
+ endTime = endTime.Add(-1 * o)
|
|
|
+ }
|
|
|
+
|
|
|
+ // if duration is defined in terms of days, convert to hours
|
|
|
+ // e.g. convert "2d" to "48h"
|
|
|
+ durationNorm, err := normalizeTimeParam(duration)
|
|
|
+ if err != nil {
|
|
|
+ return nil, nil, fmt.Errorf("error parsing duration (%s): %s", duration, err)
|
|
|
+ }
|
|
|
+
|
|
|
+ // convert time duration into start and end times, formatted
|
|
|
+ // as ISO datetime strings
|
|
|
+ dur, err := time.ParseDuration(durationNorm)
|
|
|
+ if err != nil {
|
|
|
+ return nil, nil, fmt.Errorf("errorf parsing duration (%s): %s", durationNorm, err)
|
|
|
+ }
|
|
|
+ startTime := endTime.Add(-1 * dur)
|
|
|
+
|
|
|
+ return &startTime, &endTime, nil
|
|
|
+}
|
|
|
+
|
|
|
func wrapDataWithMessage(data interface{}, err error, message string) []byte {
|
|
|
var resp []byte
|
|
|
|
|
|
@@ -143,34 +244,6 @@ func (a *Accesses) RefreshPricingData(w http.ResponseWriter, r *http.Request, ps
|
|
|
w.Write(wrapData(nil, err))
|
|
|
}
|
|
|
|
|
|
-func filterFields(fields string, data map[string]*CostData) map[string]CostData {
|
|
|
- fs := strings.Split(fields, ",")
|
|
|
- fmap := make(map[string]bool)
|
|
|
- for _, f := range fs {
|
|
|
- fieldNameLower := strings.ToLower(f) // convert to go struct name by uppercasing first letter
|
|
|
- klog.V(1).Infof("to delete: %s", fieldNameLower)
|
|
|
- fmap[fieldNameLower] = true
|
|
|
- }
|
|
|
- filteredData := make(map[string]CostData)
|
|
|
- for cname, costdata := range data {
|
|
|
- s := reflect.TypeOf(*costdata)
|
|
|
- val := reflect.ValueOf(*costdata)
|
|
|
- costdata2 := CostData{}
|
|
|
- cd2 := reflect.New(reflect.Indirect(reflect.ValueOf(costdata2)).Type()).Elem()
|
|
|
- n := s.NumField()
|
|
|
- for i := 0; i < n; i++ {
|
|
|
- field := s.Field(i)
|
|
|
- value := val.Field(i)
|
|
|
- value2 := cd2.Field(i)
|
|
|
- if _, ok := fmap[strings.ToLower(field.Name)]; !ok {
|
|
|
- value2.Set(reflect.Value(value))
|
|
|
- }
|
|
|
- }
|
|
|
- filteredData[cname] = cd2.Interface().(CostData)
|
|
|
- }
|
|
|
- return filteredData
|
|
|
-}
|
|
|
-
|
|
|
func (a *Accesses) CostDataModel(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
|
|
|
w.Header().Set("Content-Type", "application/json")
|
|
|
w.Header().Set("Access-Control-Allow-Origin", "*")
|
|
|
@@ -209,8 +282,7 @@ func (a *Accesses) CostDataModel(w http.ResponseWriter, r *http.Request, ps http
|
|
|
// dataCount is the number of time series data expected for the given interval,
|
|
|
// which we compute because Prometheus time series vectors omit zero values.
|
|
|
// This assumes hourly data, incremented by one to capture the 0th data point.
|
|
|
- dataCount := int64(dur.Hours()) + 1
|
|
|
- klog.V(1).Infof("for duration %s dataCount = %d", dur.String(), dataCount)
|
|
|
+ dataCount := int(dur.Hours())
|
|
|
|
|
|
opts := &AggregationOptions{
|
|
|
DataCount: dataCount,
|
|
|
@@ -260,7 +332,7 @@ func (a *Accesses) AggregateCostModel(w http.ResponseWriter, r *http.Request, ps
|
|
|
w.Header().Set("Content-Type", "application/json")
|
|
|
w.Header().Set("Access-Control-Allow-Origin", "*")
|
|
|
|
|
|
- window := r.URL.Query().Get("window")
|
|
|
+ duration := r.URL.Query().Get("window")
|
|
|
offset := r.URL.Query().Get("offset")
|
|
|
namespace := r.URL.Query().Get("namespace")
|
|
|
cluster := r.URL.Query().Get("cluster")
|
|
|
@@ -271,7 +343,7 @@ func (a *Accesses) AggregateCostModel(w http.ResponseWriter, r *http.Request, ps
|
|
|
sharedNamespaces := r.URL.Query().Get("sharedNamespaces")
|
|
|
sharedLabelNames := r.URL.Query().Get("sharedLabelNames")
|
|
|
sharedLabelValues := r.URL.Query().Get("sharedLabelValues")
|
|
|
- remote := r.URL.Query().Get("remote")
|
|
|
+ remote := r.URL.Query().Get("remote") != "false"
|
|
|
|
|
|
subfields := []string{}
|
|
|
if len(subfieldStr) > 0 {
|
|
|
@@ -287,85 +359,12 @@ func (a *Accesses) AggregateCostModel(w http.ResponseWriter, r *http.Request, ps
|
|
|
|
|
|
// disableCache, if set to "true", tells this function to recompute and
|
|
|
// cache the requested data
|
|
|
- disableCache := r.URL.Query().Get("disableCache") == "true" || allocateIdle
|
|
|
+ disableCache := r.URL.Query().Get("disableCache") == "true"
|
|
|
|
|
|
// clearCache, if set to "true", tells this function to flush the cache,
|
|
|
// then recompute and cache the requested data
|
|
|
clearCache := r.URL.Query().Get("clearCache") == "true"
|
|
|
|
|
|
- // time window must be defined, whether by window and offset or by manually
|
|
|
- // setting the start and end times as ISO time strings
|
|
|
- var start, end string
|
|
|
- var dur time.Duration
|
|
|
- layout := "2006-01-02T15:04:05.000Z"
|
|
|
- if window == "" {
|
|
|
- start = r.URL.Query().Get("start")
|
|
|
- startTime, err := time.Parse(layout, start)
|
|
|
- if err != nil {
|
|
|
- w.WriteHeader(http.StatusBadRequest)
|
|
|
- w.Write(wrapData(nil, fmt.Errorf("Invalid start parameter: %s", start)))
|
|
|
- return
|
|
|
- }
|
|
|
-
|
|
|
- end = r.URL.Query().Get("end")
|
|
|
- endTime, err := time.Parse(layout, end)
|
|
|
- if err != nil {
|
|
|
- w.WriteHeader(http.StatusBadRequest)
|
|
|
- w.Write(wrapData(nil, fmt.Errorf("Invalid end parameter: %s", end)))
|
|
|
- return
|
|
|
- }
|
|
|
-
|
|
|
- dur = endTime.Sub(startTime)
|
|
|
- } else {
|
|
|
- // endTime defaults to the current time, unless an offset is explicity declared,
|
|
|
- // in which case it shifts endTime back by given duration
|
|
|
- endTime := time.Now()
|
|
|
- if offset != "" {
|
|
|
- o, err := time.ParseDuration(offset)
|
|
|
- if err != nil {
|
|
|
- klog.V(1).Infof("error parsing offset: %s", err)
|
|
|
- w.Write(wrapData(nil, err))
|
|
|
- return
|
|
|
- }
|
|
|
- endTime = endTime.Add(-1 * o)
|
|
|
- }
|
|
|
-
|
|
|
- if a.ThanosClient != nil {
|
|
|
- if endTime.After(time.Now().Add(-3 * time.Hour)) {
|
|
|
- klog.Infof("Setting end time backwards to first present data")
|
|
|
- endTime = time.Now().Add(-3 * time.Hour)
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- // if window is defined in terms of days, convert to hours
|
|
|
- // e.g. convert "2d" to "48h"
|
|
|
- window, err := normalizeTimeParam(window)
|
|
|
- if err != nil {
|
|
|
- w.Write(wrapData(nil, err))
|
|
|
- return
|
|
|
- }
|
|
|
-
|
|
|
- // convert time window into start and end times, formatted
|
|
|
- // as ISO datetime strings
|
|
|
- dur, err = time.ParseDuration(window)
|
|
|
- if err != nil {
|
|
|
- w.Write(wrapData(nil, err))
|
|
|
- return
|
|
|
- }
|
|
|
- startTime := endTime.Add(-1 * dur)
|
|
|
-
|
|
|
- start = startTime.Format(layout)
|
|
|
- end = endTime.Format(layout)
|
|
|
-
|
|
|
- klog.V(1).Infof("start: %s, end: %s", start, end)
|
|
|
- }
|
|
|
-
|
|
|
- // dataCount is the number of time series data expected for the given interval,
|
|
|
- // which we compute because Prometheus time series vectors omit zero values.
|
|
|
- // This assumes hourly data, incremented by one to capture the 0th data point.
|
|
|
- dataCount := int64(dur.Hours()) + 1
|
|
|
- klog.V(1).Infof("for duration %s dataCount = %d", dur.String(), dataCount)
|
|
|
-
|
|
|
// aggregation field is required
|
|
|
if field == "" {
|
|
|
w.WriteHeader(http.StatusBadRequest)
|
|
|
@@ -390,40 +389,101 @@ func (a *Accesses) AggregateCostModel(w http.ResponseWriter, r *http.Request, ps
|
|
|
// clear cache prior to checking the cache so that a clearCache=true
|
|
|
// request always returns a freshly computed value
|
|
|
if clearCache {
|
|
|
- a.Cache.Flush()
|
|
|
+ a.AggregateCache.Flush()
|
|
|
+ a.CostDataCache.Flush()
|
|
|
}
|
|
|
|
|
|
// parametrize cache key by all request parameters
|
|
|
- aggKey := fmt.Sprintf("aggregate:%s:%s:%s:%s:%s:%s:%s:%t:%t:%t",
|
|
|
- window, offset, namespace, cluster, field, strings.Join(subfields, ","), rate, allocateIdle, includeTimeSeries, includeEfficiency)
|
|
|
+ aggKey := fmt.Sprintf("%s:%s:%s:%s:%s:%s:%s:%t:%t:%t",
|
|
|
+ duration, offset, namespace, cluster, field, strings.Join(subfields, ","), rate,
|
|
|
+ allocateIdle, includeTimeSeries, includeEfficiency)
|
|
|
|
|
|
// check the cache for aggregated response; if cache is hit and not disabled, return response
|
|
|
- if result, found := a.Cache.Get(aggKey); found && !disableCache {
|
|
|
- w.Write(wrapDataWithMessage(result, nil, fmt.Sprintf("cache hit: %s", aggKey)))
|
|
|
+ if result, found := a.AggregateCache.Get(aggKey); found && !disableCache {
|
|
|
+ w.Write(wrapDataWithMessage(result, nil, fmt.Sprintf("aggregate cache hit: %s", aggKey)))
|
|
|
return
|
|
|
}
|
|
|
|
|
|
+ // enable remote if it is available and not disabled
|
|
|
remoteAvailable := os.Getenv(remoteEnabled) == "true"
|
|
|
- remoteEnabled := false
|
|
|
- if remoteAvailable && remote != "false" {
|
|
|
- remoteEnabled = true
|
|
|
- }
|
|
|
+ remoteEnabled := remote && remoteAvailable
|
|
|
|
|
|
// Use Thanos Client if it exists (enabled) and remote flag set
|
|
|
var pClient prometheusClient.Client
|
|
|
- if remote != "false" && a.ThanosClient != nil {
|
|
|
+ if remote && a.ThanosClient != nil {
|
|
|
pClient = a.ThanosClient
|
|
|
} else {
|
|
|
pClient = a.PrometheusClient
|
|
|
}
|
|
|
|
|
|
- data, err := a.Model.ComputeCostDataRange(pClient, a.KubeClientSet, a.Cloud, start, end, "1h", namespace, cluster, remoteEnabled)
|
|
|
+ // convert duration and offset to start and end times
|
|
|
+ startTime, endTime, err := parseTimeRange(duration, offset)
|
|
|
if err != nil {
|
|
|
- klog.V(1).Infof("error computing cost data range: start=%s, end=%s, err=%s", start, end, err)
|
|
|
- w.Write(wrapData(nil, err))
|
|
|
+ w.WriteHeader(http.StatusBadRequest)
|
|
|
+ w.Write(wrapData(nil, fmt.Errorf("Error parsing duration (%s) and offset (%s)", duration, offset)))
|
|
|
+ return
|
|
|
+ }
|
|
|
+ durationHours := endTime.Sub(*startTime).Hours()
|
|
|
+
|
|
|
+ threeHoursAgo := time.Now().Add(-3 * time.Hour)
|
|
|
+ if a.ThanosClient != nil && endTime.After(threeHoursAgo) {
|
|
|
+ klog.Infof("Setting end time backwards to first present data")
|
|
|
+ *endTime = time.Now().Add(-3 * time.Hour)
|
|
|
+ }
|
|
|
+
|
|
|
+ // determine resolution by size of duration
|
|
|
+ resolution := "1h"
|
|
|
+ if durationHours >= 2160 {
|
|
|
+ // 90 days
|
|
|
+ resolution = "72h"
|
|
|
+ } else if durationHours >= 720 {
|
|
|
+ // 30 days
|
|
|
+ resolution = "24h"
|
|
|
+ } else if durationHours >= 168 {
|
|
|
+ // 7 days
|
|
|
+ resolution = "6h"
|
|
|
+ } else if durationHours >= 48 {
|
|
|
+ // 2 days
|
|
|
+ resolution = "2h"
|
|
|
+ }
|
|
|
+ resolutionDuration, err := parseDuration(resolution)
|
|
|
+ resolutionHours := resolutionDuration.Hours()
|
|
|
+ if err != nil {
|
|
|
+ w.WriteHeader(http.StatusBadRequest)
|
|
|
+ w.Write(wrapData(nil, fmt.Errorf("Error parsing resolution (%s)", resolution)))
|
|
|
return
|
|
|
}
|
|
|
|
|
|
+ // exclude the last window of the time frame to match Prometheus definitions of range, offset, and resolution
|
|
|
+ // e.g. requesting duration=2d, offset=1d, resolution=1h on Jan 4 12:00:00 should provide data for Jan 1 12:00 - Jan 3 12:00
|
|
|
+ // which has the equivalent start and end times of Jan 1 1:00 and Jan 3 12:00, respectively.
|
|
|
+ *startTime = startTime.Add(1 * *resolutionDuration)
|
|
|
+
|
|
|
+ // attempt to retrieve cost data from cache
|
|
|
+ var costData map[string]*CostData
|
|
|
+ key := fmt.Sprintf(`%s:%s:%s:%t`, duration, offset, resolution, remoteEnabled)
|
|
|
+ cacheData, found := a.CostDataCache.Get(key)
|
|
|
+ if found && !disableCache {
|
|
|
+ ok := false
|
|
|
+ costData, ok = cacheData.(map[string]*CostData)
|
|
|
+ if !ok {
|
|
|
+ klog.Errorf("caching error: failed to cast cost data to struct: %s", key)
|
|
|
+ }
|
|
|
+ } else {
|
|
|
+ start := startTime.Format(RFC3339Milli)
|
|
|
+ end := endTime.Format(RFC3339Milli)
|
|
|
+ costData, err = a.Model.ComputeCostDataRange(pClient, a.KubeClientSet, a.Cloud, start, end, resolution, "", "", remoteEnabled)
|
|
|
+ if err != nil {
|
|
|
+ w.Write(wrapData(nil, err))
|
|
|
+ return
|
|
|
+ }
|
|
|
+
|
|
|
+ a.CostDataCache.Set(key, costData, cache.DefaultExpiration)
|
|
|
+ }
|
|
|
+
|
|
|
+ // filter cost data by namespace and cluster after caching for maximal cache hits
|
|
|
+ costData = filterCostData(costData, namespace, cluster)
|
|
|
+
|
|
|
c, err := a.Cloud.GetConfig()
|
|
|
if err != nil {
|
|
|
w.Write(wrapData(nil, err))
|
|
|
@@ -438,14 +498,14 @@ func (a *Accesses) AggregateCostModel(w http.ResponseWriter, r *http.Request, ps
|
|
|
|
|
|
idleCoefficients := make(map[string]float64)
|
|
|
if allocateIdle {
|
|
|
- windowStr := fmt.Sprintf("%dh", int(dur.Hours()))
|
|
|
+ windowStr := fmt.Sprintf("%dh", int(durationHours))
|
|
|
if a.ThanosClient != nil {
|
|
|
klog.Infof("Setting offset to 3h")
|
|
|
offset = "3h"
|
|
|
}
|
|
|
- idleCoefficients, err = ComputeIdleCoefficient(data, pClient, a.Cloud, discount, windowStr, offset)
|
|
|
+ idleCoefficients, err = ComputeIdleCoefficient(costData, pClient, a.Cloud, discount, windowStr, offset, resolution)
|
|
|
if err != nil {
|
|
|
- klog.V(1).Infof("error computing idle coefficient: windowString=%s, offset=%s, err=%s", windowStr, offset, err)
|
|
|
+ klog.Errorf("error computing idle coefficient: windowString=%s, offset=%s, err=%s", windowStr, offset, err)
|
|
|
w.Write(wrapData(nil, err))
|
|
|
return
|
|
|
}
|
|
|
@@ -474,20 +534,24 @@ func (a *Accesses) AggregateCostModel(w http.ResponseWriter, r *http.Request, ps
|
|
|
klog.Infof("Idle Coeff: %s: %f", cid, idleCoefficient)
|
|
|
}
|
|
|
|
|
|
+ dataCount := int(durationHours / resolutionHours)
|
|
|
+ klog.V(1).Infof("data count = %d for duration (%fh) resolution (%fh)", dataCount, durationHours, resolutionHours)
|
|
|
+
|
|
|
// aggregate cost model data by given fields and cache the result for the default expiration
|
|
|
opts := &AggregationOptions{
|
|
|
- DataCount: dataCount,
|
|
|
- Discount: discount,
|
|
|
- IdleCoefficients: idleCoefficients,
|
|
|
- IncludeEfficiency: includeEfficiency,
|
|
|
- IncludeTimeSeries: includeTimeSeries,
|
|
|
- Rate: rate,
|
|
|
- SharedResourceInfo: sr,
|
|
|
+ DataCount: dataCount,
|
|
|
+ Discount: discount,
|
|
|
+ IdleCoefficients: idleCoefficients,
|
|
|
+ IncludeEfficiency: includeEfficiency,
|
|
|
+ IncludeTimeSeries: includeTimeSeries,
|
|
|
+ Rate: rate,
|
|
|
+ ResolutionCoefficient: resolutionHours,
|
|
|
+ SharedResourceInfo: sr,
|
|
|
}
|
|
|
- result := AggregateCostData(data, field, subfields, a.Cloud, opts)
|
|
|
- a.Cache.Set(aggKey, result, cache.DefaultExpiration)
|
|
|
+ result := AggregateCostData(costData, field, subfields, a.Cloud, opts)
|
|
|
+ a.AggregateCache.Set(aggKey, result, cache.DefaultExpiration)
|
|
|
|
|
|
- w.Write(wrapDataWithMessage(result, nil, fmt.Sprintf("cache miss: %s", aggKey)))
|
|
|
+ w.Write(wrapDataWithMessage(result, nil, fmt.Sprintf("aggregate cache miss: %s", aggKey)))
|
|
|
}
|
|
|
|
|
|
func (a *Accesses) CostDataModelRange(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
|
|
|
@@ -535,33 +599,7 @@ func (a *Accesses) CostDataModelRange(w http.ResponseWriter, r *http.Request, ps
|
|
|
}
|
|
|
discount = discount * 0.01
|
|
|
|
|
|
- layout := "2006-01-02T15:04:05.000Z"
|
|
|
- startTime, err := time.Parse(layout, start)
|
|
|
- if err != nil {
|
|
|
- w.Write(wrapData(nil, err))
|
|
|
- return
|
|
|
- }
|
|
|
- endTime, err := time.Parse(layout, end)
|
|
|
- if err != nil {
|
|
|
- w.Write(wrapData(nil, err))
|
|
|
- return
|
|
|
- }
|
|
|
-
|
|
|
- dur := endTime.Sub(startTime)
|
|
|
- if err != nil {
|
|
|
- w.Write(wrapData(nil, err))
|
|
|
- return
|
|
|
- }
|
|
|
- windowHrs, err := strconv.ParseInt(window[:len(window)-1], 10, 64)
|
|
|
-
|
|
|
- // dataCount is the number of time series data expected for the given interval,
|
|
|
- // which we compute because Prometheus time series vectors omit zero values.
|
|
|
- // This assumes hourly data, incremented by one to capture the 0th data point.
|
|
|
- dataCount := (int64(dur.Hours()) / windowHrs) + 1
|
|
|
- klog.V(1).Infof("for duration %s dataCount = %d", dur.String(), dataCount)
|
|
|
-
|
|
|
opts := &AggregationOptions{
|
|
|
- DataCount: dataCount,
|
|
|
Discount: discount,
|
|
|
IdleCoefficients: make(map[string]float64),
|
|
|
}
|
|
|
@@ -586,8 +624,6 @@ func (a *Accesses) CostDataModelRangeLarge(w http.ResponseWriter, r *http.Reques
|
|
|
endString := r.URL.Query().Get("end")
|
|
|
windowString := r.URL.Query().Get("window")
|
|
|
|
|
|
- layout := "2006-01-02T15:04:05.000Z"
|
|
|
-
|
|
|
var start time.Time
|
|
|
var end time.Time
|
|
|
var err error
|
|
|
@@ -596,7 +632,7 @@ func (a *Accesses) CostDataModelRangeLarge(w http.ResponseWriter, r *http.Reques
|
|
|
windowString = "1h"
|
|
|
}
|
|
|
if startString != "" {
|
|
|
- start, err = time.Parse(layout, startString)
|
|
|
+ start, err = time.Parse(RFC3339Milli, startString)
|
|
|
if err != nil {
|
|
|
klog.V(1).Infof("Error parsing time " + startString + ". Error: " + err.Error())
|
|
|
w.Write(wrapData(nil, err))
|
|
|
@@ -610,7 +646,7 @@ func (a *Accesses) CostDataModelRangeLarge(w http.ResponseWriter, r *http.Reques
|
|
|
start = time.Now().Add(-2 * window)
|
|
|
}
|
|
|
if endString != "" {
|
|
|
- end, err = time.Parse(layout, endString)
|
|
|
+ end, err = time.Parse(RFC3339Milli, endString)
|
|
|
if err != nil {
|
|
|
klog.V(1).Infof("Error parsing time " + endString + ". Error: " + err.Error())
|
|
|
w.Write(wrapData(nil, err))
|
|
|
@@ -646,6 +682,54 @@ func (a *Accesses) OutofClusterCosts(w http.ResponseWriter, r *http.Request, ps
|
|
|
w.Write(wrapData(data, err))
|
|
|
}
|
|
|
|
|
|
+func (a *Accesses) OutOfClusterCostsWithCache(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
|
|
|
+ w.Header().Set("Content-Type", "application/json")
|
|
|
+ w.Header().Set("Access-Control-Allow-Origin", "*")
|
|
|
+
|
|
|
+ // start date for which to query costs, inclusive; format YYYY-MM-DD
|
|
|
+ start := r.URL.Query().Get("start")
|
|
|
+ // end date for which to query costs, inclusive; format YYYY-MM-DD
|
|
|
+ end := r.URL.Query().Get("end")
|
|
|
+ // aggregator sets the field by which to aggregate; default, prepended by "kubernetes_"
|
|
|
+ kubernetesAggregation := r.URL.Query().Get("aggregator")
|
|
|
+ // customAggregation allows full customization of aggregator w/o prepending
|
|
|
+ customAggregation := r.URL.Query().Get("customAggregation")
|
|
|
+ // disableCache, if set to "true", tells this function to recompute and
|
|
|
+ // cache the requested data
|
|
|
+ disableCache := r.URL.Query().Get("disableCache") == "true"
|
|
|
+ // clearCache, if set to "true", tells this function to flush the cache,
|
|
|
+ // then recompute and cache the requested data
|
|
|
+ clearCache := r.URL.Query().Get("clearCache") == "true"
|
|
|
+
|
|
|
+ aggregation := "kubernetes_" + kubernetesAggregation
|
|
|
+ if customAggregation != "" {
|
|
|
+ aggregation = customAggregation
|
|
|
+ }
|
|
|
+
|
|
|
+ // clear cache prior to checking the cache so that a clearCache=true
|
|
|
+ // request always returns a freshly computed value
|
|
|
+ if clearCache {
|
|
|
+ a.OutOfClusterCache.Flush()
|
|
|
+ }
|
|
|
+
|
|
|
+ // attempt to retrieve cost data from cache
|
|
|
+ key := fmt.Sprintf(`%s:%s:%s`, start, end, aggregation)
|
|
|
+ if value, found := a.OutOfClusterCache.Get(key); found && !disableCache {
|
|
|
+ if data, ok := value.([]*costAnalyzerCloud.OutOfClusterAllocation); ok {
|
|
|
+ w.Write(wrapDataWithMessage(data, nil, fmt.Sprintf("out of cluser cache hit: %s", key)))
|
|
|
+ return
|
|
|
+ }
|
|
|
+ klog.Errorf("caching error: failed to type cast data: %s", key)
|
|
|
+ }
|
|
|
+
|
|
|
+ data, err := a.Cloud.ExternalAllocations(start, end, aggregation)
|
|
|
+ if err == nil {
|
|
|
+ a.OutOfClusterCache.Set(key, data, cache.DefaultExpiration)
|
|
|
+ }
|
|
|
+
|
|
|
+ w.Write(wrapDataWithMessage(data, err, fmt.Sprintf("out of cluser cache miss: %s", key)))
|
|
|
+}
|
|
|
+
|
|
|
func (p *Accesses) GetAllNodePricing(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
|
|
|
w.Header().Set("Content-Type", "application/json")
|
|
|
w.Header().Set("Access-Control-Allow-Origin", "*")
|
|
|
@@ -1059,7 +1143,9 @@ func init() {
|
|
|
})
|
|
|
|
|
|
// cache responses from model for a default of 5 minutes; clear expired responses every 10 minutes
|
|
|
- modelCache := cache.New(time.Minute*5, time.Minute*10)
|
|
|
+ aggregateCache := cache.New(time.Minute*5, time.Minute*10)
|
|
|
+ costDataCache := cache.New(time.Minute*5, time.Minute*10)
|
|
|
+ outOfClusterCache := cache.New(time.Minute*5, time.Minute*10)
|
|
|
|
|
|
A = Accesses{
|
|
|
PrometheusClient: promCli,
|
|
|
@@ -1079,7 +1165,9 @@ func init() {
|
|
|
NetworkInternetEgressRecorder: NetworkInternetEgressRecorder,
|
|
|
PersistentVolumePriceRecorder: pvGv,
|
|
|
Model: NewCostModel(kubeClientset),
|
|
|
- Cache: modelCache,
|
|
|
+ AggregateCache: aggregateCache,
|
|
|
+ CostDataCache: costDataCache,
|
|
|
+ OutOfClusterCache: outOfClusterCache,
|
|
|
}
|
|
|
|
|
|
remoteEnabled := os.Getenv(remoteEnabled)
|
|
|
@@ -1138,7 +1226,7 @@ func init() {
|
|
|
Router.GET("/costDataModel", A.CostDataModel)
|
|
|
Router.GET("/costDataModelRange", A.CostDataModelRange)
|
|
|
Router.GET("/costDataModelRangeLarge", A.CostDataModelRangeLarge)
|
|
|
- Router.GET("/outOfClusterCosts", A.OutofClusterCosts)
|
|
|
+ Router.GET("/outOfClusterCosts", A.OutOfClusterCostsWithCache)
|
|
|
Router.GET("/allNodePricing", A.GetAllNodePricing)
|
|
|
Router.GET("/healthz", Healthz)
|
|
|
Router.GET("/getConfigs", A.GetConfigs)
|