| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702
70370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262126312641265126612671268126912701271127212731274127512761
27712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321132213231324132513261327132813291330133113321333133413351336133713381339134013411342134313441345134613471348134913501351135213531354135513561357135813591360136113621363136413651366136713681369137013711372137313741375137613771378137913801381138213831384138513861387138813891390139113921393139413951396139713981399140014011402140314041405140614071408140914101411141214131414141514161417141814191420142114221423142414251426142714281429143014311432143314341435143614371438143914401441144214431444144514461447144814491450145114521453145414551456145714581459146014611462146314641465146614671468146914701471147214731474147514761477147814791480148114821483148414851486148714881489149014911492149314941495149614971498149915001501150215031504150515061507150815091510151115121513151415151516151715181519152015211522152315241525152615271528152915301531153215331534153515361537153815391540154115421543154415451546154715481549155015511552155315541555155615571558155915601561156215631564156515661567156815691570157115721573157415751576157715781579158015811582158315841585158615871588158915901591159215931594159515961597159815991600160116021603160416051606160716081609161016111612161316141615161616171618161916201621162216231624162516261627162816291630163116321633163416351636163716381639164016411642164316441645164616471648164916501651165216531654165516561657165816591660166116621663166416651666166716681669167016711672167316741675167616771678167916801681168216831684168516861687168816891690169116921693169416951696169716981699170017011702170317041705170617071708170917101711171217131714171517161717171817191720172117221723172417251726172717281729173017311732173317341735173617371738173917401741174217431744174517461747174817491750175117521753175417551756175717581759176017611762176317641765176617671768176917701771177217731774177517761
77717781779178017811782178317841785178617871788178917901791179217931794179517961797179817991800180118021803180418051806180718081809181018111812181318141815181618171818181918201821182218231824182518261827182818291830183118321833183418351836183718381839184018411842184318441845184618471848184918501851185218531854185518561857185818591860186118621863186418651866186718681869187018711872187318741875187618771878187918801881188218831884188518861887188818891890189118921893189418951896189718981899190019011902190319041905 |
- package costmodel
- import (
- "context"
- "encoding/base64"
- "fmt"
- "io"
- "net/http"
- "os"
- "path"
- "reflect"
- "regexp"
- "strconv"
- "strings"
- "sync"
- "time"
- "github.com/microcosm-cc/bluemonday"
- "github.com/opencost/opencost/core/pkg/opencost"
- "github.com/opencost/opencost/core/pkg/util/httputil"
- "github.com/opencost/opencost/core/pkg/util/timeutil"
- "github.com/opencost/opencost/core/pkg/util/watcher"
- "github.com/opencost/opencost/core/pkg/version"
- "github.com/opencost/opencost/pkg/cloud/aws"
- cloudconfig "github.com/opencost/opencost/pkg/cloud/config"
- "github.com/opencost/opencost/pkg/cloud/gcp"
- "github.com/opencost/opencost/pkg/cloud/provider"
- "github.com/opencost/opencost/pkg/cloudcost"
- "github.com/opencost/opencost/pkg/config"
- clustermap "github.com/opencost/opencost/pkg/costmodel/clusters"
- "github.com/opencost/opencost/pkg/customcost"
- "github.com/opencost/opencost/pkg/kubeconfig"
- "github.com/opencost/opencost/pkg/metrics"
- "github.com/opencost/opencost/pkg/services"
- "github.com/spf13/viper"
- v1 "k8s.io/api/core/v1"
- "github.com/julienschmidt/httprouter"
- "github.com/getsentry/sentry-go"
- "github.com/opencost/opencost/core/pkg/clusters"
- sysenv "github.com/opencost/opencost/core/pkg/env"
- "github.com/opencost/opencost/core/pkg/log"
- "github.com/opencost/opencost/core/pkg/util/json"
- "github.com/opencost/opencost/pkg/cloud/azure"
- "github.com/opencost/opencost/pkg/cloud/models"
- "github.com/opencost/opencost/pkg/cloud/utils"
- "github.com/opencost/opencost/pkg/clustercache"
- "github.com/opencost/opencost/pkg/env"
- "github.com/opencost/opencost/pkg/errors"
- "github.com/opencost/opencost/pkg/prom"
- "github.com/opencost/opencost/pkg/thanos"
- prometheus "github.com/prometheus/client_golang/api"
- prometheusAPI "github.com/prometheus/client_golang/api/prometheus/v1"
- appsv1 "k8s.io/api/apps/v1"
- metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
- "github.com/patrickmn/go-cache"
- "k8s.io/client-go/kubernetes"
- )
// sanitizePolicy strips unsafe HTML from user-supplied strings before they are
// echoed back, using bluemonday's user-generated-content policy.
var sanitizePolicy = bluemonday.UGCPolicy()

const (
	// RFC3339Milli is the RFC3339 layout with millisecond precision used to
	// parse start/end query parameters.
	RFC3339Milli = "2006-01-02T15:04:05.000Z"

	// Cache TTLs in minutes, keyed by query window size (1d/2d/7d/30d).
	maxCacheMinutes1d  = 11
	maxCacheMinutes2d  = 17
	maxCacheMinutes7d  = 37
	maxCacheMinutes30d = 137

	// Settings keys used with the settings cache / pub-sub mechanism.
	CustomPricingSetting = "CustomPricing"
	DiscountSetting      = "Discount"

	// epRules is the rules endpoint path, rooted under apiPrefix (defined elsewhere in this package).
	epRules = apiPrefix + "/rules"

	// LogSeparator visually separates sections of startup log output.
	LogSeparator = "+-------------------------------------------------------------------------------------"
)

