Browse Source

Update router to use cluster cache for all kubernetes related access.

Matt Bolt 4 years ago
parent
commit
ebe0f52a91
1 changed files with 88 additions and 81 deletions
  1. 88 81
      pkg/costmodel/router.go

+ 88 - 81
pkg/costmodel/router.go

@@ -7,7 +7,6 @@ import (
 	"fmt"
 	"io/ioutil"
 	"net/http"
-	"os"
 	"reflect"
 	"strconv"
 	"strings"
@@ -24,8 +23,6 @@ import (
 	v1 "k8s.io/api/core/v1"
 	"k8s.io/klog"
 
-	k8serrors "k8s.io/apimachinery/pkg/api/errors"
-
 	"github.com/julienschmidt/httprouter"
 
 	sentry "github.com/getsentry/sentry-go"
@@ -42,6 +39,7 @@ import (
 	"github.com/kubecost/cost-model/pkg/util/json"
 	prometheus "github.com/prometheus/client_golang/api"
 	prometheusAPI "github.com/prometheus/client_golang/api/prometheus/v1"
+	appsv1 "k8s.io/api/apps/v1"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 
 	"github.com/patrickmn/go-cache"
@@ -77,6 +75,7 @@ type Accesses struct {
 	PrometheusClient    prometheus.Client
 	ThanosClient        prometheus.Client
 	KubeClientSet       kubernetes.Interface
+	ClusterCache        clustercache.ClusterCache
 	ClusterMap          clusters.ClusterMap
 	CloudProvider       cloud.Provider
 	ConfigFileManager   *config.ConfigFileManager
@@ -928,10 +927,7 @@ func (a *Accesses) GetAllPersistentVolumes(w http.ResponseWriter, r *http.Reques
 	w.Header().Set("Content-Type", "application/json")
 	w.Header().Set("Access-Control-Allow-Origin", "*")
 
-	pvList, err := a.KubeClientSet.CoreV1().PersistentVolumes().List(r.Context(), metav1.ListOptions{})
-	if err != nil {
-		fmt.Fprintf(w, "Error getting persistent volume %v\n", err)
-	}
+	pvList := a.ClusterCache.GetAllPersistentVolumes()
 
 	body, err := json.Marshal(pvList)
 	if err != nil {
@@ -945,12 +941,28 @@ func (a *Accesses) GetAllPersistentVolumes(w http.ResponseWriter, r *http.Reques
 func (a *Accesses) GetAllDeployments(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
 	w.Header().Set("Content-Type", "application/json")
 	w.Header().Set("Access-Control-Allow-Origin", "*")
-	namespace := r.URL.Query().Get("namespace")
-	deploymentsList, err := a.KubeClientSet.AppsV1().Deployments(namespace).List(r.Context(), metav1.ListOptions{})
-	if err != nil {
-		fmt.Fprintf(w, "Error getting deployments %v\n", err)
+
+	qp := httputil.NewQueryParams(r.URL.Query())
+
+	namespace := qp.Get("namespace", "")
+
+	deploymentsList := a.ClusterCache.GetAllDeployments()
+
+	// filter for provided namespace
+	var deployments []*appsv1.Deployment
+	if namespace == "" {
+		deployments = deploymentsList
+	} else {
+		deployments = []*appsv1.Deployment{}
+
+		for _, d := range deploymentsList {
+			if d.Namespace == namespace {
+				deployments = append(deployments, d)
+			}
+		}
 	}
-	body, err := json.Marshal(deploymentsList)
+
+	body, err := json.Marshal(deployments)
 	if err != nil {
 		fmt.Fprintf(w, "Error decoding deployment: "+err.Error())
 	} else {
@@ -962,10 +974,8 @@ func (a *Accesses) GetAllStorageClasses(w http.ResponseWriter, r *http.Request,
 	w.Header().Set("Content-Type", "application/json")
 	w.Header().Set("Access-Control-Allow-Origin", "*")
 
-	scList, err := a.KubeClientSet.StorageV1().StorageClasses().List(r.Context(), metav1.ListOptions{})
-	if err != nil {
-		fmt.Fprintf(w, "Error getting storageclasses: "+err.Error())
-	}
+	scList := a.ClusterCache.GetAllStorageClasses()
+
 	body, err := json.Marshal(scList)
 	if err != nil {
 		fmt.Fprintf(w, "Error decoding storageclasses: "+err.Error())
@@ -977,12 +987,28 @@ func (a *Accesses) GetAllStorageClasses(w http.ResponseWriter, r *http.Request,
 func (a *Accesses) GetAllStatefulSets(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
 	w.Header().Set("Content-Type", "application/json")
 	w.Header().Set("Access-Control-Allow-Origin", "*")
-	namespace := r.URL.Query().Get("namespace")
-	deploymentsList, err := a.KubeClientSet.AppsV1().StatefulSets(namespace).List(r.Context(), metav1.ListOptions{})
-	if err != nil {
-		fmt.Fprintf(w, "Error getting deployments %v\n", err)
+
+	qp := httputil.NewQueryParams(r.URL.Query())
+
+	namespace := qp.Get("namespace", "")
+
+	statefulSetsList := a.ClusterCache.GetAllStatefulSets()
+
+	// filter for provided namespace
+	var statefulSets []*appsv1.StatefulSet
+	if namespace == "" {
+		statefulSets = statefulSetsList
+	} else {
+		statefulSets = []*appsv1.StatefulSet{}
+
+		for _, ss := range statefulSetsList {
+			if ss.Namespace == namespace {
+				statefulSets = append(statefulSets, ss)
+			}
+		}
 	}
-	body, err := json.Marshal(deploymentsList)
+
+	body, err := json.Marshal(statefulSets)
 	if err != nil {
 		fmt.Fprintf(w, "Error decoding deployment: "+err.Error())
 	} else {
@@ -994,10 +1020,7 @@ func (a *Accesses) GetAllNodes(w http.ResponseWriter, r *http.Request, ps httpro
 	w.Header().Set("Content-Type", "application/json")
 	w.Header().Set("Access-Control-Allow-Origin", "*")
 
-	nodeList, err := a.KubeClientSet.CoreV1().Nodes().List(r.Context(), metav1.ListOptions{})
-	if err != nil {
-		fmt.Fprintf(w, "Error getting node %v\n", err)
-	}
+	nodeList := a.ClusterCache.GetAllNodes()
 
 	body, err := json.Marshal(nodeList)
 	if err != nil {
@@ -1011,10 +1034,7 @@ func (a *Accesses) GetAllPods(w http.ResponseWriter, r *http.Request, ps httprou
 	w.Header().Set("Content-Type", "application/json")
 	w.Header().Set("Access-Control-Allow-Origin", "*")
 
-	podlist, err := a.KubeClientSet.CoreV1().Pods("").List(r.Context(), metav1.ListOptions{})
-	if err != nil {
-		fmt.Fprintf(w, "Error getting pod %v\n", err)
-	}
+	podlist := a.ClusterCache.GetAllPods()
 
 	body, err := json.Marshal(podlist)
 	if err != nil {
@@ -1028,10 +1048,8 @@ func (a *Accesses) GetAllNamespaces(w http.ResponseWriter, r *http.Request, ps h
 	w.Header().Set("Content-Type", "application/json")
 	w.Header().Set("Access-Control-Allow-Origin", "*")
 
-	namespaces, err := a.KubeClientSet.CoreV1().Namespaces().List(r.Context(), metav1.ListOptions{})
-	if err != nil {
-		fmt.Fprintf(w, "Error getting namespaces %v\n", err)
-	}
+	namespaces := a.ClusterCache.GetAllNamespaces()
+
 	body, err := json.Marshal(namespaces)
 	if err != nil {
 		fmt.Fprintf(w, "Error decoding deployment: "+err.Error())
@@ -1043,10 +1061,9 @@ func (a *Accesses) GetAllNamespaces(w http.ResponseWriter, r *http.Request, ps h
 func (a *Accesses) GetAllDaemonSets(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
 	w.Header().Set("Content-Type", "application/json")
 	w.Header().Set("Access-Control-Allow-Origin", "*")
-	daemonSets, err := a.KubeClientSet.AppsV1().DaemonSets("").List(r.Context(), metav1.ListOptions{})
-	if err != nil {
-		fmt.Fprintf(w, "Error getting daemon sets %v\n", err)
-	}
+
+	daemonSets := a.ClusterCache.GetAllDaemonSets()
+
 	body, err := json.Marshal(daemonSets)
 	if err != nil {
 		fmt.Fprintf(w, "Error decoding daemon set: "+err.Error())
@@ -1062,24 +1079,21 @@ func (a *Accesses) GetPod(w http.ResponseWriter, r *http.Request, ps httprouter.
 	podName := ps.ByName("name")
 	podNamespace := ps.ByName("namespace")
 
-	// Examples for error handling:
-	// - Use helper functions like e.g. errors.IsNotFound()
-	// - And/or cast to StatusError and use its properties like e.g. ErrStatus.Message
-	pod, err := a.KubeClientSet.CoreV1().Pods(podNamespace).Get(r.Context(), podName, metav1.GetOptions{})
-	if k8serrors.IsNotFound(err) {
-		fmt.Fprintf(w, "Pod not found\n")
-	} else if statusError, isStatus := err.(*k8serrors.StatusError); isStatus {
-		fmt.Fprintf(w, "Error getting pod %v\n", statusError.ErrStatus.Message)
-	} else if err != nil {
-		fmt.Fprintf(w, "Error getting pod: "+err.Error())
-	} else {
-		body, err := json.Marshal(pod)
-		if err != nil {
-			fmt.Fprintf(w, "Error decoding pod: "+err.Error())
-		} else {
-			w.Write(body)
+	// TODO: ClusterCache API could probably afford to have some better filtering
+	allPods := a.ClusterCache.GetAllPods()
+	for _, pod := range allPods {
+		if pod.Namespace == podNamespace && pod.Name == podName {
+			body, err := json.Marshal(pod)
+			if err != nil {
+				fmt.Fprintf(w, "Error decoding pod: "+err.Error())
+			} else {
+				w.Write(body)
+			}
+			return
 		}
 	}
+
+	fmt.Fprintf(w, "Pod not found\n")
 }
 
 func (a *Accesses) PrometheusRecordingRules(w http.ResponseWriter, r *http.Request, _ httprouter.Params) {
@@ -1105,9 +1119,9 @@ func (a *Accesses) PrometheusConfig(w http.ResponseWriter, r *http.Request, _ ht
 	w.Header().Set("Content-Type", "application/json")
 	w.Header().Set("Access-Control-Allow-Origin", "*")
 
-	pConfig := make(map[string]string)
-
-	pConfig["address"] = os.Getenv("PROMETHEUS_SERVER_ENDPOINT")
+	pConfig := map[string]string{
+		"address": env.GetPrometheusServerEndpoint(),
+	}
 
 	body, err := json.Marshal(pConfig)
 	if err != nil {
@@ -1140,13 +1154,10 @@ func (a *Accesses) GetOrphanedPods(w http.ResponseWriter, r *http.Request, ps ht
 	w.Header().Set("Content-Type", "application/json")
 	w.Header().Set("Access-Control-Allow-Origin", "*")
 
-	podlist, err := a.KubeClientSet.CoreV1().Pods("").List(r.Context(), metav1.ListOptions{})
-	if err != nil {
-		fmt.Fprintf(w, "Error getting pod %v\n", err)
-	}
+	podlist := a.ClusterCache.GetAllPods()
 
-	var lonePods []v1.Pod
-	for _, pod := range podlist.Items {
+	var lonePods []*v1.Pod
+	for _, pod := range podlist {
 		if len(pod.OwnerReferences) == 0 {
 			lonePods = append(lonePods, pod)
 		}
@@ -1164,10 +1175,11 @@ func (a *Accesses) GetInstallNamespace(w http.ResponseWriter, r *http.Request, _
 	w.Header().Set("Content-Type", "application/json")
 	w.Header().Set("Access-Control-Allow-Origin", "*")
 
-	ns := os.Getenv("KUBECOST_NAMESPACE")
+	ns := env.GetKubecostNamespace()
 	w.Write([]byte(ns))
 }
 
+// logsFor pulls the logs for a specific pod, namespace, and container
 func logsFor(c kubernetes.Interface, namespace string, pod string, container string, dur time.Duration, ctx context.Context) (string, error) {
 	since := time.Now().UTC().Add(-dur)
 
@@ -1196,18 +1208,13 @@ func (a *Accesses) GetPodLogs(w http.ResponseWriter, r *http.Request, ps httprou
 	w.Header().Set("Content-Type", "application/json")
 	w.Header().Set("Access-Control-Allow-Origin", "*")
 
-	q := r.URL.Query()
-	ns := q.Get("namespace")
-	if ns == "" {
-		ns = os.Getenv("KUBECOST_NAMESPACE")
-	}
-	pod := q.Get("pod")
-	selector := q.Get("selector")
-	container := q.Get("container")
-	since := q.Get("since")
-	if since == "" {
-		since = "24h"
-	}
+	qp := httputil.NewQueryParams(r.URL.Query())
+
+	ns := qp.Get("namespace", env.GetKubecostNamespace())
+	pod := qp.Get("pod", "")
+	selector := qp.Get("selector", "")
+	container := qp.Get("container", "")
+	since := qp.Get("since", "24h")
 
 	sinceDuration, err := time.ParseDuration(since)
 	if err != nil {
@@ -1284,8 +1291,6 @@ func (a *Accesses) AddServiceKey(w http.ResponseWriter, r *http.Request, ps http
 
 	r.ParseForm()
 
-	//p.CloudProvider.AddServiceKey(r.PostForm)
-
 	key := r.PostForm.Get("key")
 	k := []byte(key)
 	err := ioutil.WriteFile("/var/configs/key.json", k, 0644)
@@ -1300,7 +1305,7 @@ func (a *Accesses) GetHelmValues(w http.ResponseWriter, r *http.Request, ps http
 	w.Header().Set("Content-Type", "application/json")
 	w.Header().Set("Access-Control-Allow-Origin", "*")
 
-	encodedValues := os.Getenv("HELM_VALUES")
+	encodedValues := env.Get("HELM_VALUES", "")
 	if encodedValues == "" {
 		fmt.Fprintf(w, "Values reporting disabled")
 		return
@@ -1319,14 +1324,15 @@ func (a *Accesses) Status(w http.ResponseWriter, r *http.Request, _ httprouter.P
 	w.Header().Set("Content-Type", "application/json")
 	w.Header().Set("Access-Control-Allow-Origin", "*")
 
+	promServer := env.GetPrometheusServerEndpoint()
+
 	api := prometheusAPI.NewAPI(a.PrometheusClient)
 	result, err := api.Config(r.Context())
 	if err != nil {
-
-		fmt.Fprintf(w, "Using Prometheus at "+os.Getenv("PROMETHEUS_SERVER_ENDPOINT")+". Error: "+err.Error())
+		fmt.Fprintf(w, "Using Prometheus at "+promServer+". Error: "+err.Error())
 	} else {
 
-		fmt.Fprintf(w, "Using Prometheus at "+os.Getenv("PROMETHEUS_SERVER_ENDPOINT")+". PrometheusConfig: "+result.YAML)
+		fmt.Fprintf(w, "Using Prometheus at "+promServer+". PrometheusConfig: "+result.YAML)
 	}
 }
 
@@ -1568,6 +1574,7 @@ func Initialize(additionalConfigWatchers ...*watcher.ConfigMapWatcher) *Accesses
 		PrometheusClient:    promCli,
 		ThanosClient:        thanosClient,
 		KubeClientSet:       kubeClientset,
+		ClusterCache:        k8sCache,
 		ClusterMap:          clusterMap,
 		CloudProvider:       cloudProvider,
 		ConfigFileManager:   confManager,