Преглед изворног кода

fix: address multiple issues with ephemeral pod run

* add helpful prints during wait
* fix multiple race conditions:
  * if the pod is ready before we start listening to events.
  * if the pod exits before we start listening to events.
  * if k8s doesn't know about the new pod yet when we ask for a listener.
Joseph Gilley пре 4 година
родитељ
комит
10ff594b2b
1 измењених фајлова са 144 додато и 81 уклоњено
  1. 144 81
      cli/cmd/run.go

+ 144 - 81
cli/cmd/run.go

@@ -2,6 +2,7 @@ package cmd
 
 import (
 	"context"
+	"errors"
 	"fmt"
 	"io"
 	"os"
@@ -15,6 +16,7 @@ import (
 	v1 "k8s.io/api/core/v1"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/fields"
+	"k8s.io/apimachinery/pkg/watch"
 	"k8s.io/kubectl/pkg/util/term"
 
 	"k8s.io/apimachinery/pkg/runtime"
@@ -255,8 +257,10 @@ func executeRun(config *PorterRunSharedConfig, namespace, name, container string
 		Out: os.Stdout,
 		Raw: true,
 	}
+	size := t.GetSize()
+	sizeQueue := t.MonitorSize(size)
 
-	fn := func() error {
+	return t.Safe(func() error {
 		exec, err := remotecommand.NewSPDYExecutor(config.RestConf, "POST", req.URL())
 
 		if err != nil {
@@ -268,116 +272,175 @@ func executeRun(config *PorterRunSharedConfig, namespace, name, container string
 			Stdout: os.Stdout,
 			Stderr: os.Stderr,
 			Tty:    true,
+
+			TerminalSizeQueue: sizeQueue,
 		})
-	}
+	})
+}
 
-	if err := t.Safe(fn); err != nil {
+func executeRunEphemeral(config *PorterRunSharedConfig, namespace, name, container string, args []string) error {
+	existing, err := getExistingPod(config, name, namespace)
+
+	if err != nil {
 		return err
 	}
 
-	return nil
+	newPod, err := createPodFromExisting(config, existing, args)
+	podName := newPod.ObjectMeta.Name
+
+	// delete the ephemeral pod no matter what
+	defer deletePod(config, podName, namespace)
+
+	color.New(color.FgYellow).Printf("Waiting for pod %s to be ready...", podName)
+	if err = waitForPod(config, newPod); err != nil {
+		color.New(color.FgRed).Println("failed")
+		return handlePodAttachError(err, config, namespace, podName, container)
+	}
+
+	// refresh pod info for latest status
+	newPod, err = config.Clientset.CoreV1().
+		Pods(newPod.Namespace).
+		Get(context.Background(), newPod.Name, metav1.GetOptions{})
+
+	// The pod exited while we were waiting. This may or may not be an error:
+	// we don't know whether the user wanted an interactive shell.
+	// If it was an error, the logs hopefully say so.
+	if isPodExited(newPod) {
+		color.New(color.FgGreen).Println("complete!")
+		var writtenBytes int64
+		writtenBytes, _ = pipePodLogsToStdout(config, namespace, podName, container, false)
+
+		if verbose || writtenBytes == 0 {
+			color.New(color.FgYellow).Println("Could not get logs. Pod events:\n")
+			pipeEventsToStdout(config, namespace, podName, container, false)
+		}
+		return nil
+	}
+	color.New(color.FgGreen).Println("ready!")
+
+	color.New(color.FgYellow).Println("Attempting connection to the container. If you don't see a command prompt, try pressing enter.")
+	req := config.RestClient.Post().
+		Resource("pods").
+		Name(podName).
+		Namespace("default").
+		SubResource("attach")
+
+	req.Param("stdin", "true")
+	req.Param("stdout", "true")
+	req.Param("tty", "true")
+	req.Param("container", container)
+
+	t := term.TTY{
+		In:  os.Stdin,
+		Out: os.Stdout,
+		Raw: true,
+	}
+	size := t.GetSize()
+	sizeQueue := t.MonitorSize(size)
+
+	if err = t.Safe(func() error {
+		exec, err := remotecommand.NewSPDYExecutor(config.RestConf, "POST", req.URL())
+		if err != nil {
+			return err
+		}
+		return exec.Stream(remotecommand.StreamOptions{
+			Stdin:  os.Stdin,
+			Stdout: os.Stdout,
+			Stderr: os.Stderr,
+			Tty:    true,
+
+			TerminalSizeQueue: sizeQueue,
+		})
+	}); err != nil {
+		// ugly way to catch no TTY errors, such as when running command "echo \"hello\""
+		return handlePodAttachError(err, config, namespace, podName, container)
+	}
+
+	if verbose {
+		color.New(color.FgYellow).Println("Pod events:\n")
+		pipeEventsToStdout(config, namespace, podName, container, false)
+	}
+
+	return err
 }
 
 func waitForPod(config *PorterRunSharedConfig, pod *v1.Pod) error {
-	watch, err := config.Clientset.CoreV1().Pods(pod.Namespace).Watch(context.Background(), metav1.ListOptions{
-		FieldSelector: fields.OneTermEqualSelector("metadata.name", pod.Name).String(),
-	})
+	var (
+		w   watch.Interface
+		err error
+		ok  bool
+	)
+	// Immediately after creating a pod, the API may return a 404. Heuristically,
+	// waiting 1 second between watch attempts seems to be plenty.
+	watchRetries := 3
+	for i := 0; i < watchRetries; i++ {
+		selector := fields.OneTermEqualSelector("metadata.name", pod.Name).String()
+		w, err = config.Clientset.CoreV1().
+			Pods(pod.Namespace).
+			Watch(context.Background(), metav1.ListOptions{FieldSelector: selector})
+
+		if err == nil {
+			break
+		}
+		time.Sleep(time.Second)
+	}
 	if err != nil {
 		return err
 	}
-	defer watch.Stop()
+	defer w.Stop()
 	for {
 		select {
-		case evt := <-watch.ResultChan():
-			pod, ok := evt.Object.(*v1.Pod)
+		case <-time.Tick(time.Second):
+			// poll every second in case we already missed the ready event while
+			// creating the listener.
+			pod, err = config.Clientset.CoreV1().
+				Pods(pod.Namespace).
+				Get(context.Background(), pod.Name, metav1.GetOptions{})
+			if isPodReady(pod) || isPodExited(pod) {
+				return nil
+			}
+		case evt := <-w.ResultChan():
+			pod, ok = evt.Object.(*v1.Pod)
 			if !ok {
 				return fmt.Errorf("unexpected object type: %T", evt.Object)
 			}
-			ready := false
-			conditions := pod.Status.Conditions
-			for i := range conditions {
-				if conditions[i].Type == v1.PodReady {
-					ready = pod.Status.Conditions[i].Status == v1.ConditionTrue
-				}
-			}
-			if ready {
+			if isPodReady(pod) || isPodExited(pod) {
 				return nil
 			}
-		case <-time.After(time.Second * 30):
-			return fmt.Errorf("timed out waiting for pod")
+		case <-time.After(time.Second * 10):
+			return errors.New("timed out waiting for pod")
 		}
 	}
 }
 
-func executeRunEphemeral(config *PorterRunSharedConfig, namespace, name, container string, args []string) error {
-	existing, err := getExistingPod(config, name, namespace)
-
-	if err != nil {
-		return err
-	}
-
-	newPod, err := createPodFromExisting(config, existing, args)
-	podName := newPod.ObjectMeta.Name
-
-	err = waitForPod(config, newPod)
-
-	if err == nil {
-		color.New(color.FgYellow).Println("Attempting connection to the container, this may take up to 10 seconds. If you don't see a command prompt, try pressing enter.")
-		req := config.RestClient.Post().
-			Resource("pods").
-			Name(podName).
-			Namespace("default").
-			SubResource("attach")
-
-		req.Param("stdin", "true")
-		req.Param("stdout", "true")
-		req.Param("tty", "true")
-		req.Param("container", container)
-
-		t := term.TTY{
-			In:  os.Stdin,
-			Out: os.Stdout,
-			Raw: true,
+func isPodReady(pod *v1.Pod) bool {
+	ready := false
+	conditions := pod.Status.Conditions
+	for i := range conditions {
+		if conditions[i].Type == v1.PodReady {
+			ready = pod.Status.Conditions[i].Status == v1.ConditionTrue
 		}
-		size := t.GetSize()
-		sizeQueue := t.MonitorSize(size)
-		err = t.Safe(func() error {
-			exec, err := remotecommand.NewSPDYExecutor(config.RestConf, "POST", req.URL())
-			if err != nil {
-				return err
-			}
-			return exec.Stream(remotecommand.StreamOptions{
-				Stdin:             os.Stdin,
-				Stdout:            os.Stdout,
-				Stderr:            os.Stderr,
-				Tty:               true,
-				TerminalSizeQueue: sizeQueue,
-			})
-		})
 	}
+	return ready
+}
 
-	// ugly way to catch no TTY errors, such as when running command "echo \"hello\""
-	if err != nil {
-		color.New(color.FgYellow).Println("Could not open a shell to this container. Container logs:\n")
-
-		var writtenBytes int64
-
-		writtenBytes, err = pipePodLogsToStdout(config, namespace, podName, container, false)
+func isPodExited(pod *v1.Pod) bool {
+	return pod.Status.Phase == v1.PodSucceeded || pod.Status.Phase == v1.PodFailed
+}
 
-		if verbose || writtenBytes == 0 {
-			color.New(color.FgYellow).Println("Could not get logs. Pod events:\n")
+func handlePodAttachError(err error, config *PorterRunSharedConfig, namespace, podName, container string) error {
+	if verbose {
+		color.New(color.FgYellow).Printf("Error: %s\n", err)
+	}
+	color.New(color.FgYellow).Println("Could not open a shell to this container. Container logs:\n")
 
-			err = pipeEventsToStdout(config, namespace, podName, container, false)
-		}
-	} else if verbose {
-		color.New(color.FgYellow).Println("Pod events:\n")
+	var writtenBytes int64
+	writtenBytes, _ = pipePodLogsToStdout(config, namespace, podName, container, false)
 
+	if verbose || writtenBytes == 0 {
+		color.New(color.FgYellow).Println("Could not get logs. Pod events:\n")
 		pipeEventsToStdout(config, namespace, podName, container, false)
 	}
-
-	// delete the ephemeral pod
-	deletePod(config, podName, namespace)
-
 	return err
 }