|
|
@@ -11,6 +11,7 @@ import (
|
|
|
"io"
|
|
|
"io/ioutil"
|
|
|
"strings"
|
|
|
+ "time"
|
|
|
|
|
|
"github.com/porter-dev/porter/internal/kubernetes/provisioner"
|
|
|
"github.com/porter-dev/porter/internal/kubernetes/provisioner/aws"
|
|
|
@@ -28,6 +29,7 @@ import (
|
|
|
"github.com/porter-dev/porter/internal/repository"
|
|
|
"golang.org/x/oauth2"
|
|
|
|
|
|
+ errors2 "errors"
|
|
|
"github.com/gorilla/websocket"
|
|
|
"github.com/porter-dev/porter/internal/helm/grapher"
|
|
|
appsv1 "k8s.io/api/apps/v1"
|
|
|
@@ -69,6 +71,31 @@ type ListOptions struct {
|
|
|
FieldSelector string
|
|
|
}
|
|
|
|
|
|
+type AuthError struct{}
|
|
|
+
|
|
|
+func (e *AuthError) Error() string {
|
|
|
+ return "Unauthorized error"
|
|
|
+}
|
|
|
+
|
|
|
+// UpdateClientset updates the Agent's Clientset (this refreshes auth tokens)
|
|
|
+func (a *Agent) UpdateClientset() error {
|
|
|
+ restConf, err := a.RESTClientGetter.ToRESTConfig()
|
|
|
+
|
|
|
+ if err != nil {
|
|
|
+ return err
|
|
|
+ }
|
|
|
+
|
|
|
+ clientset, err := kubernetes.NewForConfig(restConf)
|
|
|
+
|
|
|
+ if err != nil {
|
|
|
+ return err
|
|
|
+ }
|
|
|
+
|
|
|
+ a.Clientset = clientset
|
|
|
+
|
|
|
+ return nil
|
|
|
+}
|
|
|
+
|
|
|
// CreateConfigMap creates the configmap given the key-value pairs and namespace
|
|
|
func (a *Agent) CreateConfigMap(name string, namespace string, configMap map[string]string) (*v1.ConfigMap, error) {
|
|
|
return a.Clientset.CoreV1().ConfigMaps(namespace).Create(
|
|
|
@@ -535,104 +562,144 @@ func (a *Agent) StopJobWithJobSidecar(namespace, name string) error {
|
|
|
})
|
|
|
}
|
|
|
|
|
|
+// RunWebsocketTask will run a websocket task. If the websocket returns an anauthorized error, it will restart
|
|
|
+// the task some number of times until failing
|
|
|
+func (a *Agent) RunWebsocketTask(task func() error) error {
|
|
|
+
|
|
|
+ lastTime := int64(0)
|
|
|
+
|
|
|
+ for {
|
|
|
+ if err := a.UpdateClientset(); err != nil {
|
|
|
+ return err
|
|
|
+ }
|
|
|
+
|
|
|
+ err := task()
|
|
|
+
|
|
|
+ if err == nil {
|
|
|
+ return nil
|
|
|
+ }
|
|
|
+
|
|
|
+ if !errors2.Is(err, &AuthError{}) {
|
|
|
+ return err
|
|
|
+ }
|
|
|
+
|
|
|
+ if time.Now().Unix()-lastTime < 60 { // don't regenerate connection if too many unauthorized errors
|
|
|
+ return err
|
|
|
+ }
|
|
|
+
|
|
|
+ lastTime = time.Now().Unix()
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
// StreamControllerStatus streams controller status. Supports Deployment, StatefulSet, ReplicaSet, and DaemonSet
|
|
|
// TODO: Support Jobs
|
|
|
func (a *Agent) StreamControllerStatus(conn *websocket.Conn, kind string, selectors string) error {
|
|
|
- // selectors is an array of max length 1. StreamControllerStatus accepts calls without the selectors argument.
|
|
|
- // selectors argument is a single string with comma separated key=value pairs. (e.g. "app=porter,porter=true")
|
|
|
- tweakListOptionsFunc := func(options *metav1.ListOptions) {
|
|
|
- options.LabelSelector = selectors
|
|
|
- }
|
|
|
|
|
|
- factory := informers.NewSharedInformerFactoryWithOptions(
|
|
|
- a.Clientset,
|
|
|
- 0,
|
|
|
- informers.WithTweakListOptions(tweakListOptionsFunc),
|
|
|
- )
|
|
|
+ run := func() error {
|
|
|
+ // selectors is an array of max length 1. StreamControllerStatus accepts calls without the selectors argument.
|
|
|
+ // selectors argument is a single string with comma separated key=value pairs. (e.g. "app=porter,porter=true")
|
|
|
+ tweakListOptionsFunc := func(options *metav1.ListOptions) {
|
|
|
+ options.LabelSelector = selectors
|
|
|
+ }
|
|
|
|
|
|
- var informer cache.SharedInformer
|
|
|
-
|
|
|
- // Spins up an informer depending on kind. Convert to lowercase for robustness
|
|
|
- switch strings.ToLower(kind) {
|
|
|
- case "deployment":
|
|
|
- informer = factory.Apps().V1().Deployments().Informer()
|
|
|
- case "statefulset":
|
|
|
- informer = factory.Apps().V1().StatefulSets().Informer()
|
|
|
- case "replicaset":
|
|
|
- informer = factory.Apps().V1().ReplicaSets().Informer()
|
|
|
- case "daemonset":
|
|
|
- informer = factory.Apps().V1().DaemonSets().Informer()
|
|
|
- case "job":
|
|
|
- informer = factory.Batch().V1().Jobs().Informer()
|
|
|
- case "cronjob":
|
|
|
- informer = factory.Batch().V1beta1().CronJobs().Informer()
|
|
|
- case "namespace":
|
|
|
- informer = factory.Core().V1().Namespaces().Informer()
|
|
|
- case "pod":
|
|
|
- informer = factory.Core().V1().Pods().Informer()
|
|
|
- }
|
|
|
+ factory := informers.NewSharedInformerFactoryWithOptions(
|
|
|
+ a.Clientset,
|
|
|
+ 0,
|
|
|
+ informers.WithTweakListOptions(tweakListOptionsFunc),
|
|
|
+ )
|
|
|
|
|
|
- stopper := make(chan struct{})
|
|
|
- errorchan := make(chan error)
|
|
|
- defer close(stopper)
|
|
|
-
|
|
|
- informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
|
|
|
- UpdateFunc: func(oldObj, newObj interface{}) {
|
|
|
- msg := Message{
|
|
|
- EventType: "UPDATE",
|
|
|
- Object: newObj,
|
|
|
- Kind: strings.ToLower(kind),
|
|
|
- }
|
|
|
- if writeErr := conn.WriteJSON(msg); writeErr != nil {
|
|
|
- errorchan <- writeErr
|
|
|
- return
|
|
|
- }
|
|
|
- },
|
|
|
- AddFunc: func(obj interface{}) {
|
|
|
- msg := Message{
|
|
|
- EventType: "ADD",
|
|
|
- Object: obj,
|
|
|
- Kind: strings.ToLower(kind),
|
|
|
- }
|
|
|
+ var informer cache.SharedInformer
|
|
|
+
|
|
|
+ // Spins up an informer depending on kind. Convert to lowercase for robustness
|
|
|
+ switch strings.ToLower(kind) {
|
|
|
+ case "deployment":
|
|
|
+ informer = factory.Apps().V1().Deployments().Informer()
|
|
|
+ case "statefulset":
|
|
|
+ informer = factory.Apps().V1().StatefulSets().Informer()
|
|
|
+ case "replicaset":
|
|
|
+ informer = factory.Apps().V1().ReplicaSets().Informer()
|
|
|
+ case "daemonset":
|
|
|
+ informer = factory.Apps().V1().DaemonSets().Informer()
|
|
|
+ case "job":
|
|
|
+ informer = factory.Batch().V1().Jobs().Informer()
|
|
|
+ case "cronjob":
|
|
|
+ informer = factory.Batch().V1beta1().CronJobs().Informer()
|
|
|
+ case "namespace":
|
|
|
+ informer = factory.Core().V1().Namespaces().Informer()
|
|
|
+ case "pod":
|
|
|
+ informer = factory.Core().V1().Pods().Informer()
|
|
|
+ }
|
|
|
|
|
|
- if writeErr := conn.WriteJSON(msg); writeErr != nil {
|
|
|
- errorchan <- writeErr
|
|
|
- return
|
|
|
- }
|
|
|
- },
|
|
|
- DeleteFunc: func(obj interface{}) {
|
|
|
- msg := Message{
|
|
|
- EventType: "DELETE",
|
|
|
- Object: obj,
|
|
|
- Kind: strings.ToLower(kind),
|
|
|
- }
|
|
|
+ stopper := make(chan struct{})
|
|
|
+ errorchan := make(chan error)
|
|
|
+ defer close(stopper)
|
|
|
|
|
|
- if writeErr := conn.WriteJSON(msg); writeErr != nil {
|
|
|
- errorchan <- writeErr
|
|
|
- return
|
|
|
+ informer.SetWatchErrorHandler(func(r *cache.Reflector, err error) {
|
|
|
+ if strings.HasSuffix(err.Error(), ": Unauthorized") {
|
|
|
+ errorchan <- &AuthError{}
|
|
|
}
|
|
|
- },
|
|
|
- })
|
|
|
+ })
|
|
|
+
|
|
|
+ informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
|
|
|
+ UpdateFunc: func(oldObj, newObj interface{}) {
|
|
|
+ msg := Message{
|
|
|
+ EventType: "UPDATE",
|
|
|
+ Object: newObj,
|
|
|
+ Kind: strings.ToLower(kind),
|
|
|
+ }
|
|
|
+ if writeErr := conn.WriteJSON(msg); writeErr != nil {
|
|
|
+ errorchan <- writeErr
|
|
|
+ return
|
|
|
+ }
|
|
|
+ },
|
|
|
+ AddFunc: func(obj interface{}) {
|
|
|
+ msg := Message{
|
|
|
+ EventType: "ADD",
|
|
|
+ Object: obj,
|
|
|
+ Kind: strings.ToLower(kind),
|
|
|
+ }
|
|
|
|
|
|
- go func() {
|
|
|
- // listens for websocket closing handshake
|
|
|
- for {
|
|
|
- if _, _, err := conn.ReadMessage(); err != nil {
|
|
|
- conn.Close()
|
|
|
- errorchan <- nil
|
|
|
- return
|
|
|
+ if writeErr := conn.WriteJSON(msg); writeErr != nil {
|
|
|
+ errorchan <- writeErr
|
|
|
+ return
|
|
|
+ }
|
|
|
+ },
|
|
|
+ DeleteFunc: func(obj interface{}) {
|
|
|
+ msg := Message{
|
|
|
+ EventType: "DELETE",
|
|
|
+ Object: obj,
|
|
|
+ Kind: strings.ToLower(kind),
|
|
|
+ }
|
|
|
+
|
|
|
+ if writeErr := conn.WriteJSON(msg); writeErr != nil {
|
|
|
+ errorchan <- writeErr
|
|
|
+ return
|
|
|
+ }
|
|
|
+ },
|
|
|
+ })
|
|
|
+
|
|
|
+ go func() {
|
|
|
+ // listens for websocket closing handshake
|
|
|
+ for {
|
|
|
+ if _, _, err := conn.ReadMessage(); err != nil {
|
|
|
+ conn.Close()
|
|
|
+ errorchan <- nil
|
|
|
+ return
|
|
|
+ }
|
|
|
}
|
|
|
- }
|
|
|
- }()
|
|
|
+ }()
|
|
|
|
|
|
- go informer.Run(stopper)
|
|
|
+ go informer.Run(stopper)
|
|
|
|
|
|
- for {
|
|
|
- select {
|
|
|
- case err := <-errorchan:
|
|
|
- return err
|
|
|
+ for {
|
|
|
+ select {
|
|
|
+ case err := <-errorchan:
|
|
|
+ return err
|
|
|
+ }
|
|
|
}
|
|
|
}
|
|
|
+
|
|
|
+ return a.RunWebsocketTask(run)
|
|
|
}
|
|
|
|
|
|
var b64 = base64.StdEncoding
|
|
|
@@ -705,132 +772,143 @@ func parseSecretToHelmRelease(secret v1.Secret, chartList []string) (*rspb.Relea
|
|
|
}
|
|
|
|
|
|
func (a *Agent) StreamHelmReleases(conn *websocket.Conn, namespace string, chartList []string, selectors string) error {
|
|
|
- tweakListOptionsFunc := func(options *metav1.ListOptions) {
|
|
|
- options.LabelSelector = selectors
|
|
|
- }
|
|
|
|
|
|
- factory := informers.NewSharedInformerFactoryWithOptions(
|
|
|
- a.Clientset,
|
|
|
- 0,
|
|
|
- informers.WithTweakListOptions(tweakListOptionsFunc),
|
|
|
- informers.WithNamespace(namespace),
|
|
|
- )
|
|
|
+ run := func() error {
|
|
|
+ tweakListOptionsFunc := func(options *metav1.ListOptions) {
|
|
|
+ options.LabelSelector = selectors
|
|
|
+ }
|
|
|
|
|
|
- informer := factory.Core().V1().Secrets().Informer()
|
|
|
+ factory := informers.NewSharedInformerFactoryWithOptions(
|
|
|
+ a.Clientset,
|
|
|
+ 0,
|
|
|
+ informers.WithTweakListOptions(tweakListOptionsFunc),
|
|
|
+ informers.WithNamespace(namespace),
|
|
|
+ )
|
|
|
|
|
|
- stopper := make(chan struct{})
|
|
|
- errorchan := make(chan error)
|
|
|
- defer close(stopper)
|
|
|
+ informer := factory.Core().V1().Secrets().Informer()
|
|
|
|
|
|
- informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
|
|
|
- UpdateFunc: func(oldObj, newObj interface{}) {
|
|
|
- secretObj, ok := newObj.(*v1.Secret)
|
|
|
+ stopper := make(chan struct{})
|
|
|
+ errorchan := make(chan error)
|
|
|
+ defer close(stopper)
|
|
|
|
|
|
- if !ok {
|
|
|
- errorchan <- fmt.Errorf("could not cast to secret")
|
|
|
- return
|
|
|
+ informer.SetWatchErrorHandler(func(r *cache.Reflector, err error) {
|
|
|
+ if strings.HasSuffix(err.Error(), ": Unauthorized") {
|
|
|
+ errorchan <- &AuthError{}
|
|
|
}
|
|
|
+ })
|
|
|
|
|
|
- helm_object, isNotHelmRelease, err := parseSecretToHelmRelease(*secretObj, chartList)
|
|
|
+ informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
|
|
|
+ UpdateFunc: func(oldObj, newObj interface{}) {
|
|
|
+ secretObj, ok := newObj.(*v1.Secret)
|
|
|
|
|
|
- if isNotHelmRelease && err == nil {
|
|
|
- return
|
|
|
- }
|
|
|
+ if !ok {
|
|
|
+ errorchan <- fmt.Errorf("could not cast to secret")
|
|
|
+ return
|
|
|
+ }
|
|
|
|
|
|
- if err != nil {
|
|
|
- errorchan <- err
|
|
|
- return
|
|
|
- }
|
|
|
+ helm_object, isNotHelmRelease, err := parseSecretToHelmRelease(*secretObj, chartList)
|
|
|
|
|
|
- msg := Message{
|
|
|
- EventType: "UPDATE",
|
|
|
- Object: helm_object,
|
|
|
- }
|
|
|
+ if isNotHelmRelease && err == nil {
|
|
|
+ return
|
|
|
+ }
|
|
|
|
|
|
- if writeErr := conn.WriteJSON(msg); writeErr != nil {
|
|
|
- errorchan <- writeErr
|
|
|
- return
|
|
|
- }
|
|
|
- },
|
|
|
- AddFunc: func(obj interface{}) {
|
|
|
- secretObj, ok := obj.(*v1.Secret)
|
|
|
+ if err != nil {
|
|
|
+ errorchan <- err
|
|
|
+ return
|
|
|
+ }
|
|
|
|
|
|
- if !ok {
|
|
|
- errorchan <- fmt.Errorf("could not cast to secret")
|
|
|
- return
|
|
|
- }
|
|
|
+ msg := Message{
|
|
|
+ EventType: "UPDATE",
|
|
|
+ Object: helm_object,
|
|
|
+ }
|
|
|
|
|
|
- helm_object, isNotHelmRelease, err := parseSecretToHelmRelease(*secretObj, chartList)
|
|
|
+ if writeErr := conn.WriteJSON(msg); writeErr != nil {
|
|
|
+ errorchan <- writeErr
|
|
|
+ return
|
|
|
+ }
|
|
|
+ },
|
|
|
+ AddFunc: func(obj interface{}) {
|
|
|
+ secretObj, ok := obj.(*v1.Secret)
|
|
|
|
|
|
- if isNotHelmRelease && err == nil {
|
|
|
- return
|
|
|
- }
|
|
|
+ if !ok {
|
|
|
+ errorchan <- fmt.Errorf("could not cast to secret")
|
|
|
+ return
|
|
|
+ }
|
|
|
|
|
|
- if err != nil {
|
|
|
- errorchan <- err
|
|
|
- return
|
|
|
- }
|
|
|
+ helm_object, isNotHelmRelease, err := parseSecretToHelmRelease(*secretObj, chartList)
|
|
|
|
|
|
- msg := Message{
|
|
|
- EventType: "ADD",
|
|
|
- Object: helm_object,
|
|
|
- }
|
|
|
+ if isNotHelmRelease && err == nil {
|
|
|
+ return
|
|
|
+ }
|
|
|
|
|
|
- if writeErr := conn.WriteJSON(msg); writeErr != nil {
|
|
|
- errorchan <- writeErr
|
|
|
- return
|
|
|
- }
|
|
|
- },
|
|
|
- DeleteFunc: func(obj interface{}) {
|
|
|
- secretObj, ok := obj.(*v1.Secret)
|
|
|
+ if err != nil {
|
|
|
+ errorchan <- err
|
|
|
+ return
|
|
|
+ }
|
|
|
|
|
|
- if !ok {
|
|
|
- errorchan <- fmt.Errorf("could not cast to secret")
|
|
|
- return
|
|
|
- }
|
|
|
+ msg := Message{
|
|
|
+ EventType: "ADD",
|
|
|
+ Object: helm_object,
|
|
|
+ }
|
|
|
|
|
|
- helm_object, isNotHelmRelease, err := parseSecretToHelmRelease(*secretObj, chartList)
|
|
|
+ if writeErr := conn.WriteJSON(msg); writeErr != nil {
|
|
|
+ errorchan <- writeErr
|
|
|
+ return
|
|
|
+ }
|
|
|
+ },
|
|
|
+ DeleteFunc: func(obj interface{}) {
|
|
|
+ secretObj, ok := obj.(*v1.Secret)
|
|
|
|
|
|
- if isNotHelmRelease && err == nil {
|
|
|
- return
|
|
|
- }
|
|
|
+ if !ok {
|
|
|
+ errorchan <- fmt.Errorf("could not cast to secret")
|
|
|
+ return
|
|
|
+ }
|
|
|
|
|
|
- if err != nil {
|
|
|
- errorchan <- err
|
|
|
- return
|
|
|
- }
|
|
|
+ helm_object, isNotHelmRelease, err := parseSecretToHelmRelease(*secretObj, chartList)
|
|
|
|
|
|
- msg := Message{
|
|
|
- EventType: "DELETE",
|
|
|
- Object: helm_object,
|
|
|
- }
|
|
|
+ if isNotHelmRelease && err == nil {
|
|
|
+ return
|
|
|
+ }
|
|
|
|
|
|
- if writeErr := conn.WriteJSON(msg); writeErr != nil {
|
|
|
- errorchan <- writeErr
|
|
|
- return
|
|
|
- }
|
|
|
- },
|
|
|
- })
|
|
|
+ if err != nil {
|
|
|
+ errorchan <- err
|
|
|
+ return
|
|
|
+ }
|
|
|
|
|
|
- go func() {
|
|
|
- // listens for websocket closing handshake
|
|
|
- for {
|
|
|
- if _, _, err := conn.ReadMessage(); err != nil {
|
|
|
- conn.Close()
|
|
|
- errorchan <- nil
|
|
|
- return
|
|
|
+ msg := Message{
|
|
|
+ EventType: "DELETE",
|
|
|
+ Object: helm_object,
|
|
|
+ }
|
|
|
+
|
|
|
+ if writeErr := conn.WriteJSON(msg); writeErr != nil {
|
|
|
+ errorchan <- writeErr
|
|
|
+ return
|
|
|
+ }
|
|
|
+ },
|
|
|
+ })
|
|
|
+
|
|
|
+ go func() {
|
|
|
+ // listens for websocket closing handshake
|
|
|
+ for {
|
|
|
+ if _, _, err := conn.ReadMessage(); err != nil {
|
|
|
+ conn.Close()
|
|
|
+ errorchan <- nil
|
|
|
+ return
|
|
|
+ }
|
|
|
}
|
|
|
- }
|
|
|
- }()
|
|
|
+ }()
|
|
|
|
|
|
- go informer.Run(stopper)
|
|
|
+ go informer.Run(stopper)
|
|
|
|
|
|
- for {
|
|
|
- select {
|
|
|
- case err := <-errorchan:
|
|
|
- return err
|
|
|
+ for {
|
|
|
+ select {
|
|
|
+ case err := <-errorchan:
|
|
|
+ return err
|
|
|
+ }
|
|
|
}
|
|
|
}
|
|
|
+
|
|
|
+ return a.RunWebsocketTask(run)
|
|
|
}
|
|
|
|
|
|
// ProvisionECR spawns a new provisioning pod that creates an ECR instance
|