var (
	// gitCommit is set by the build system
	gitCommit string

	// ANSIRegex matches ANSI escape and colors https://en.wikipedia.org/wiki/ANSI_escape_code
	ANSIRegex = regexp.MustCompile("\x1b\\[[0-9;]*m")
)
// Accesses defines a singleton application instance, providing access to
// Prometheus, Kubernetes, the cloud provider, and caches.
type Accesses struct {
	// Router holds the HTTP routes served by the application.
	Router *httprouter.Router
	// PrometheusClient queries the primary Prometheus instance.
	PrometheusClient prometheus.Client
	// ThanosClient queries Thanos when configured; nil when Thanos is disabled.
	ThanosClient prometheus.Client
	// KubeClientSet is the Kubernetes API client.
	KubeClientSet kubernetes.Interface
	// ClusterCache caches Kubernetes cluster objects.
	ClusterCache clustercache.ClusterCache
	// ClusterMap maps cluster identifiers to cluster metadata.
	ClusterMap clusters.ClusterMap
	// CloudProvider supplies pricing data and provider-specific configuration.
	CloudProvider models.Provider
	ConfigFileManager *config.ConfigFileManager
	CloudConfigController *cloudconfig.Controller
	CloudCostPipelineService *cloudcost.PipelineService
	CloudCostQueryService *cloudcost.QueryService
	CustomCostQueryService *customcost.QueryService
	CustomCostPipelineService *customcost.PipelineService
	// ClusterInfoProvider reports metadata about the local cluster.
	ClusterInfoProvider clusters.ClusterInfoProvider
	// Model is the cost model used to compute cost data.
	Model *CostModel
	MetricsEmitter *CostModelMetricsEmitter
	// Result caches for the various cost computations.
	OutOfClusterCache *cache.Cache
	AggregateCache *cache.Cache
	CostDataCache *cache.Cache
	ClusterCostsCache *cache.Cache
	// CacheExpiration maps a query window duration to a custom cache TTL.
	CacheExpiration map[time.Duration]time.Duration
	AggAPI Aggregator
	// SettingsCache stores current state of app settings
	SettingsCache *cache.Cache
	// settingsSubscribers tracks channels through which changes to different
	// settings will be published in a pub/sub model
	settingsSubscribers map[string][]chan string
	// settingsMutex guards settingsSubscribers.
	settingsMutex sync.Mutex
	// registered http service instances
	httpServices services.HTTPServices
}
- // GetPrometheusClient decides whether the default Prometheus client or the Thanos client
- // should be used.
- func (a *Accesses) GetPrometheusClient(remote bool) prometheus.Client {
- // Use Thanos Client if it exists (enabled) and remote flag set
- var pc prometheus.Client
- if remote && a.ThanosClient != nil {
- pc = a.ThanosClient
- } else {
- pc = a.PrometheusClient
- }
- return pc
- }
- // GetCacheExpiration looks up and returns custom cache expiration for the given duration.
- // If one does not exists, it returns the default cache expiration, which is defined by
- // the particular cache.
- func (a *Accesses) GetCacheExpiration(dur time.Duration) time.Duration {
- if expiration, ok := a.CacheExpiration[dur]; ok {
- return expiration
- }
- return cache.DefaultExpiration
- }
// GetCacheRefresh determines how long to wait before refreshing the cache for
// the given duration: half of the configured expiration (truncated to whole
// minutes), with a floor of 1 minute when the expiration is missing or is
// 2 minutes or less.
// NOTE(review): an earlier comment claimed the refresh happens "1 minute
// before we expect the cache to expire", but the code refreshes at the halfway
// point — confirm which behavior is intended.
func (a *Accesses) GetCacheRefresh(dur time.Duration) time.Duration {
	expiry := a.GetCacheExpiration(dur).Minutes()
	if expiry <= 2.0 {
		return time.Minute
	}
	// time.Duration(expiry/2.0) truncates the float to whole minutes.
	mins := time.Duration(expiry/2.0) * time.Minute
	return mins
}
- func (a *Accesses) ClusterCostsFromCacheHandler(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
- w.Header().Set("Content-Type", "application/json")
- duration := 24 * time.Hour
- offset := time.Minute
- durationHrs := "24h"
- fmtOffset := "1m"
- pClient := a.GetPrometheusClient(true)
- key := fmt.Sprintf("%s:%s", durationHrs, fmtOffset)
- if data, valid := a.ClusterCostsCache.Get(key); valid {
- clusterCosts := data.(map[string]*ClusterCosts)
- w.Write(WrapDataWithMessage(clusterCosts, nil, "clusterCosts cache hit"))
- } else {
- data, err := a.ComputeClusterCosts(pClient, a.CloudProvider, duration, offset, true)
- w.Write(WrapDataWithMessage(data, err, fmt.Sprintf("clusterCosts cache miss: %s", key)))
- }
- }
// Response is the standard JSON envelope produced by the Wrap* helpers and
// returned by the HTTP handlers: an HTTP-style code, a short status string,
// the payload, and optional message/warning text (omitted when empty).
type Response struct {
	Code    int         `json:"code"`
	Status  string      `json:"status"`
	Data    interface{} `json:"data"`
	Message string      `json:"message,omitempty"`
	Warning string      `json:"warning,omitempty"`
}
- // FilterFunc is a filter that returns true iff the given CostData should be filtered out, and the environment that was used as the filter criteria, if it was an aggregate
- type FilterFunc func(*CostData) (bool, string)
- // FilterCostData allows through only CostData that matches all the given filter functions
- func FilterCostData(data map[string]*CostData, retains []FilterFunc, filters []FilterFunc) (map[string]*CostData, int, map[string]int) {
- result := make(map[string]*CostData)
- filteredEnvironments := make(map[string]int)
- filteredContainers := 0
- DataLoop:
- for key, datum := range data {
- for _, rf := range retains {
- if ok, _ := rf(datum); ok {
- result[key] = datum
- // if any retain function passes, the data is retained and move on
- continue DataLoop
- }
- }
- for _, ff := range filters {
- if ok, environment := ff(datum); !ok {
- if environment != "" {
- filteredEnvironments[environment]++
- }
- filteredContainers++
- // if any filter function check fails, move on to the next datum
- continue DataLoop
- }
- }
- result[key] = datum
- }
- return result, filteredContainers, filteredEnvironments
- }
- func filterFields(fields string, data map[string]*CostData) map[string]CostData {
- fs := strings.Split(fields, ",")
- fmap := make(map[string]bool)
- for _, f := range fs {
- fieldNameLower := strings.ToLower(f) // convert to go struct name by uppercasing first letter
- log.Debugf("to delete: %s", fieldNameLower)
- fmap[fieldNameLower] = true
- }
- filteredData := make(map[string]CostData)
- for cname, costdata := range data {
- s := reflect.TypeOf(*costdata)
- val := reflect.ValueOf(*costdata)
- costdata2 := CostData{}
- cd2 := reflect.New(reflect.Indirect(reflect.ValueOf(costdata2)).Type()).Elem()
- n := s.NumField()
- for i := 0; i < n; i++ {
- field := s.Field(i)
- value := val.Field(i)
- value2 := cd2.Field(i)
- if _, ok := fmap[strings.ToLower(field.Name)]; !ok {
- value2.Set(reflect.Value(value))
- }
- }
- filteredData[cname] = cd2.Interface().(CostData)
- }
- return filteredData
- }
// normalizeTimeParam converts a day-suffixed duration string (e.g. "2d") into
// its hour equivalent ("48h"); any other non-empty value is returned
// unchanged. An empty string is rejected as invalid.
func normalizeTimeParam(param string) (string, error) {
	if param == "" {
		return "", fmt.Errorf("invalid time param")
	}
	if !strings.HasSuffix(param, "d") {
		return param, nil
	}
	days, err := strconv.ParseInt(strings.TrimSuffix(param, "d"), 10, 64)
	if err != nil {
		return "", err
	}
	return fmt.Sprintf("%dh", days*24), nil
}
// ParsePercentString takes a string of expected format "N%" and returns a floating point 0.0N.
// If the "%" symbol is missing, it just returns 0.0N. Empty string is interpreted as "0%" and
// returns 0.0.
func ParsePercentString(percentStr string) (float64, error) {
	if percentStr == "" {
		return 0.0, nil
	}
	trimmed := strings.TrimSuffix(percentStr, "%")
	pct, err := strconv.ParseFloat(trimmed, 64)
	if err != nil {
		return 0.0, err
	}
	return pct * 0.01, nil
}
- func WrapData(data interface{}, err error) []byte {
- var resp []byte
- if err != nil {
- log.Errorf("Error returned to client: %s", err.Error())
- resp, _ = json.Marshal(&Response{
- Code: http.StatusInternalServerError,
- Status: "error",
- Message: err.Error(),
- Data: data,
- })
- } else {
- resp, err = json.Marshal(&Response{
- Code: http.StatusOK,
- Status: "success",
- Data: data,
- })
- if err != nil {
- log.Errorf("error marshaling response json: %s", err.Error())
- }
- }
- return resp
- }
- func WrapDataWithMessage(data interface{}, err error, message string) []byte {
- var resp []byte
- if err != nil {
- log.Errorf("Error returned to client: %s", err.Error())
- resp, _ = json.Marshal(&Response{
- Code: http.StatusInternalServerError,
- Status: "error",
- Message: err.Error(),
- Data: data,
- })
- } else {
- resp, _ = json.Marshal(&Response{
- Code: http.StatusOK,
- Status: "success",
- Data: data,
- Message: message,
- })
- }
- return resp
- }
- func WrapDataWithWarning(data interface{}, err error, warning string) []byte {
- var resp []byte
- if err != nil {
- log.Errorf("Error returned to client: %s", err.Error())
- resp, _ = json.Marshal(&Response{
- Code: http.StatusInternalServerError,
- Status: "error",
- Message: err.Error(),
- Warning: warning,
- Data: data,
- })
- } else {
- resp, _ = json.Marshal(&Response{
- Code: http.StatusOK,
- Status: "success",
- Data: data,
- Warning: warning,
- })
- }
- return resp
- }
- func WrapDataWithMessageAndWarning(data interface{}, err error, message, warning string) []byte {
- var resp []byte
- if err != nil {
- log.Errorf("Error returned to client: %s", err.Error())
- resp, _ = json.Marshal(&Response{
- Code: http.StatusInternalServerError,
- Status: "error",
- Message: err.Error(),
- Warning: warning,
- Data: data,
- })
- } else {
- resp, _ = json.Marshal(&Response{
- Code: http.StatusOK,
- Status: "success",
- Data: data,
- Message: message,
- Warning: warning,
- })
- }
- return resp
- }
// wrapAsObjectItems wraps a slice of items into an object containing a single items list
// allows our k8s proxy methods to emulate a List() request to k8s API
func wrapAsObjectItems(items interface{}) map[string]interface{} {
	wrapped := make(map[string]interface{}, 1)
	wrapped["items"] = items
	return wrapped
}
- // RefreshPricingData needs to be called when a new node joins the fleet, since we cache the relevant subsets of pricing data to avoid storing the whole thing.
- func (a *Accesses) RefreshPricingData(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
- w.Header().Set("Content-Type", "application/json")
- w.Header().Set("Access-Control-Allow-Origin", "*")
- err := a.CloudProvider.DownloadPricingData()
- if err != nil {
- log.Errorf("Error refreshing pricing data: %s", err.Error())
- }
- w.Write(WrapData(nil, err))
- }
- func (a *Accesses) CostDataModel(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
- w.Header().Set("Content-Type", "application/json")
- w.Header().Set("Access-Control-Allow-Origin", "*")
- window := r.URL.Query().Get("timeWindow")
- offset := r.URL.Query().Get("offset")
- fields := r.URL.Query().Get("filterFields")
- namespace := r.URL.Query().Get("namespace")
- if offset != "" {
- offset = "offset " + offset
- }
- data, err := a.Model.ComputeCostData(a.PrometheusClient, a.CloudProvider, window, offset, namespace)
- if fields != "" {
- filteredData := filterFields(fields, data)
- w.Write(WrapData(filteredData, err))
- } else {
- w.Write(WrapData(data, err))
- }
- }
- func (a *Accesses) ClusterCosts(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
- w.Header().Set("Content-Type", "application/json")
- w.Header().Set("Access-Control-Allow-Origin", "*")
- window := r.URL.Query().Get("window")
- offset := r.URL.Query().Get("offset")
- if window == "" {
- w.Write(WrapData(nil, fmt.Errorf("missing window argument")))
- return
- }
- windowDur, err := timeutil.ParseDuration(window)
- if err != nil {
- w.Write(WrapData(nil, fmt.Errorf("error parsing window (%s): %s", window, err)))
- return
- }
- // offset is not a required parameter
- var offsetDur time.Duration
- if offset != "" {
- offsetDur, err = timeutil.ParseDuration(offset)
- if err != nil {
- w.Write(WrapData(nil, fmt.Errorf("error parsing offset (%s): %s", offset, err)))
- return
- }
- }
- useThanos, _ := strconv.ParseBool(r.URL.Query().Get("multi"))
- if useThanos && !thanos.IsEnabled() {
- w.Write(WrapData(nil, fmt.Errorf("Multi=true while Thanos is not enabled.")))
- return
- }
- var client prometheus.Client
- if useThanos {
- client = a.ThanosClient
- offsetDur = thanos.OffsetDuration()
- } else {
- client = a.PrometheusClient
- }
- data, err := a.ComputeClusterCosts(client, a.CloudProvider, windowDur, offsetDur, true)
- w.Write(WrapData(data, err))
- }
- func (a *Accesses) ClusterCostsOverTime(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
- w.Header().Set("Content-Type", "application/json")
- w.Header().Set("Access-Control-Allow-Origin", "*")
- start := r.URL.Query().Get("start")
- end := r.URL.Query().Get("end")
- window := r.URL.Query().Get("window")
- offset := r.URL.Query().Get("offset")
- if window == "" {
- w.Write(WrapData(nil, fmt.Errorf("missing window argument")))
- return
- }
- windowDur, err := timeutil.ParseDuration(window)
- if err != nil {
- w.Write(WrapData(nil, fmt.Errorf("error parsing window (%s): %s", window, err)))
- return
- }
- // offset is not a required parameter
- var offsetDur time.Duration
- if offset != "" {
- offsetDur, err = timeutil.ParseDuration(offset)
- if err != nil {
- w.Write(WrapData(nil, fmt.Errorf("error parsing offset (%s): %s", offset, err)))
- return
- }
- }
- data, err := ClusterCostsOverTime(a.PrometheusClient, a.CloudProvider, start, end, windowDur, offsetDur)
- w.Write(WrapData(data, err))
- }
- func (a *Accesses) CostDataModelRange(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
- w.Header().Set("Content-Type", "application/json")
- w.Header().Set("Access-Control-Allow-Origin", "*")
- startStr := r.URL.Query().Get("start")
- endStr := r.URL.Query().Get("end")
- windowStr := r.URL.Query().Get("window")
- fields := r.URL.Query().Get("filterFields")
- namespace := r.URL.Query().Get("namespace")
- cluster := r.URL.Query().Get("cluster")
- remote := r.URL.Query().Get("remote")
- remoteEnabled := env.IsRemoteEnabled() && remote != "false"
- layout := "2006-01-02T15:04:05.000Z"
- start, err := time.Parse(layout, startStr)
- if err != nil {
- w.Write(WrapDataWithMessage(nil, fmt.Errorf("invalid start date: %s", startStr), fmt.Sprintf("invalid start date: %s", startStr)))
- return
- }
- end, err := time.Parse(layout, endStr)
- if err != nil {
- w.Write(WrapDataWithMessage(nil, fmt.Errorf("invalid end date: %s", endStr), fmt.Sprintf("invalid end date: %s", endStr)))
- return
- }
- window := opencost.NewWindow(&start, &end)
- if window.IsOpen() || !window.HasDuration() || window.IsNegative() {
- w.Write(WrapDataWithMessage(nil, fmt.Errorf("invalid date range: %s", window), fmt.Sprintf("invalid date range: %s", window)))
- return
- }
- resolution := time.Hour
- if resDur, err := time.ParseDuration(windowStr); err == nil {
- resolution = resDur
- }
- // Use Thanos Client if it exists (enabled) and remote flag set
- var pClient prometheus.Client
- if remote != "false" && a.ThanosClient != nil {
- pClient = a.ThanosClient
- } else {
- pClient = a.PrometheusClient
- }
- data, err := a.Model.ComputeCostDataRange(pClient, a.CloudProvider, window, resolution, namespace, cluster, remoteEnabled)
- if err != nil {
- w.Write(WrapData(nil, err))
- }
- if fields != "" {
- filteredData := filterFields(fields, data)
- w.Write(WrapData(filteredData, err))
- } else {
- w.Write(WrapData(data, err))
- }
- }
// parseAggregations resolves the aggregation key, aggregation fields, and
// filter name. A non-empty customAggregation is used verbatim; otherwise each
// aggregator entry and the filter type get the "kubernetes_" prefix.
func parseAggregations(customAggregation, aggregator, filterType string) (string, []string, string) {
	if customAggregation != "" {
		return customAggregation, strings.Split(customAggregation, ","), filterType
	}
	aggregations := strings.Split(aggregator, ",")
	for i := range aggregations {
		aggregations[i] = "kubernetes_" + aggregations[i]
	}
	return strings.Join(aggregations, ","), aggregations, "kubernetes_" + filterType
}
- func (a *Accesses) GetAllNodePricing(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
- w.Header().Set("Content-Type", "application/json")
- w.Header().Set("Access-Control-Allow-Origin", "*")
- data, err := a.CloudProvider.AllNodePricing()
- w.Write(WrapData(data, err))
- }
- func (a *Accesses) GetConfigs(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
- w.Header().Set("Content-Type", "application/json")
- w.Header().Set("Access-Control-Allow-Origin", "*")
- data, err := a.CloudProvider.GetConfig()
- w.Write(WrapData(data, err))
- }
- func (a *Accesses) UpdateSpotInfoConfigs(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
- w.Header().Set("Content-Type", "application/json")
- w.Header().Set("Access-Control-Allow-Origin", "*")
- data, err := a.CloudProvider.UpdateConfig(r.Body, aws.SpotInfoUpdateType)
- if err != nil {
- w.Write(WrapData(data, err))
- return
- }
- w.Write(WrapData(data, err))
- err = a.CloudProvider.DownloadPricingData()
- if err != nil {
- log.Errorf("Error redownloading data on config update: %s", err.Error())
- }
- return
- }
- func (a *Accesses) UpdateAthenaInfoConfigs(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
- w.Header().Set("Content-Type", "application/json")
- w.Header().Set("Access-Control-Allow-Origin", "*")
- data, err := a.CloudProvider.UpdateConfig(r.Body, aws.AthenaInfoUpdateType)
- if err != nil {
- w.Write(WrapData(data, err))
- return
- }
- w.Write(WrapData(data, err))
- return
- }
- func (a *Accesses) UpdateBigQueryInfoConfigs(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
- w.Header().Set("Content-Type", "application/json")
- w.Header().Set("Access-Control-Allow-Origin", "*")
- data, err := a.CloudProvider.UpdateConfig(r.Body, gcp.BigqueryUpdateType)
- if err != nil {
- w.Write(WrapData(data, err))
- return
- }
- w.Write(WrapData(data, err))
- return
- }
- func (a *Accesses) UpdateAzureStorageConfigs(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
- w.Header().Set("Content-Type", "application/json")
- w.Header().Set("Access-Control-Allow-Origin", "*")
- data, err := a.CloudProvider.UpdateConfig(r.Body, azure.AzureStorageUpdateType)
- if err != nil {
- w.Write(WrapData(data, err))
- return
- }
- w.Write(WrapData(data, err))
- return
- }
- func (a *Accesses) UpdateConfigByKey(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
- w.Header().Set("Content-Type", "application/json")
- w.Header().Set("Access-Control-Allow-Origin", "*")
- data, err := a.CloudProvider.UpdateConfig(r.Body, "")
- if err != nil {
- w.Write(WrapData(data, err))
- return
- }
- w.Write(WrapData(data, err))
- return
- }
- func (a *Accesses) ManagementPlatform(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
- w.Header().Set("Content-Type", "application/json")
- w.Header().Set("Access-Control-Allow-Origin", "*")
- data, err := a.CloudProvider.GetManagementPlatform()
- if err != nil {
- w.Write(WrapData(data, err))
- return
- }
- w.Write(WrapData(data, err))
- return
- }
- func (a *Accesses) ClusterInfo(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
- w.Header().Set("Content-Type", "application/json")
- w.Header().Set("Access-Control-Allow-Origin", "*")
- data := a.ClusterInfoProvider.GetClusterInfo()
- w.Write(WrapData(data, nil))
- }
- func (a *Accesses) GetClusterInfoMap(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
- w.Header().Set("Content-Type", "application/json")
- w.Header().Set("Access-Control-Allow-Origin", "*")
- data := a.ClusterMap.AsMap()
- w.Write(WrapData(data, nil))
- }
- func (a *Accesses) GetServiceAccountStatus(w http.ResponseWriter, _ *http.Request, _ httprouter.Params) {
- w.Header().Set("Content-Type", "application/json")
- w.Header().Set("Access-Control-Allow-Origin", "*")
- w.Write(WrapData(a.CloudProvider.ServiceAccountStatus(), nil))
- }
- func (a *Accesses) GetPricingSourceStatus(w http.ResponseWriter, _ *http.Request, _ httprouter.Params) {
- w.Header().Set("Content-Type", "application/json")
- w.Header().Set("Access-Control-Allow-Origin", "*")
- w.Write(WrapData(a.CloudProvider.PricingSourceStatus(), nil))
- }
- func (a *Accesses) GetPricingSourceCounts(w http.ResponseWriter, _ *http.Request, _ httprouter.Params) {
- w.Header().Set("Content-Type", "application/json")
- w.Header().Set("Access-Control-Allow-Origin", "*")
- w.Write(WrapData(a.Model.GetPricingSourceCounts()))
- }
- func (a *Accesses) GetPricingSourceSummary(w http.ResponseWriter, r *http.Request, p httprouter.Params) {
- w.Header().Set("Content-Type", "application/json")
- w.Header().Set("Access-Control-Allow-Origin", "*")
- data := a.CloudProvider.PricingSourceSummary()
- w.Write(WrapData(data, nil))
- }
- func (a *Accesses) GetPrometheusMetadata(w http.ResponseWriter, _ *http.Request, _ httprouter.Params) {
- w.Header().Set("Content-Type", "application/json")
- w.Header().Set("Access-Control-Allow-Origin", "*")
- w.Write(WrapData(prom.Validate(a.PrometheusClient)))
- }
- func (a *Accesses) PrometheusQuery(w http.ResponseWriter, r *http.Request, _ httprouter.Params) {
- w.Header().Set("Content-Type", "application/json")
- w.Header().Set("Access-Control-Allow-Origin", "*")
- qp := httputil.NewQueryParams(r.URL.Query())
- query := qp.Get("query", "")
- if query == "" {
- w.Write(WrapData(nil, fmt.Errorf("Query Parameter 'query' is unset'")))
- return
- }
- // Attempt to parse time as either a unix timestamp or as an RFC3339 value
- var timeVal time.Time
- timeStr := qp.Get("time", "")
- if len(timeStr) > 0 {
- if t, err := strconv.ParseInt(timeStr, 10, 64); err == nil {
- timeVal = time.Unix(t, 0)
- } else if t, err := time.Parse(time.RFC3339, timeStr); err == nil {
- timeVal = t
- }
- // If time is given, but not parse-able, return an error
- if timeVal.IsZero() {
- http.Error(w, fmt.Sprintf("time must be a unix timestamp or RFC3339 value; illegal value given: %s", timeStr), http.StatusBadRequest)
- }
- }
- ctx := prom.NewNamedContext(a.PrometheusClient, prom.FrontendContextName)
- body, err := ctx.RawQuery(query, timeVal)
- if err != nil {
- w.Write(WrapData(nil, fmt.Errorf("Error running query %s. Error: %s", query, err)))
- return
- }
- w.Write(body)
- }
- func (a *Accesses) PrometheusQueryRange(w http.ResponseWriter, r *http.Request, _ httprouter.Params) {
- w.Header().Set("Content-Type", "application/json")
- w.Header().Set("Access-Control-Allow-Origin", "*")
- qp := httputil.NewQueryParams(r.URL.Query())
- query := qp.Get("query", "")
- if query == "" {
- fmt.Fprintf(w, "Error parsing query from request parameters.")
- return
- }
- start, end, duration, err := toStartEndStep(qp)
- if err != nil {
- fmt.Fprintf(w, err.Error())
- return
- }
- ctx := prom.NewNamedContext(a.PrometheusClient, prom.FrontendContextName)
- body, err := ctx.RawQueryRange(query, start, end, duration)
- if err != nil {
- fmt.Fprintf(w, "Error running query %s. Error: %s", query, err)
- return
- }
- w.Write(body)
- }
- func (a *Accesses) ThanosQuery(w http.ResponseWriter, r *http.Request, _ httprouter.Params) {
- w.Header().Set("Content-Type", "application/json")
- w.Header().Set("Access-Control-Allow-Origin", "*")
- if !thanos.IsEnabled() {
- w.Write(WrapData(nil, fmt.Errorf("ThanosDisabled")))
- return
- }
- qp := httputil.NewQueryParams(r.URL.Query())
- query := qp.Get("query", "")
- if query == "" {
- w.Write(WrapData(nil, fmt.Errorf("Query Parameter 'query' is unset'")))
- return
- }
- // Attempt to parse time as either a unix timestamp or as an RFC3339 value
- var timeVal time.Time
- timeStr := qp.Get("time", "")
- if len(timeStr) > 0 {
- if t, err := strconv.ParseInt(timeStr, 10, 64); err == nil {
- timeVal = time.Unix(t, 0)
- } else if t, err := time.Parse(time.RFC3339, timeStr); err == nil {
- timeVal = t
- }
- // If time is given, but not parse-able, return an error
- if timeVal.IsZero() {
- http.Error(w, fmt.Sprintf("time must be a unix timestamp or RFC3339 value; illegal value given: %s", timeStr), http.StatusBadRequest)
- }
- }
- ctx := prom.NewNamedContext(a.ThanosClient, prom.FrontendContextName)
- body, err := ctx.RawQuery(query, timeVal)
- if err != nil {
- w.Write(WrapData(nil, fmt.Errorf("Error running query %s. Error: %s", query, err)))
- return
- }
- w.Write(body)
- }
- func (a *Accesses) ThanosQueryRange(w http.ResponseWriter, r *http.Request, _ httprouter.Params) {
- w.Header().Set("Content-Type", "application/json")
- w.Header().Set("Access-Control-Allow-Origin", "*")
- if !thanos.IsEnabled() {
- w.Write(WrapData(nil, fmt.Errorf("ThanosDisabled")))
- return
- }
- qp := httputil.NewQueryParams(r.URL.Query())
- query := qp.Get("query", "")
- if query == "" {
- fmt.Fprintf(w, "Error parsing query from request parameters.")
- return
- }
- start, end, duration, err := toStartEndStep(qp)
- if err != nil {
- fmt.Fprintf(w, err.Error())
- return
- }
- ctx := prom.NewNamedContext(a.ThanosClient, prom.FrontendContextName)
- body, err := ctx.RawQueryRange(query, start, end, duration)
- if err != nil {
- fmt.Fprintf(w, "Error running query %s. Error: %s", query, err)
- return
- }
- w.Write(body)
- }
- // helper for query range proxy requests
- func toStartEndStep(qp httputil.QueryParams) (start, end time.Time, step time.Duration, err error) {
- var e error
- ss := qp.Get("start", "")
- es := qp.Get("end", "")
- ds := qp.Get("duration", "")
- layout := "2006-01-02T15:04:05.000Z"
- start, e = time.Parse(layout, ss)
- if e != nil {
- err = fmt.Errorf("Error parsing time %s. Error: %s", ss, err)
- return
- }
- end, e = time.Parse(layout, es)
- if e != nil {
- err = fmt.Errorf("Error parsing time %s. Error: %s", es, err)
- return
- }
- step, e = time.ParseDuration(ds)
- if e != nil {
- err = fmt.Errorf("Error parsing duration %s. Error: %s", ds, err)
- return
- }
- err = nil
- return
- }
- func (a *Accesses) GetPrometheusQueueState(w http.ResponseWriter, _ *http.Request, _ httprouter.Params) {
- w.Header().Set("Content-Type", "application/json")
- w.Header().Set("Access-Control-Allow-Origin", "*")
- promQueueState, err := prom.GetPrometheusQueueState(a.PrometheusClient)
- if err != nil {
- w.Write(WrapData(nil, err))
- return
- }
- result := map[string]*prom.PrometheusQueueState{
- "prometheus": promQueueState,
- }
- if thanos.IsEnabled() {
- thanosQueueState, err := prom.GetPrometheusQueueState(a.ThanosClient)
- if err != nil {
- log.Warnf("Error getting Thanos queue state: %s", err)
- } else {
- result["thanos"] = thanosQueueState
- }
- }
- w.Write(WrapData(result, nil))
- }
- // GetPrometheusMetrics retrieves availability of Prometheus and Thanos metrics
- func (a *Accesses) GetPrometheusMetrics(w http.ResponseWriter, _ *http.Request, _ httprouter.Params) {
- w.Header().Set("Content-Type", "application/json")
- w.Header().Set("Access-Control-Allow-Origin", "*")
- promMetrics := prom.GetPrometheusMetrics(a.PrometheusClient, "")
- result := map[string][]*prom.PrometheusDiagnostic{
- "prometheus": promMetrics,
- }
- if thanos.IsEnabled() {
- thanosMetrics := prom.GetPrometheusMetrics(a.ThanosClient, thanos.QueryOffset())
- result["thanos"] = thanosMetrics
- }
- w.Write(WrapData(result, nil))
- }
- func (a *Accesses) GetAllPersistentVolumes(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
- w.Header().Set("Content-Type", "application/json")
- w.Header().Set("Access-Control-Allow-Origin", "*")
- pvList := a.ClusterCache.GetAllPersistentVolumes()
- body, err := json.Marshal(wrapAsObjectItems(pvList))
- if err != nil {
- fmt.Fprintf(w, "Error decoding persistent volumes: "+err.Error())
- } else {
- w.Write(body)
- }
- }
- func (a *Accesses) GetAllDeployments(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
- w.Header().Set("Content-Type", "application/json")
- w.Header().Set("Access-Control-Allow-Origin", "*")
- qp := httputil.NewQueryParams(r.URL.Query())
- namespace := qp.Get("namespace", "")
- deploymentsList := a.ClusterCache.GetAllDeployments()
- // filter for provided namespace
- var deployments []*appsv1.Deployment
- if namespace == "" {
- deployments = deploymentsList
- } else {
- deployments = []*appsv1.Deployment{}
- for _, d := range deploymentsList {
- if d.Namespace == namespace {
- deployments = append(deployments, d)
- }
- }
- }
- body, err := json.Marshal(wrapAsObjectItems(deployments))
- if err != nil {
- fmt.Fprintf(w, "Error decoding deployment: "+err.Error())
- } else {
- w.Write(body)
- }
- }
- func (a *Accesses) GetAllStorageClasses(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
- w.Header().Set("Content-Type", "application/json")
- w.Header().Set("Access-Control-Allow-Origin", "*")
- scList := a.ClusterCache.GetAllStorageClasses()
- body, err := json.Marshal(wrapAsObjectItems(scList))
- if err != nil {
- fmt.Fprintf(w, "Error decoding storageclasses: "+err.Error())
- } else {
- w.Write(body)
- }
- }
- func (a *Accesses) GetAllStatefulSets(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
- w.Header().Set("Content-Type", "application/json")
- w.Header().Set("Access-Control-Allow-Origin", "*")
- qp := httputil.NewQueryParams(r.URL.Query())
- namespace := qp.Get("namespace", "")
- statefulSetsList := a.ClusterCache.GetAllStatefulSets()
- // filter for provided namespace
- var statefulSets []*appsv1.StatefulSet
- if namespace == "" {
- statefulSets = statefulSetsList
- } else {
- statefulSets = []*appsv1.StatefulSet{}
- for _, ss := range statefulSetsList {
- if ss.Namespace == namespace {
- statefulSets = append(statefulSets, ss)
- }
- }
- }
- body, err := json.Marshal(wrapAsObjectItems(statefulSets))
- if err != nil {
- fmt.Fprintf(w, "Error decoding deployment: "+err.Error())
- } else {
- w.Write(body)
- }
- }
- func (a *Accesses) GetAllNodes(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
- w.Header().Set("Content-Type", "application/json")
- w.Header().Set("Access-Control-Allow-Origin", "*")
- nodeList := a.ClusterCache.GetAllNodes()
- body, err := json.Marshal(wrapAsObjectItems(nodeList))
- if err != nil {
- fmt.Fprintf(w, "Error decoding nodes: "+err.Error())
- } else {
- w.Write(body)
- }
- }
- func (a *Accesses) GetAllPods(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
- w.Header().Set("Content-Type", "application/json")
- w.Header().Set("Access-Control-Allow-Origin", "*")
- podlist := a.ClusterCache.GetAllPods()
- body, err := json.Marshal(wrapAsObjectItems(podlist))
- if err != nil {
- fmt.Fprintf(w, "Error decoding pods: "+err.Error())
- } else {
- w.Write(body)
- }
- }
- func (a *Accesses) GetAllNamespaces(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
- w.Header().Set("Content-Type", "application/json")
- w.Header().Set("Access-Control-Allow-Origin", "*")
- namespaces := a.ClusterCache.GetAllNamespaces()
- body, err := json.Marshal(wrapAsObjectItems(namespaces))
- if err != nil {
- fmt.Fprintf(w, "Error decoding deployment: "+err.Error())
- } else {
- w.Write(body)
- }
- }
- func (a *Accesses) GetAllDaemonSets(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
- w.Header().Set("Content-Type", "application/json")
- w.Header().Set("Access-Control-Allow-Origin", "*")
- daemonSets := a.ClusterCache.GetAllDaemonSets()
- body, err := json.Marshal(wrapAsObjectItems(daemonSets))
- if err != nil {
- fmt.Fprintf(w, "Error decoding daemon set: "+err.Error())
- } else {
- w.Write(body)
- }
- }
- func (a *Accesses) GetPod(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
- w.Header().Set("Content-Type", "application/json")
- w.Header().Set("Access-Control-Allow-Origin", "*")
- podName := ps.ByName("name")
- podNamespace := ps.ByName("namespace")
- // TODO: ClusterCache API could probably afford to have some better filtering
- allPods := a.ClusterCache.GetAllPods()
- for _, pod := range allPods {
- for _, container := range pod.Spec.Containers {
- container.Env = make([]v1.EnvVar, 0)
- }
- if pod.Namespace == podNamespace && pod.Name == podName {
- body, err := json.Marshal(pod)
- if err != nil {
- fmt.Fprintf(w, "Error decoding pod: "+err.Error())
- } else {
- w.Write(body)
- }
- return
- }
- }
- fmt.Fprintf(w, "Pod not found\n")
- }
- func (a *Accesses) PrometheusRecordingRules(w http.ResponseWriter, r *http.Request, _ httprouter.Params) {
- w.Header().Set("Content-Type", "application/json")
- w.Header().Set("Access-Control-Allow-Origin", "*")
- u := a.PrometheusClient.URL(epRules, nil)
- req, err := http.NewRequest(http.MethodGet, u.String(), nil)
- if err != nil {
- fmt.Fprintf(w, "Error creating Prometheus rule request: "+err.Error())
- }
- _, body, err := a.PrometheusClient.Do(r.Context(), req)
- if err != nil {
- fmt.Fprintf(w, "Error making Prometheus rule request: "+err.Error())
- } else {
- w.Write(body)
- }
- }
- func (a *Accesses) PrometheusConfig(w http.ResponseWriter, r *http.Request, _ httprouter.Params) {
- w.Header().Set("Content-Type", "application/json")
- w.Header().Set("Access-Control-Allow-Origin", "*")
- pConfig := map[string]string{
- "address": env.GetPrometheusServerEndpoint(),
- }
- body, err := json.Marshal(pConfig)
- if err != nil {
- fmt.Fprintf(w, "Error marshalling prometheus config")
- } else {
- w.Write(body)
- }
- }
- func (a *Accesses) PrometheusTargets(w http.ResponseWriter, r *http.Request, _ httprouter.Params) {
- w.Header().Set("Content-Type", "application/json")
- w.Header().Set("Access-Control-Allow-Origin", "*")
- u := a.PrometheusClient.URL(epTargets, nil)
- req, err := http.NewRequest(http.MethodGet, u.String(), nil)
- if err != nil {
- fmt.Fprintf(w, "Error creating Prometheus rule request: "+err.Error())
- }
- _, body, err := a.PrometheusClient.Do(r.Context(), req)
- if err != nil {
- fmt.Fprintf(w, "Error making Prometheus rule request: "+err.Error())
- } else {
- w.Write(body)
- }
- }
- func (a *Accesses) GetOrphanedPods(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
- w.Header().Set("Content-Type", "application/json")
- w.Header().Set("Access-Control-Allow-Origin", "*")
- podlist := a.ClusterCache.GetAllPods()
- var lonePods []*v1.Pod
- for _, pod := range podlist {
- if len(pod.OwnerReferences) == 0 {
- lonePods = append(lonePods, pod)
- }
- }
- body, err := json.Marshal(lonePods)
- if err != nil {
- fmt.Fprintf(w, "Error decoding pod: "+err.Error())
- } else {
- w.Write(body)
- }
- }
- func (a *Accesses) GetInstallNamespace(w http.ResponseWriter, r *http.Request, _ httprouter.Params) {
- w.Header().Set("Content-Type", "application/json")
- w.Header().Set("Access-Control-Allow-Origin", "*")
- ns := env.GetKubecostNamespace()
- w.Write([]byte(ns))
- }
// InstallInfo describes a running install: the cost-analyzer containers,
// basic cluster stats (node/pod counts), and the application version.
// Serialized by the /installInfo endpoint.
type InstallInfo struct {
	Containers  []ContainerInfo   `json:"containers"`
	ClusterInfo map[string]string `json:"clusterInfo"`
	Version     string            `json:"version"`
}
// ContainerInfo captures per-container status reported by /installInfo:
// image identity, pod start time, and restart count.
type ContainerInfo struct {
	ContainerName string `json:"containerName"`
	Image         string `json:"image"`
	ImageID       string `json:"imageID"`
	StartTime     string `json:"startTime"`
	Restarts      int32  `json:"restarts"`
}
- func (a *Accesses) GetInstallInfo(w http.ResponseWriter, r *http.Request, _ httprouter.Params) {
- w.Header().Set("Content-Type", "application/json")
- w.Header().Set("Access-Control-Allow-Origin", "*")
- pods, err := a.KubeClientSet.CoreV1().Pods(env.GetKubecostNamespace()).List(context.Background(), metav1.ListOptions{
- LabelSelector: "app=cost-analyzer",
- FieldSelector: "status.phase=Running",
- Limit: 1,
- })
- if err != nil {
- writeErrorResponse(w, 500, fmt.Sprintf("Unable to list pods: %s", err.Error()))
- return
- }
- info := InstallInfo{
- ClusterInfo: make(map[string]string),
- Version: version.FriendlyVersion(),
- }
- // If we have zero pods either something is weird with the install since the app selector is not exposed in the helm
- // chart or more likely we are running locally - in either case Images field will return as null
- if len(pods.Items) > 0 {
- for _, pod := range pods.Items {
- for _, container := range pod.Status.ContainerStatuses {
- c := ContainerInfo{
- ContainerName: container.Name,
- Image: container.Image,
- ImageID: container.ImageID,
- StartTime: pod.Status.StartTime.String(),
- Restarts: container.RestartCount,
- }
- info.Containers = append(info.Containers, c)
- }
- }
- }
- nodes := a.ClusterCache.GetAllNodes()
- cachePods := a.ClusterCache.GetAllPods()
- info.ClusterInfo["nodeCount"] = strconv.Itoa(len(nodes))
- info.ClusterInfo["podCount"] = strconv.Itoa(len(cachePods))
- body, err := json.Marshal(info)
- if err != nil {
- writeErrorResponse(w, 500, fmt.Sprintf("Error decoding pod: %s", err.Error()))
- return
- }
- w.Write(body)
- }
- // logsFor pulls the logs for a specific pod, namespace, and container
- func logsFor(c kubernetes.Interface, namespace string, pod string, container string, dur time.Duration, ctx context.Context) (string, error) {
- since := time.Now().UTC().Add(-dur)
- logOpts := v1.PodLogOptions{
- SinceTime: &metav1.Time{Time: since},
- }
- if container != "" {
- logOpts.Container = container
- }
- req := c.CoreV1().Pods(namespace).GetLogs(pod, &logOpts)
- reader, err := req.Stream(ctx)
- if err != nil {
- return "", err
- }
- podLogs, err := io.ReadAll(reader)
- if err != nil {
- return "", err
- }
- // If color is already disabled then we don't need to process the logs
- // to drop ANSI colors
- if !viper.GetBool("disable-log-color") {
- podLogs = ANSIRegex.ReplaceAll(podLogs, []byte{})
- }
- return string(podLogs), nil
- }
// GetPodLogs returns logs for pods in the install namespace (or the
// "namespace" query parameter). One of "pod" or "selector" selects the
// source: "pod" fetches logs for a single pod (optionally restricted to
// "container"), while "selector" fetches logs for every container of every
// matching pod. "since" (default "24h") bounds the lookback window.
func (a *Accesses) GetPodLogs(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
	w.Header().Set("Content-Type", "application/json")
	w.Header().Set("Access-Control-Allow-Origin", "*")

	qp := httputil.NewQueryParams(r.URL.Query())
	ns := qp.Get("namespace", env.GetKubecostNamespace())
	pod := qp.Get("pod", "")
	selector := qp.Get("selector", "")
	container := qp.Get("container", "")
	since := qp.Get("since", "24h")

	sinceDuration, err := time.ParseDuration(since)
	if err != nil {
		fmt.Fprintf(w, "Invalid Duration String: "+err.Error())
		return
	}

	var logResult string
	// appendLog accumulates one container's (non-empty) logs into the
	// aggregate response, preceded by a "ns:pod:container" separator header.
	appendLog := func(ns string, pod string, container string, l string) {
		if l == "" {
			return
		}
		logResult += fmt.Sprintf("%s\n| %s:%s:%s\n%s\n%s\n\n", LogSeparator, ns, pod, container, LogSeparator, l)
	}

	// Single-pod mode: fetch the pod, optionally validate the requested
	// container exists (case-insensitive), then return its logs.
	if pod != "" {
		pd, err := a.KubeClientSet.CoreV1().Pods(ns).Get(r.Context(), pod, metav1.GetOptions{})
		if err != nil {
			fmt.Fprintf(w, "Error Finding Pod: "+err.Error())
			return
		}

		if container != "" {
			var foundContainer bool
			for _, cont := range pd.Spec.Containers {
				if strings.EqualFold(cont.Name, container) {
					foundContainer = true
					break
				}
			}
			if !foundContainer {
				fmt.Fprintf(w, "Could not find container: "+container)
				return
			}
		}

		logs, err := logsFor(a.KubeClientSet, ns, pod, container, sinceDuration, r.Context())
		if err != nil {
			fmt.Fprintf(w, "Error Getting Logs: "+err.Error())
			return
		}
		appendLog(ns, pod, container, logs)
		w.Write([]byte(logResult))
		return
	}

	// Selector mode: best effort — containers whose logs cannot be fetched
	// are silently skipped.
	if selector != "" {
		pods, err := a.KubeClientSet.CoreV1().Pods(ns).List(r.Context(), metav1.ListOptions{LabelSelector: selector})
		if err != nil {
			fmt.Fprintf(w, "Error Finding Pod: "+err.Error())
			return
		}

		for _, pd := range pods.Items {
			for _, cont := range pd.Spec.Containers {
				logs, err := logsFor(a.KubeClientSet, ns, pd.Name, cont.Name, sinceDuration, r.Context())
				if err != nil {
					continue
				}
				appendLog(ns, pd.Name, cont.Name, logs)
			}
		}
	}

	w.Write([]byte(logResult))
}
- func (a *Accesses) AddServiceKey(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
- w.Header().Set("Content-Type", "application/json")
- w.Header().Set("Access-Control-Allow-Origin", "*")
- r.ParseForm()
- key := r.PostForm.Get("key")
- k := []byte(key)
- err := os.WriteFile(path.Join(env.GetConfigPathWithDefault(env.DefaultConfigMountPath), "key.json"), k, 0644)
- if err != nil {
- fmt.Fprintf(w, "Error writing service key: "+err.Error())
- }
- w.WriteHeader(http.StatusOK)
- }
- func (a *Accesses) GetHelmValues(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
- w.Header().Set("Content-Type", "application/json")
- w.Header().Set("Access-Control-Allow-Origin", "*")
- encodedValues := sysenv.Get("HELM_VALUES", "")
- if encodedValues == "" {
- fmt.Fprintf(w, "Values reporting disabled")
- return
- }
- result, err := base64.StdEncoding.DecodeString(encodedValues)
- if err != nil {
- fmt.Fprintf(w, "Failed to decode encoded values: %s", err)
- return
- }
- w.Write(result)
- }
- func (a *Accesses) Status(w http.ResponseWriter, r *http.Request, _ httprouter.Params) {
- w.Header().Set("Content-Type", "application/json")
- w.Header().Set("Access-Control-Allow-Origin", "*")
- promServer := env.GetPrometheusServerEndpoint()
- api := prometheusAPI.NewAPI(a.PrometheusClient)
- result, err := api.Buildinfo(r.Context())
- if err != nil {
- fmt.Fprintf(w, "Using Prometheus at "+promServer+". Error: "+err.Error())
- } else {
- fmt.Fprintf(w, "Using Prometheus at "+promServer+". Version: "+result.Version)
- }
- }
// LogLevelRequestResponse is the JSON body for both GET and POST
// /logs/level: {"level": "<log level>"}.
type LogLevelRequestResponse struct {
	Level string `json:"level"`
}
- func (a *Accesses) GetLogLevel(w http.ResponseWriter, r *http.Request, _ httprouter.Params) {
- w.Header().Set("Content-Type", "application/json")
- w.Header().Set("Access-Control-Allow-Origin", "*")
- level := log.GetLogLevel()
- llrr := LogLevelRequestResponse{
- Level: level,
- }
- body, err := json.Marshal(llrr)
- if err != nil {
- http.Error(w, fmt.Sprintf("unable to retrive log level"), http.StatusInternalServerError)
- return
- }
- _, err = w.Write(body)
- if err != nil {
- http.Error(w, fmt.Sprintf("unable to write response: %s", body), http.StatusInternalServerError)
- return
- }
- }
- func (a *Accesses) SetLogLevel(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
- params := LogLevelRequestResponse{}
- err := json.NewDecoder(r.Body).Decode(¶ms)
- if err != nil {
- http.Error(w, fmt.Sprintf("unable to decode request body, error: %s", err), http.StatusBadRequest)
- return
- }
- err = log.SetLogLevel(params.Level)
- if err != nil {
- http.Error(w, fmt.Sprintf("level must be a valid log level according to zerolog; level given: %s, error: %s", params.Level, err), http.StatusBadRequest)
- return
- }
- w.WriteHeader(http.StatusOK)
- }
- // captures the panic event in sentry
- func capturePanicEvent(err string, stack string) {
- msg := fmt.Sprintf("Panic: %s\nStackTrace: %s\n", err, stack)
- log.Infof(msg)
- sentry.CurrentHub().CaptureEvent(&sentry.Event{
- Level: sentry.LevelError,
- Message: msg,
- })
- sentry.Flush(5 * time.Second)
- }
- // handle any panics reported by the errors package
- func handlePanic(p errors.Panic) bool {
- err := p.Error
- if err != nil {
- if err, ok := err.(error); ok {
- capturePanicEvent(err.Error(), p.Stack)
- }
- if err, ok := err.(string); ok {
- capturePanicEvent(err, p.Stack)
- }
- }
- // Return true to recover iff the type is http, otherwise allow kubernetes
- // to recover.
- return p.Type == errors.PanicTypeHTTP
- }
// Initialize wires up the full cost-model service: error reporting,
// Prometheus/Thanos clients, the Kubernetes cluster cache, the cloud
// provider, response caches, the cost model, and every HTTP route.
// Callers may supply additional configmap watchers to register alongside
// the built-in pricing and metrics watchers.
func Initialize(additionalConfigWatchers ...*watcher.ConfigMapWatcher) *Accesses {
	configWatchers := watcher.NewConfigMapWatchers(additionalConfigWatchers...)

	// Optional sentry-based error reporting; failures here are logged but
	// never fatal.
	var err error
	if errorReportingEnabled {
		err = sentry.Init(sentry.ClientOptions{Release: version.FriendlyVersion()})
		if err != nil {
			log.Infof("Failed to initialize sentry for error reporting")
		} else {
			err = errors.SetPanicHandler(handlePanic)
			if err != nil {
				log.Infof("Failed to set panic handler: %s", err)
			}
		}
	}

	// A Prometheus endpoint is mandatory; abort without one.
	address := env.GetPrometheusServerEndpoint()
	if address == "" {
		log.Fatalf("No address for prometheus set in $%s. Aborting.", env.PrometheusServerEndpointEnvVar)
	}

	queryConcurrency := env.GetMaxQueryConcurrency()
	log.Infof("Prometheus/Thanos Client Max Concurrency set to %d", queryConcurrency)

	timeout := 120 * time.Second
	keepAlive := 120 * time.Second
	tlsHandshakeTimeout := 10 * time.Second
	scrapeInterval := env.GetKubecostScrapeInterval()

	// Optional retry-on-429 behavior shared by the Prometheus and Thanos
	// clients.
	var rateLimitRetryOpts *prom.RateLimitRetryOpts = nil
	if env.IsPrometheusRetryOnRateLimitResponse() {
		rateLimitRetryOpts = &prom.RateLimitRetryOpts{
			MaxRetries:       env.GetPrometheusRetryOnRateLimitMaxRetries(),
			DefaultRetryWait: env.GetPrometheusRetryOnRateLimitDefaultWait(),
		}
	}

	promCli, err := prom.NewPrometheusClient(address, &prom.PrometheusClientConfig{
		Timeout:               timeout,
		KeepAlive:             keepAlive,
		TLSHandshakeTimeout:   tlsHandshakeTimeout,
		TLSInsecureSkipVerify: env.GetInsecureSkipVerify(),
		RateLimitRetryOpts:    rateLimitRetryOpts,
		Auth: &prom.ClientAuth{
			Username:    env.GetDBBasicAuthUsername(),
			Password:    env.GetDBBasicAuthUserPassword(),
			BearerToken: env.GetDBBearerToken(),
		},
		QueryConcurrency:  queryConcurrency,
		QueryLogFile:      "",
		HeaderXScopeOrgId: env.GetPrometheusHeaderXScopeOrgId(),
	})
	if err != nil {
		log.Fatalf("Failed to create prometheus client, Error: %v", err)
	}

	// Validate connectivity with an 'up' query; failures are logged with
	// troubleshooting pointers but do not abort startup.
	m, err := prom.Validate(promCli)
	if err != nil || !m.Running {
		if err != nil {
			log.Errorf("Failed to query prometheus at %s. Error: %s . Troubleshooting help available at: %s", address, err.Error(), prom.PrometheusTroubleshootingURL)
		} else if !m.Running {
			log.Errorf("Prometheus at %s is not running. Troubleshooting help available at: %s", address, prom.PrometheusTroubleshootingURL)
		}
	} else {
		log.Infof("Success: retrieved the 'up' query against prometheus at: " + address)
	}

	// Buildinfo is expected to fail against cortex/mimir/thanos frontends,
	// so a failure here is informational only.
	api := prometheusAPI.NewAPI(promCli)
	_, err = api.Buildinfo(context.Background())
	if err != nil {
		log.Infof("No valid prometheus config file at %s. Error: %s . Troubleshooting help available at: %s. Ignore if using cortex/mimir/thanos here.", address, err.Error(), prom.PrometheusTroubleshootingURL)
	} else {
		log.Infof("Retrieved a prometheus config file from: %s", address)
	}

	if scrapeInterval == 0 {
		scrapeInterval = time.Minute
		// Lookup scrape interval for kubecost job, update if found
		si, err := prom.ScrapeIntervalFor(promCli, env.GetKubecostJobName())
		if err == nil {
			scrapeInterval = si
		}
	}
	log.Infof("Using scrape interval of %f", scrapeInterval.Seconds())

	// Kubernetes API setup
	kubeClientset, err := kubeconfig.LoadKubeClient("")
	if err != nil {
		log.Fatalf("Failed to build Kubernetes client: %s", err.Error())
	}

	// Create ConfigFileManager for synchronization of shared configuration
	confManager := config.NewConfigFileManager(&config.ConfigFileManagerOpts{
		BucketStoreConfig: env.GetKubecostConfigBucket(),
		LocalConfigPath:   "/",
	})

	configPrefix := env.GetConfigPathWithDefault("/var/configs/")

	// Create Kubernetes Cluster Cache + Watchers. The cache is either
	// imported from a file (agent-style installs) or built live from the
	// Kubernetes API.
	var k8sCache clustercache.ClusterCache
	if env.IsClusterCacheFileEnabled() {
		importLocation := confManager.ConfigFileAt(path.Join(configPrefix, "cluster-cache.json"))
		k8sCache = clustercache.NewClusterImporter(importLocation)
	} else {
		k8sCache = clustercache.NewKubernetesClusterCache(kubeClientset)
	}
	k8sCache.Run()

	cloudProviderKey := env.GetCloudProviderAPIKey()
	cloudProvider, err := provider.NewProvider(k8sCache, cloudProviderKey, confManager)
	if err != nil {
		panic(err.Error())
	}

	// Append the pricing config watcher
	configWatchers.AddWatcher(provider.ConfigWatcherFor(cloudProvider))
	configWatchers.AddWatcher(metrics.GetMetricsConfigWatcher())
	watchConfigFunc := configWatchers.ToWatchFunc()
	watchedConfigs := configWatchers.GetWatchedConfigs()

	kubecostNamespace := env.GetKubecostNamespace()

	// We need an initial invocation because the init of the cache has happened before we had access to the provider.
	for _, cw := range watchedConfigs {
		configs, err := kubeClientset.CoreV1().ConfigMaps(kubecostNamespace).Get(context.Background(), cw, metav1.GetOptions{})
		if err != nil {
			log.Infof("No %s configmap found at install time, using existing configs: %s", cw, err.Error())
		} else {
			log.Infof("Found configmap %s, watching...", configs.Name)
			watchConfigFunc(configs)
		}
	}

	k8sCache.SetConfigMapUpdateFunc(watchConfigFunc)

	remoteEnabled := env.IsRemoteEnabled()
	if remoteEnabled {
		info, err := cloudProvider.ClusterInfo()
		// NOTE(review): info is read for logging before err is checked —
		// confirm ClusterInfo never returns a nil map alongside an error.
		log.Infof("Saving cluster with id:'%s', and name:'%s' to durable storage", info["id"], info["name"])
		if err != nil {
			log.Infof("Error saving cluster id %s", err.Error())
		}
		_, _, err = utils.GetOrCreateClusterMeta(info["id"], info["name"])
		if err != nil {
			log.Infof("Unable to set cluster id '%s' for cluster '%s', %s", info["id"], info["name"], err.Error())
		}
	}

	// Thanos Client: optional multi-cluster metrics backend. A failed
	// validation is logged but the client is still used.
	var thanosClient prometheus.Client
	if thanos.IsEnabled() {
		thanosAddress := thanos.QueryURL()
		if thanosAddress != "" {
			thanosCli, _ := thanos.NewThanosClient(thanosAddress, &prom.PrometheusClientConfig{
				Timeout:               timeout,
				KeepAlive:             keepAlive,
				TLSHandshakeTimeout:   tlsHandshakeTimeout,
				TLSInsecureSkipVerify: env.GetInsecureSkipVerify(),
				RateLimitRetryOpts:    rateLimitRetryOpts,
				Auth: &prom.ClientAuth{
					Username:    env.GetMultiClusterBasicAuthUsername(),
					Password:    env.GetMultiClusterBasicAuthPassword(),
					BearerToken: env.GetMultiClusterBearerToken(),
				},
				QueryConcurrency: queryConcurrency,
				QueryLogFile:     env.GetQueryLoggingFile(),
			})

			_, err = prom.Validate(thanosCli)
			if err != nil {
				log.Warnf("Failed to query Thanos at %s. Error: %s.", thanosAddress, err.Error())
				thanosClient = thanosCli
			} else {
				log.Infof("Success: retrieved the 'up' query against Thanos at: " + thanosAddress)
				thanosClient = thanosCli
			}
		} else {
			log.Infof("Error resolving environment variable: $%s", env.ThanosQueryUrlEnvVar)
		}
	}

	// ClusterInfo Provider to provide the cluster map with local and remote cluster data
	var clusterInfoProvider clusters.ClusterInfoProvider
	if env.IsClusterInfoFileEnabled() {
		clusterInfoFile := confManager.ConfigFileAt(path.Join(configPrefix, "cluster-info.json"))
		clusterInfoProvider = NewConfiguredClusterInfoProvider(clusterInfoFile)
	} else {
		clusterInfoProvider = NewLocalClusterInfoProvider(kubeClientset, cloudProvider)
	}

	// Initialize ClusterMap for maintaining ClusterInfo by ClusterID
	var clusterMap clusters.ClusterMap
	if thanosClient != nil {
		clusterMap = clustermap.NewClusterMap(thanosClient, clusterInfoProvider, 10*time.Minute)
	} else {
		clusterMap = clustermap.NewClusterMap(promCli, clusterInfoProvider, 5*time.Minute)
	}

	// cache responses from model and aggregation for a default of 10 minutes;
	// clear expired responses every 20 minutes
	aggregateCache := cache.New(time.Minute*10, time.Minute*20)
	costDataCache := cache.New(time.Minute*10, time.Minute*20)
	clusterCostsCache := cache.New(cache.NoExpiration, cache.NoExpiration)
	outOfClusterCache := cache.New(time.Minute*5, time.Minute*10)
	settingsCache := cache.New(cache.NoExpiration, cache.NoExpiration)

	// query durations that should be cached longer should be registered here
	// use relatively prime numbers to minimize likelihood of synchronized
	// attempts at cache warming
	day := 24 * time.Hour
	cacheExpiration := map[time.Duration]time.Duration{
		day:      maxCacheMinutes1d * time.Minute,
		2 * day:  maxCacheMinutes2d * time.Minute,
		7 * day:  maxCacheMinutes7d * time.Minute,
		30 * day: maxCacheMinutes30d * time.Minute,
	}

	// The cost model queries Thanos when available, Prometheus otherwise.
	var pc prometheus.Client
	if thanosClient != nil {
		pc = thanosClient
	} else {
		pc = promCli
	}
	costModel := NewCostModel(pc, cloudProvider, k8sCache, clusterMap, scrapeInterval)
	metricsEmitter := NewCostModelMetricsEmitter(promCli, k8sCache, cloudProvider, clusterInfoProvider, costModel)

	a := &Accesses{
		Router:                httprouter.New(),
		PrometheusClient:      promCli,
		ThanosClient:          thanosClient,
		KubeClientSet:         kubeClientset,
		ClusterCache:          k8sCache,
		ClusterMap:            clusterMap,
		CloudProvider:         cloudProvider,
		CloudConfigController: cloudconfig.NewController(cloudProvider),
		ConfigFileManager:     confManager,
		ClusterInfoProvider:   clusterInfoProvider,
		Model:                 costModel,
		MetricsEmitter:        metricsEmitter,
		AggregateCache:        aggregateCache,
		CostDataCache:         costDataCache,
		ClusterCostsCache:     clusterCostsCache,
		OutOfClusterCache:     outOfClusterCache,
		SettingsCache:         settingsCache,
		CacheExpiration:       cacheExpiration,
		httpServices:          services.NewCostModelServices(),
	}

	// Use the Accesses instance, itself, as the CostModelAggregator. This is
	// confusing and unconventional, but necessary so that we can swap it
	// out for the ETL-adapted version elsewhere.
	// TODO clean this up once ETL is open-sourced.
	a.AggAPI = a

	// Initialize mechanism for subscribing to settings changes
	a.InitializeSettingsPubSub()

	err = a.CloudProvider.DownloadPricingData()
	if err != nil {
		log.Infof("Failed to download pricing data: " + err.Error())
	}

	// Warm the aggregate cache unless explicitly set to false
	if env.IsCacheWarmingEnabled() {
		log.Infof("Init: AggregateCostModel cache warming enabled")
		a.warmAggregateCostModelCache()
	} else {
		log.Infof("Init: AggregateCostModel cache warming disabled")
	}

	if !env.IsKubecostMetricsPodEnabled() {
		a.MetricsEmitter.Start()
	}

	// Optional custom-cost pipeline (hourly + daily in-memory repositories).
	log.Infof("Custom Costs enabled: %t", env.IsCustomCostEnabled())
	if env.IsCustomCostEnabled() {
		hourlyRepo := customcost.NewMemoryRepository()
		dailyRepo := customcost.NewMemoryRepository()
		ingConfig := customcost.DefaultIngestorConfiguration()
		var err error
		a.CustomCostPipelineService, err = customcost.NewPipelineService(hourlyRepo, dailyRepo, ingConfig)
		if err != nil {
			log.Errorf("error instantiating custom cost pipeline service: %v", err)
			return nil
		}
		customCostQuerier := customcost.NewRepositoryQuerier(hourlyRepo, dailyRepo, ingConfig.HourlyDuration, ingConfig.DailyDuration)
		a.CustomCostQueryService = customcost.NewQueryService(customCostQuerier)
	}

	// Route registration.
	a.Router.GET("/costDataModel", a.CostDataModel)
	a.Router.GET("/costDataModelRange", a.CostDataModelRange)
	a.Router.GET("/aggregatedCostModel", a.AggregateCostModelHandler)
	a.Router.GET("/allocation/compute", a.ComputeAllocationHandler)
	a.Router.GET("/allocation/compute/summary", a.ComputeAllocationHandlerSummary)
	a.Router.GET("/allNodePricing", a.GetAllNodePricing)
	a.Router.POST("/refreshPricing", a.RefreshPricingData)
	a.Router.GET("/clusterCostsOverTime", a.ClusterCostsOverTime)
	a.Router.GET("/clusterCosts", a.ClusterCosts)
	a.Router.GET("/clusterCostsFromCache", a.ClusterCostsFromCacheHandler)
	a.Router.GET("/validatePrometheus", a.GetPrometheusMetadata)
	a.Router.GET("/managementPlatform", a.ManagementPlatform)
	a.Router.GET("/clusterInfo", a.ClusterInfo)
	a.Router.GET("/clusterInfoMap", a.GetClusterInfoMap)
	a.Router.GET("/serviceAccountStatus", a.GetServiceAccountStatus)
	a.Router.GET("/pricingSourceStatus", a.GetPricingSourceStatus)
	a.Router.GET("/pricingSourceSummary", a.GetPricingSourceSummary)
	a.Router.GET("/pricingSourceCounts", a.GetPricingSourceCounts)

	// endpoints migrated from server
	a.Router.GET("/allPersistentVolumes", a.GetAllPersistentVolumes)
	a.Router.GET("/allDeployments", a.GetAllDeployments)
	a.Router.GET("/allStorageClasses", a.GetAllStorageClasses)
	a.Router.GET("/allStatefulSets", a.GetAllStatefulSets)
	a.Router.GET("/allNodes", a.GetAllNodes)
	a.Router.GET("/allPods", a.GetAllPods)
	a.Router.GET("/allNamespaces", a.GetAllNamespaces)
	a.Router.GET("/allDaemonSets", a.GetAllDaemonSets)
	a.Router.GET("/pod/:namespace/:name", a.GetPod)
	a.Router.GET("/prometheusRecordingRules", a.PrometheusRecordingRules)
	a.Router.GET("/prometheusConfig", a.PrometheusConfig)
	a.Router.GET("/prometheusTargets", a.PrometheusTargets)
	a.Router.GET("/orphanedPods", a.GetOrphanedPods)
	a.Router.GET("/installNamespace", a.GetInstallNamespace)
	a.Router.GET("/installInfo", a.GetInstallInfo)
	a.Router.GET("/podLogs", a.GetPodLogs)
	a.Router.POST("/serviceKey", a.AddServiceKey)
	a.Router.GET("/helmValues", a.GetHelmValues)
	a.Router.GET("/status", a.Status)

	// prom query proxies
	a.Router.GET("/prometheusQuery", a.PrometheusQuery)
	a.Router.GET("/prometheusQueryRange", a.PrometheusQueryRange)
	a.Router.GET("/thanosQuery", a.ThanosQuery)
	a.Router.GET("/thanosQueryRange", a.ThanosQueryRange)

	// diagnostics
	a.Router.GET("/diagnostics/requestQueue", a.GetPrometheusQueueState)
	a.Router.GET("/diagnostics/prometheusMetrics", a.GetPrometheusMetrics)

	a.Router.GET("/logs/level", a.GetLogLevel)
	a.Router.POST("/logs/level", a.SetLogLevel)

	a.Router.GET("/cloud/config/export", a.CloudConfigController.GetExportConfigHandler())
	a.Router.GET("/cloud/config/enable", a.CloudConfigController.GetEnableConfigHandler())
	a.Router.GET("/cloud/config/disable", a.CloudConfigController.GetDisableConfigHandler())
	a.Router.GET("/cloud/config/delete", a.CloudConfigController.GetDeleteConfigHandler())

	if env.IsCustomCostEnabled() {
		a.Router.GET("/customCost/total", a.CustomCostQueryService.GetCustomCostTotalHandler())
		a.Router.GET("/customCost/timeseries", a.CustomCostQueryService.GetCustomCostTimeseriesHandler())
	}

	// this endpoint is intentionally left out of the "if env.IsCustomCostEnabled()" conditional; in the handler, it is
	// valid for CustomCostPipelineService to be nil
	a.Router.GET("/customCost/status", a.CustomCostPipelineService.GetCustomCostStatusHandler())

	a.httpServices.RegisterAll(a.Router)

	return a
}
- func InitializeWithoutKubernetes() *Accesses {
- var err error
- if errorReportingEnabled {
- err = sentry.Init(sentry.ClientOptions{Release: version.FriendlyVersion()})
- if err != nil {
- log.Infof("Failed to initialize sentry for error reporting")
- } else {
- err = errors.SetPanicHandler(handlePanic)
- if err != nil {
- log.Infof("Failed to set panic handler: %s", err)
- }
- }
- }
- a := &Accesses{
- Router: httprouter.New(),
- CloudConfigController: cloudconfig.NewController(nil),
- httpServices: services.NewCostModelServices(),
- }
- a.Router.GET("/logs/level", a.GetLogLevel)
- a.Router.POST("/logs/level", a.SetLogLevel)
- a.httpServices.RegisterAll(a.Router)
- return a
- }
- func writeErrorResponse(w http.ResponseWriter, code int, message string) {
- out := map[string]string{
- "message": message,
- }
- bytes, err := json.Marshal(out)
- if err != nil {
- w.Header().Set("Content-Type", "text/plain")
- w.WriteHeader(500)
- fmt.Fprint(w, "unable to marshall json for error")
- log.Warnf("Failed to marshall JSON for error response: %s", err.Error())
- return
- }
- w.WriteHeader(code)
- fmt.Fprint(w, string(bytes))
- }
|