ソースを参照

fix merge conflicts

Alexander Belanger 4 年 前
コミット
3c80c0ffb4
3 ファイル変更278 行追加190 行削除
  1. 6 5
      cli/cmd/deploy/deploy.go
  2. 14 5
      docs/guides/jobs-and-cron-jobs.md
  3. 258 180
      internal/kubernetes/agent.go

+ 6 - 5
cli/cmd/deploy/deploy.go

@@ -284,16 +284,17 @@ func (d *DeployAgent) Push() error {
 // reuses the configuration set for the application. If overrideValues is not nil,
 // it will merge the overriding values with the existing configuration.
 func (d *DeployAgent) UpdateImageAndValues(overrideValues map[string]interface{}) error {
+	// if this is a job chart, set "paused" to false so that the job doesn't run, unless
+	// the user has explicitly overriden the "paused" field
+	if _, exists := overrideValues["paused"]; d.release.Chart.Name() == "job" && !exists {
+		overrideValues["paused"] = true
+	}
+
 	mergedValues := utils.CoalesceValues(d.release.Config, overrideValues)
 
 	// overwrite the tag based on a new image
 	currImageSection := mergedValues["image"].(map[string]interface{})
 
-	// if this is a job chart, set "paused" to false so that the job doesn't run
-	if d.release.Chart.Name() == "job" {
-		mergedValues["paused"] = true
-	}
-
 	// if the current image section is hello-porter, the image must be overriden
 	if currImageSection["repository"] == "public.ecr.aws/o1j4x7p4/hello-porter" ||
 		currImageSection["repository"] == "public.ecr.aws/o1j4x7p4/hello-porter-job" {

+ 14 - 5
docs/guides/jobs-and-cron-jobs.md

@@ -1,4 +1,4 @@
-You can create one-time jobs or cron jobs on Porter, which can be linked [from your Github repo](https://docs.getporter.dev/docs/applications) or [from an existing Docker image registry](https://docs.getporter.dev/docs/deploying-from-docker-image-registry). Cron jobs are meant to run on a schedule using a specified [cron expression](https://en.wikipedia.org/wiki/Cron#CRON_expression), while one-time jobs are meant to be triggered manually or on every push to your Github repository. Here are some use-cases for each type of job:
+You can create one-time jobs or cron jobs on Porter, which can be linked [from your Github repo](https://docs.getporter.dev/docs/applications) or [from an existing Docker image registry](https://docs.getporter.dev/docs/deploying-from-docker-image-registry). Cron jobs are meant to run on a schedule using a specified [cron expression](https://en.wikipedia.org/wiki/Cron#CRON_expression), while one-time jobs are meant to be triggered manually. Here are some use-cases for each type of job:
 
 - Run one-time jobs for database migration scripts, data processing, or generally scripts that are designed to run to completion on an unpredictable schedule
 - Run cron jobs for tasks that should run on a specified schedule, such as scraping data at a specified interval, cleaning up rows in a database, taking backups of a DB, or sending batch notifications at a specified time every day
@@ -23,11 +23,20 @@ To re-run the job, simply click the "Rerun job" button in the bottom right corne
 
 ## Running One-Time Jobs from Github Repositories
 
-When you set up a one-time job to deploy from a Github repository, the job will automatically be run on each push to a specific branch in the Github repository. There are cases where it is useful to run jobs on each push to your `main` branch: for example, running a schema migration script so that your data schema is always up to date. However, if you do not want the job to run frequently, you should create a branch that you push to only when you want the job to be re-run. 
+When you set up a one-time job to deploy from a Github repository, the job will **not** run automatically -- the Github action will simply update the image used to run the job. 
 
-> 🚧
-> 
-> **Note:** we are working on a better solution for deploying jobs from a Github repository, so that the job only rebuilds when you want it to. This will be addressed in the next release.
+To get the Github action to run the job automatically, you can use [this Github action](https://github.com/porter-dev/porter-run-job-action). For example:
+
+```yaml
+# ... the rest of your Github action
+    - name: Run Porter job
+      uses: porter-dev/porter-run-job-action@v0.1.0
+      with:
+        job: <job-name> # TODO: replace w/ name of your job
+        cluster: <cluster-id> # TODO: replace w/ cluster ID
+        project: <project-id> # TODO: replace w/ project ID
+        token: ${{ secrets.PORTER_TOKEN_12 }}
+```
 
 # Deploying a Cron Job
 

+ 258 - 180
internal/kubernetes/agent.go

@@ -11,6 +11,7 @@ import (
 	"io"
 	"io/ioutil"
 	"strings"
+	"time"
 
 	"github.com/porter-dev/porter/api/server/shared/config/env"
 	"github.com/porter-dev/porter/internal/kubernetes/provisioner"
@@ -29,6 +30,7 @@ import (
 	"github.com/porter-dev/porter/internal/repository"
 	"golang.org/x/oauth2"
 
+	errors2 "errors"
 	"github.com/gorilla/websocket"
 	"github.com/porter-dev/porter/internal/helm/grapher"
 	appsv1 "k8s.io/api/apps/v1"
@@ -68,6 +70,31 @@ type ListOptions struct {
 	FieldSelector string
 }
 
+type AuthError struct{}
+
+func (e *AuthError) Error() string {
+	return "Unauthorized error"
+}
+
+// UpdateClientset updates the Agent's Clientset (this refreshes auth tokens)
+func (a *Agent) UpdateClientset() error {
+	restConf, err := a.RESTClientGetter.ToRESTConfig()
+
+	if err != nil {
+		return err
+	}
+
+	clientset, err := kubernetes.NewForConfig(restConf)
+
+	if err != nil {
+		return err
+	}
+
+	a.Clientset = clientset
+
+	return nil
+}
+
 // CreateConfigMap creates the configmap given the key-value pairs and namespace
 func (a *Agent) CreateConfigMap(name string, namespace string, configMap map[string]string) (*v1.ConfigMap, error) {
 	return a.Clientset.CoreV1().ConfigMaps(namespace).Create(
@@ -580,104 +607,144 @@ func (a *Agent) StopJobWithJobSidecar(namespace, name string) error {
 	})
 }
 
+// RunWebsocketTask will run a websocket task. If the websocket returns an anauthorized error, it will restart
+// the task some number of times until failing
+func (a *Agent) RunWebsocketTask(task func() error) error {
+
+	lastTime := int64(0)
+
+	for {
+		if err := a.UpdateClientset(); err != nil {
+			return err
+		}
+
+		err := task()
+
+		if err == nil {
+			return nil
+		}
+
+		if !errors2.Is(err, &AuthError{}) {
+			return err
+		}
+
+		if time.Now().Unix()-lastTime < 60 { // don't regenerate connection if too many unauthorized errors
+			return err
+		}
+
+		lastTime = time.Now().Unix()
+	}
+}
+
 // StreamControllerStatus streams controller status. Supports Deployment, StatefulSet, ReplicaSet, and DaemonSet
 // TODO: Support Jobs
 func (a *Agent) StreamControllerStatus(conn *websocket.Conn, kind string, selectors string) error {
-	// selectors is an array of max length 1. StreamControllerStatus accepts calls without the selectors argument.
-	// selectors argument is a single string with comma separated key=value pairs. (e.g. "app=porter,porter=true")
-	tweakListOptionsFunc := func(options *metav1.ListOptions) {
-		options.LabelSelector = selectors
-	}
 
-	factory := informers.NewSharedInformerFactoryWithOptions(
-		a.Clientset,
-		0,
-		informers.WithTweakListOptions(tweakListOptionsFunc),
-	)
+	run := func() error {
+		// selectors is an array of max length 1. StreamControllerStatus accepts calls without the selectors argument.
+		// selectors argument is a single string with comma separated key=value pairs. (e.g. "app=porter,porter=true")
+		tweakListOptionsFunc := func(options *metav1.ListOptions) {
+			options.LabelSelector = selectors
+		}
 
-	var informer cache.SharedInformer
-
-	// Spins up an informer depending on kind. Convert to lowercase for robustness
-	switch strings.ToLower(kind) {
-	case "deployment":
-		informer = factory.Apps().V1().Deployments().Informer()
-	case "statefulset":
-		informer = factory.Apps().V1().StatefulSets().Informer()
-	case "replicaset":
-		informer = factory.Apps().V1().ReplicaSets().Informer()
-	case "daemonset":
-		informer = factory.Apps().V1().DaemonSets().Informer()
-	case "job":
-		informer = factory.Batch().V1().Jobs().Informer()
-	case "cronjob":
-		informer = factory.Batch().V1beta1().CronJobs().Informer()
-	case "namespace":
-		informer = factory.Core().V1().Namespaces().Informer()
-	case "pod":
-		informer = factory.Core().V1().Pods().Informer()
-	}
+		factory := informers.NewSharedInformerFactoryWithOptions(
+			a.Clientset,
+			0,
+			informers.WithTweakListOptions(tweakListOptionsFunc),
+		)
 
-	stopper := make(chan struct{})
-	errorchan := make(chan error)
-	defer close(stopper)
-
-	informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
-		UpdateFunc: func(oldObj, newObj interface{}) {
-			msg := Message{
-				EventType: "UPDATE",
-				Object:    newObj,
-				Kind:      strings.ToLower(kind),
-			}
-			if writeErr := conn.WriteJSON(msg); writeErr != nil {
-				errorchan <- writeErr
-				return
-			}
-		},
-		AddFunc: func(obj interface{}) {
-			msg := Message{
-				EventType: "ADD",
-				Object:    obj,
-				Kind:      strings.ToLower(kind),
-			}
+		var informer cache.SharedInformer
+
+		// Spins up an informer depending on kind. Convert to lowercase for robustness
+		switch strings.ToLower(kind) {
+		case "deployment":
+			informer = factory.Apps().V1().Deployments().Informer()
+		case "statefulset":
+			informer = factory.Apps().V1().StatefulSets().Informer()
+		case "replicaset":
+			informer = factory.Apps().V1().ReplicaSets().Informer()
+		case "daemonset":
+			informer = factory.Apps().V1().DaemonSets().Informer()
+		case "job":
+			informer = factory.Batch().V1().Jobs().Informer()
+		case "cronjob":
+			informer = factory.Batch().V1beta1().CronJobs().Informer()
+		case "namespace":
+			informer = factory.Core().V1().Namespaces().Informer()
+		case "pod":
+			informer = factory.Core().V1().Pods().Informer()
+		}
 
-			if writeErr := conn.WriteJSON(msg); writeErr != nil {
-				errorchan <- writeErr
-				return
-			}
-		},
-		DeleteFunc: func(obj interface{}) {
-			msg := Message{
-				EventType: "DELETE",
-				Object:    obj,
-				Kind:      strings.ToLower(kind),
-			}
+		stopper := make(chan struct{})
+		errorchan := make(chan error)
+		defer close(stopper)
 
-			if writeErr := conn.WriteJSON(msg); writeErr != nil {
-				errorchan <- writeErr
-				return
+		informer.SetWatchErrorHandler(func(r *cache.Reflector, err error) {
+			if strings.HasSuffix(err.Error(), ": Unauthorized") {
+				errorchan <- &AuthError{}
 			}
-		},
-	})
+		})
+
+		informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
+			UpdateFunc: func(oldObj, newObj interface{}) {
+				msg := Message{
+					EventType: "UPDATE",
+					Object:    newObj,
+					Kind:      strings.ToLower(kind),
+				}
+				if writeErr := conn.WriteJSON(msg); writeErr != nil {
+					errorchan <- writeErr
+					return
+				}
+			},
+			AddFunc: func(obj interface{}) {
+				msg := Message{
+					EventType: "ADD",
+					Object:    obj,
+					Kind:      strings.ToLower(kind),
+				}
 
-	go func() {
-		// listens for websocket closing handshake
-		for {
-			if _, _, err := conn.ReadMessage(); err != nil {
-				conn.Close()
-				errorchan <- nil
-				return
+				if writeErr := conn.WriteJSON(msg); writeErr != nil {
+					errorchan <- writeErr
+					return
+				}
+			},
+			DeleteFunc: func(obj interface{}) {
+				msg := Message{
+					EventType: "DELETE",
+					Object:    obj,
+					Kind:      strings.ToLower(kind),
+				}
+
+				if writeErr := conn.WriteJSON(msg); writeErr != nil {
+					errorchan <- writeErr
+					return
+				}
+			},
+		})
+
+		go func() {
+			// listens for websocket closing handshake
+			for {
+				if _, _, err := conn.ReadMessage(); err != nil {
+					conn.Close()
+					errorchan <- nil
+					return
+				}
 			}
-		}
-	}()
+		}()
 
-	go informer.Run(stopper)
+		go informer.Run(stopper)
 
-	for {
-		select {
-		case err := <-errorchan:
-			return err
+		for {
+			select {
+			case err := <-errorchan:
+				return err
+			}
 		}
 	}
+
+	return a.RunWebsocketTask(run)
 }
 
 var b64 = base64.StdEncoding
@@ -750,132 +817,143 @@ func parseSecretToHelmRelease(secret v1.Secret, chartList []string) (*rspb.Relea
 }
 
 func (a *Agent) StreamHelmReleases(conn *websocket.Conn, namespace string, chartList []string, selectors string) error {
-	tweakListOptionsFunc := func(options *metav1.ListOptions) {
-		options.LabelSelector = selectors
-	}
 
-	factory := informers.NewSharedInformerFactoryWithOptions(
-		a.Clientset,
-		0,
-		informers.WithTweakListOptions(tweakListOptionsFunc),
-		informers.WithNamespace(namespace),
-	)
+	run := func() error {
+		tweakListOptionsFunc := func(options *metav1.ListOptions) {
+			options.LabelSelector = selectors
+		}
 
-	informer := factory.Core().V1().Secrets().Informer()
+		factory := informers.NewSharedInformerFactoryWithOptions(
+			a.Clientset,
+			0,
+			informers.WithTweakListOptions(tweakListOptionsFunc),
+			informers.WithNamespace(namespace),
+		)
 
-	stopper := make(chan struct{})
-	errorchan := make(chan error)
-	defer close(stopper)
+		informer := factory.Core().V1().Secrets().Informer()
 
-	informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
-		UpdateFunc: func(oldObj, newObj interface{}) {
-			secretObj, ok := newObj.(*v1.Secret)
+		stopper := make(chan struct{})
+		errorchan := make(chan error)
+		defer close(stopper)
 
-			if !ok {
-				errorchan <- fmt.Errorf("could not cast to secret")
-				return
+		informer.SetWatchErrorHandler(func(r *cache.Reflector, err error) {
+			if strings.HasSuffix(err.Error(), ": Unauthorized") {
+				errorchan <- &AuthError{}
 			}
+		})
 
-			helm_object, isNotHelmRelease, err := parseSecretToHelmRelease(*secretObj, chartList)
+		informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
+			UpdateFunc: func(oldObj, newObj interface{}) {
+				secretObj, ok := newObj.(*v1.Secret)
 
-			if isNotHelmRelease && err == nil {
-				return
-			}
+				if !ok {
+					errorchan <- fmt.Errorf("could not cast to secret")
+					return
+				}
 
-			if err != nil {
-				errorchan <- err
-				return
-			}
+				helm_object, isNotHelmRelease, err := parseSecretToHelmRelease(*secretObj, chartList)
 
-			msg := Message{
-				EventType: "UPDATE",
-				Object:    helm_object,
-			}
+				if isNotHelmRelease && err == nil {
+					return
+				}
 
-			if writeErr := conn.WriteJSON(msg); writeErr != nil {
-				errorchan <- writeErr
-				return
-			}
-		},
-		AddFunc: func(obj interface{}) {
-			secretObj, ok := obj.(*v1.Secret)
+				if err != nil {
+					errorchan <- err
+					return
+				}
 
-			if !ok {
-				errorchan <- fmt.Errorf("could not cast to secret")
-				return
-			}
+				msg := Message{
+					EventType: "UPDATE",
+					Object:    helm_object,
+				}
 
-			helm_object, isNotHelmRelease, err := parseSecretToHelmRelease(*secretObj, chartList)
+				if writeErr := conn.WriteJSON(msg); writeErr != nil {
+					errorchan <- writeErr
+					return
+				}
+			},
+			AddFunc: func(obj interface{}) {
+				secretObj, ok := obj.(*v1.Secret)
 
-			if isNotHelmRelease && err == nil {
-				return
-			}
+				if !ok {
+					errorchan <- fmt.Errorf("could not cast to secret")
+					return
+				}
 
-			if err != nil {
-				errorchan <- err
-				return
-			}
+				helm_object, isNotHelmRelease, err := parseSecretToHelmRelease(*secretObj, chartList)
 
-			msg := Message{
-				EventType: "ADD",
-				Object:    helm_object,
-			}
+				if isNotHelmRelease && err == nil {
+					return
+				}
 
-			if writeErr := conn.WriteJSON(msg); writeErr != nil {
-				errorchan <- writeErr
-				return
-			}
-		},
-		DeleteFunc: func(obj interface{}) {
-			secretObj, ok := obj.(*v1.Secret)
+				if err != nil {
+					errorchan <- err
+					return
+				}
 
-			if !ok {
-				errorchan <- fmt.Errorf("could not cast to secret")
-				return
-			}
+				msg := Message{
+					EventType: "ADD",
+					Object:    helm_object,
+				}
 
-			helm_object, isNotHelmRelease, err := parseSecretToHelmRelease(*secretObj, chartList)
+				if writeErr := conn.WriteJSON(msg); writeErr != nil {
+					errorchan <- writeErr
+					return
+				}
+			},
+			DeleteFunc: func(obj interface{}) {
+				secretObj, ok := obj.(*v1.Secret)
 
-			if isNotHelmRelease && err == nil {
-				return
-			}
+				if !ok {
+					errorchan <- fmt.Errorf("could not cast to secret")
+					return
+				}
 
-			if err != nil {
-				errorchan <- err
-				return
-			}
+				helm_object, isNotHelmRelease, err := parseSecretToHelmRelease(*secretObj, chartList)
 
-			msg := Message{
-				EventType: "DELETE",
-				Object:    helm_object,
-			}
+				if isNotHelmRelease && err == nil {
+					return
+				}
 
-			if writeErr := conn.WriteJSON(msg); writeErr != nil {
-				errorchan <- writeErr
-				return
-			}
-		},
-	})
+				if err != nil {
+					errorchan <- err
+					return
+				}
 
-	go func() {
-		// listens for websocket closing handshake
-		for {
-			if _, _, err := conn.ReadMessage(); err != nil {
-				conn.Close()
-				errorchan <- nil
-				return
+				msg := Message{
+					EventType: "DELETE",
+					Object:    helm_object,
+				}
+
+				if writeErr := conn.WriteJSON(msg); writeErr != nil {
+					errorchan <- writeErr
+					return
+				}
+			},
+		})
+
+		go func() {
+			// listens for websocket closing handshake
+			for {
+				if _, _, err := conn.ReadMessage(); err != nil {
+					conn.Close()
+					errorchan <- nil
+					return
+				}
 			}
-		}
-	}()
+		}()
 
-	go informer.Run(stopper)
+		go informer.Run(stopper)
 
-	for {
-		select {
-		case err := <-errorchan:
-			return err
+		for {
+			select {
+			case err := <-errorchan:
+				return err
+			}
 		}
 	}
+
+	return a.RunWebsocketTask(run)
 }
 
 type SharedProvisionOpts struct {