
Updates to the agent/exporter.

Matt Bolt 4 years ago
parent
commit
6987baef24
5 changed files with 48 additions and 13 deletions
  1. +1 -1    Makefile
  2. +16 -9   pkg/costmodel/router.go
  3. +2 -1    pkg/kubecost/window.go
  4. +4 -2    pkg/prom/query.go
  5. +25 -0   pkg/storage/s3storage.go

+ 1 - 1
Makefile

@@ -1,4 +1,4 @@
-VERSION?=v0.0.2-SNAPSHOT
+VERSION?=v0.0.3-SNAPSHOT
 REGISTRY?=gcr.io
 PROJECT_ID?=kubecost1
 APPNAME?=kube-metrics

+ 16 - 9
pkg/costmodel/router.go

@@ -354,6 +354,13 @@ func WrapDataWithMessageAndWarning(data interface{}, err error, message, warning
 	return resp
 }
 
+// wrapAsObjectItems wraps a slice of items into an object containing a single items list
+func wrapAsObjectItems(items interface{}) map[string]interface{} {
+	return map[string]interface{}{
+		"items": items,
+	}
+}
+
 // RefreshPricingData needs to be called when a new node joins the fleet, since we cache the relevant subsets of pricing data to avoid storing the whole thing.
 func (a *Accesses) RefreshPricingData(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
 	w.Header().Set("Content-Type", "application/json")
@@ -929,7 +936,7 @@ func (a *Accesses) GetAllPersistentVolumes(w http.ResponseWriter, r *http.Reques
 
 	pvList := a.ClusterCache.GetAllPersistentVolumes()
 
-	body, err := json.Marshal(pvList)
+	body, err := json.Marshal(wrapAsObjectItems(pvList))
 	if err != nil {
 		fmt.Fprintf(w, "Error decoding persistent volumes: "+err.Error())
 	} else {
@@ -962,7 +969,7 @@ func (a *Accesses) GetAllDeployments(w http.ResponseWriter, r *http.Request, ps
 		}
 	}
 
-	body, err := json.Marshal(deployments)
+	body, err := json.Marshal(wrapAsObjectItems(deployments))
 	if err != nil {
 		fmt.Fprintf(w, "Error decoding deployment: "+err.Error())
 	} else {
@@ -976,7 +983,7 @@ func (a *Accesses) GetAllStorageClasses(w http.ResponseWriter, r *http.Request,
 
 	scList := a.ClusterCache.GetAllStorageClasses()
 
-	body, err := json.Marshal(scList)
+	body, err := json.Marshal(wrapAsObjectItems(scList))
 	if err != nil {
 		fmt.Fprintf(w, "Error decoding storageclasses: "+err.Error())
 	} else {
@@ -1008,7 +1015,7 @@ func (a *Accesses) GetAllStatefulSets(w http.ResponseWriter, r *http.Request, ps
 		}
 	}
 
-	body, err := json.Marshal(statefulSets)
+	body, err := json.Marshal(wrapAsObjectItems(statefulSets))
 	if err != nil {
 		fmt.Fprintf(w, "Error decoding deployment: "+err.Error())
 	} else {
@@ -1022,7 +1029,7 @@ func (a *Accesses) GetAllNodes(w http.ResponseWriter, r *http.Request, ps httpro
 
 	nodeList := a.ClusterCache.GetAllNodes()
 
-	body, err := json.Marshal(nodeList)
+	body, err := json.Marshal(wrapAsObjectItems(nodeList))
 	if err != nil {
 		fmt.Fprintf(w, "Error decoding nodes: "+err.Error())
 	} else {
@@ -1036,7 +1043,7 @@ func (a *Accesses) GetAllPods(w http.ResponseWriter, r *http.Request, ps httprou
 
 	podlist := a.ClusterCache.GetAllPods()
 
-	body, err := json.Marshal(podlist)
+	body, err := json.Marshal(wrapAsObjectItems(podlist))
 	if err != nil {
 		fmt.Fprintf(w, "Error decoding pods: "+err.Error())
 	} else {
@@ -1050,7 +1057,7 @@ func (a *Accesses) GetAllNamespaces(w http.ResponseWriter, r *http.Request, ps h
 
 	namespaces := a.ClusterCache.GetAllNamespaces()
 
-	body, err := json.Marshal(namespaces)
+	body, err := json.Marshal(wrapAsObjectItems(namespaces))
 	if err != nil {
 		fmt.Fprintf(w, "Error decoding deployment: "+err.Error())
 	} else {
@@ -1064,7 +1071,7 @@ func (a *Accesses) GetAllDaemonSets(w http.ResponseWriter, r *http.Request, ps h
 
 	daemonSets := a.ClusterCache.GetAllDaemonSets()
 
-	body, err := json.Marshal(daemonSets)
+	body, err := json.Marshal(wrapAsObjectItems(daemonSets))
 	if err != nil {
 		fmt.Fprintf(w, "Error decoding daemon set: "+err.Error())
 	} else {
@@ -1399,7 +1406,7 @@ func Initialize(additionalConfigWatchers ...*watcher.ConfigMapWatcher) *Accesses
 	keepAlive := 120 * time.Second
 	scrapeInterval := time.Minute
 
-	promCli, err := prom.NewPrometheusClient(address, timeout, keepAlive, queryConcurrency, "")
+	promCli, err := prom.NewPrometheusClient(address, timeout, keepAlive, queryConcurrency, "/var/configs/queries.log")
 	if err != nil {
 		klog.Fatalf("Failed to create prometheus client, Error: %v", err)
 	}
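
The new wrapAsObjectItems helper changes the shape of the cluster-cache list endpoints' responses: instead of a bare JSON array, each handler now returns an object with a single items field. A minimal sketch of the effect (the pvNames slice below is a made-up stand-in for a cluster cache result such as pvList):

package main

import (
	"encoding/json"
	"fmt"
)

// wrapAsObjectItems mirrors the helper added in router.go: it nests any
// value under an "items" key so the list endpoints return an object rather
// than a bare JSON array.
func wrapAsObjectItems(items interface{}) map[string]interface{} {
	return map[string]interface{}{"items": items}
}

func main() {
	// pvNames stands in for a cluster cache result such as pvList.
	pvNames := []string{"pv-0001", "pv-0002"}

	before, _ := json.Marshal(pvNames)
	after, _ := json.Marshal(wrapAsObjectItems(pvNames))

	fmt.Println(string(before)) // ["pv-0001","pv-0002"]
	fmt.Println(string(after))  // {"items":["pv-0001","pv-0002"]}
}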

+ 2 - 1
pkg/kubecost/window.go

@@ -3,12 +3,13 @@ package kubecost
 import (
 	"bytes"
 	"fmt"
-	"github.com/kubecost/cost-model/pkg/util/timeutil"
 	"math"
 	"regexp"
 	"strconv"
 	"time"
 
+	"github.com/kubecost/cost-model/pkg/util/timeutil"
+
 	"github.com/kubecost/cost-model/pkg/env"
 	"github.com/kubecost/cost-model/pkg/thanos"
 )

+ 4 - 2
pkg/prom/query.go

@@ -180,8 +180,10 @@ func (ctx *Context) RawQuery(query string) ([]byte, error) {
 	// for non-range queries, we set the timestamp for the query to time-offset
 	// this is a special use case that's typically only used when our primary
 	// prom db has delayed insertion (thanos, cortex, etc...)
-	if promQueryOffset != 0 {
-		q.Set("time", time.Now().UTC().Add(-promQueryOffset).Format(time.RFC3339))
+	if promQueryOffset != 0 && ctx.name != AllocationContextName {
+		q.Set("time", time.Now().Add(-promQueryOffset).UTC().Format(time.RFC3339))
+	} else {
+		q.Set("time", time.Now().UTC().Format(time.RFC3339))
 	}
 
 	u.RawQuery = q.Encode()
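
The RawQuery change makes the configured offset apply only outside the allocation context, and always sets an explicit time parameter on the query. A rough sketch of the timestamp selection, assuming a 5-minute offset and an illustrative value for AllocationContextName (only the constant's name appears in the diff):

package main

import (
	"fmt"
	"time"
)

// AllocationContextName is assumed here for illustration; the diff references
// the constant by name only, not its value.
const AllocationContextName = "allocation"

// queryTime reproduces the branch added to RawQuery: when an offset is
// configured and the context is not the allocation context, the query is
// evaluated at now-offset; otherwise it is evaluated at the current time.
func queryTime(ctxName string, offset time.Duration, now time.Time) string {
	if offset != 0 && ctxName != AllocationContextName {
		return now.Add(-offset).UTC().Format(time.RFC3339)
	}
	return now.UTC().Format(time.RFC3339)
}

func main() {
	now := time.Date(2021, 6, 1, 12, 0, 0, 0, time.UTC)
	offset := 5 * time.Minute

	fmt.Println(queryTime("cluster", offset, now))             // 2021-06-01T11:55:00Z
	fmt.Println(queryTime(AllocationContextName, offset, now)) // 2021-06-01T12:00:00Z
}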

+ 25 - 0
pkg/storage/s3storage.go

@@ -305,11 +305,15 @@ func validate(conf S3Config) error {
 
 // FullPath returns the storage working path combined with the path provided
 func (s3 *S3Storage) FullPath(name string) string {
+	name = s3.trimLeading(name)
+
 	return name
 }
 
 // Get returns a reader for the given object name.
 func (s3 *S3Storage) Read(name string) ([]byte, error) {
+	name = s3.trimLeading(name)
+
 	log.Infof("S3Storage::Read(%s)", name)
 	ctx := context.Background()
 
@@ -319,6 +323,7 @@ func (s3 *S3Storage) Read(name string) ([]byte, error) {
 
 // Exists checks if the given object exists.
 func (s3 *S3Storage) Exists(name string) (bool, error) {
+	name = s3.trimLeading(name)
 	//log.Infof("S3Storage::Exists(%s)", name)
 
 	ctx := context.Background()
@@ -336,6 +341,8 @@ func (s3 *S3Storage) Exists(name string) (bool, error) {
 
 // Upload the contents of the reader as an object into the bucket.
 func (s3 *S3Storage) Write(name string, data []byte) error {
+	name = s3.trimLeading(name)
+
 	log.Infof("S3Storage::Write(%s)", name)
 
 	ctx := context.Background()
@@ -363,6 +370,8 @@ func (s3 *S3Storage) Write(name string, data []byte) error {
 
 // Attributes returns information about the specified object.
 func (s3 *S3Storage) Stat(name string) (*StorageInfo, error) {
+	name = s3.trimLeading(name)
+
 	//log.Infof("S3Storage::Stat(%s)", name)
 	ctx := context.Background()
 
@@ -383,6 +392,8 @@ func (s3 *S3Storage) Stat(name string) (*StorageInfo, error) {
 
 // Delete removes the object with the given name.
 func (s3 *S3Storage) Remove(name string) error {
+	name = s3.trimLeading(name)
+
 	log.Infof("S3Storage::Remove(%s)", name)
 	ctx := context.Background()
 
@@ -390,6 +401,8 @@ func (s3 *S3Storage) Remove(name string) error {
 }
 
 func (s3 *S3Storage) List(path string) ([]*StorageInfo, error) {
+	path = s3.trimLeading(path)
+
 	log.Infof("S3Storage::List(%s)", path)
 	ctx := context.Background()
 
@@ -430,6 +443,18 @@ func (s3 *S3Storage) List(path string) ([]*StorageInfo, error) {
 	return stats, nil
 }
 
+// trimLeading removes a leading / from the file name
+func (s3 *S3Storage) trimLeading(file string) string {
+	if len(file) == 0 {
+		return file
+	}
+
+	if file[0] == '/' {
+		return file[1:]
+	}
+	return file
+}
+
 // trimName removes the leading directory prefix
 func (s3 *S3Storage) trimName(file string) string {
 	slashIndex := strings.LastIndex(file, "/")
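
Every S3Storage entry point now strips a single leading slash from the object name via trimLeading, so keys like "/dir/file" and "dir/file" resolve to the same S3 object. A small standalone sketch of the helper's behavior, using a hypothetical object key:

package main

import "fmt"

// trimLeading mirrors the new S3Storage helper: it drops a single leading
// '/' from an object name and returns everything else unchanged.
func trimLeading(file string) string {
	if len(file) == 0 {
		return file
	}
	if file[0] == '/' {
		return file[1:]
	}
	return file
}

func main() {
	// The key below is a hypothetical example, not a path from the repo.
	fmt.Println(trimLeading("/configs/object.json")) // configs/object.json
	fmt.Println(trimLeading("configs/object.json"))  // configs/object.json (unchanged)
	fmt.Println(trimLeading("") == "")               // true (empty name stays empty)
}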