run.go 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562
  1. package cmd
  2. import (
  3. "context"
  4. "errors"
  5. "fmt"
  6. "io"
  7. "os"
  8. "strings"
  9. "time"
  10. "github.com/fatih/color"
  11. "github.com/porter-dev/porter/cli/cmd/api"
  12. "github.com/porter-dev/porter/cli/cmd/utils"
  13. "github.com/spf13/cobra"
  14. v1 "k8s.io/api/core/v1"
  15. metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
  16. "k8s.io/apimachinery/pkg/fields"
  17. "k8s.io/apimachinery/pkg/watch"
  18. "k8s.io/kubectl/pkg/util/term"
  19. "k8s.io/apimachinery/pkg/runtime"
  20. "k8s.io/apimachinery/pkg/runtime/schema"
  21. "k8s.io/client-go/kubernetes"
  22. "k8s.io/client-go/rest"
  23. "k8s.io/client-go/tools/clientcmd"
  24. "k8s.io/client-go/tools/remotecommand"
  25. )
  26. var namespace string
  27. var verbose bool
  28. // runCmd represents the "porter run" base command when called
  29. // without any subcommands
  30. var runCmd = &cobra.Command{
  31. Use: "run [release] -- COMMAND [args...]",
  32. Args: cobra.MinimumNArgs(2),
  33. Short: "Runs a command inside a connected cluster container.",
  34. Run: func(cmd *cobra.Command, args []string) {
  35. err := checkLoginAndRun(args, run)
  36. if err != nil {
  37. os.Exit(1)
  38. }
  39. },
  40. }
  41. var existingPod bool
  42. func init() {
  43. rootCmd.AddCommand(runCmd)
  44. runCmd.PersistentFlags().StringVar(
  45. &namespace,
  46. "namespace",
  47. "default",
  48. "namespace of release to connect to",
  49. )
  50. runCmd.PersistentFlags().BoolVarP(
  51. &existingPod,
  52. "existing_pod",
  53. "e",
  54. false,
  55. "whether to connect to an existing pod",
  56. )
  57. runCmd.PersistentFlags().BoolVarP(
  58. &verbose,
  59. "verbose",
  60. "v",
  61. false,
  62. "whether to print verbose output",
  63. )
  64. }
  65. func run(_ *api.AuthCheckResponse, client *api.Client, args []string) error {
  66. color.New(color.FgGreen).Println("Running", strings.Join(args[1:], " "), "for release", args[0])
  67. podsSimple, err := getPods(client, namespace, args[0])
  68. if err != nil {
  69. return fmt.Errorf("Could not retrieve list of pods: %s", err.Error())
  70. }
  71. // if length of pods is 0, throw error
  72. var selectedPod podSimple
  73. if len(podsSimple) == 0 {
  74. return fmt.Errorf("At least one pod must exist in this deployment.")
  75. } else if len(podsSimple) == 1 || !existingPod {
  76. selectedPod = podsSimple[0]
  77. } else {
  78. podNames := make([]string, 0)
  79. for _, podSimple := range podsSimple {
  80. podNames = append(podNames, podSimple.Name)
  81. }
  82. selectedPodName, err := utils.PromptSelect("Select the pod:", podNames)
  83. if err != nil {
  84. return err
  85. }
  86. // find selected pod
  87. for _, podSimple := range podsSimple {
  88. if selectedPodName == podSimple.Name {
  89. selectedPod = podSimple
  90. }
  91. }
  92. }
  93. var selectedContainerName string
  94. // if the selected pod has multiple container, spawn selector
  95. if len(selectedPod.ContainerNames) == 0 {
  96. return fmt.Errorf("At least one pod must exist in this deployment.")
  97. } else if len(selectedPod.ContainerNames) == 1 {
  98. selectedContainerName = selectedPod.ContainerNames[0]
  99. } else {
  100. selectedContainer, err := utils.PromptSelect("Select the container:", selectedPod.ContainerNames)
  101. if err != nil {
  102. return err
  103. }
  104. selectedContainerName = selectedContainer
  105. }
  106. config := &PorterRunSharedConfig{
  107. Client: client,
  108. }
  109. err = config.setSharedConfig()
  110. if err != nil {
  111. return fmt.Errorf("Could not retrieve kube credentials: %s", err.Error())
  112. }
  113. if existingPod {
  114. return executeRun(config, namespace, selectedPod.Name, selectedContainerName, args[1:])
  115. }
  116. return executeRunEphemeral(config, namespace, selectedPod.Name, selectedContainerName, args[1:])
  117. }
  118. type PorterRunSharedConfig struct {
  119. Client *api.Client
  120. RestConf *rest.Config
  121. Clientset *kubernetes.Clientset
  122. RestClient *rest.RESTClient
  123. }
  124. func (p *PorterRunSharedConfig) setSharedConfig() error {
  125. pID := config.Project
  126. cID := config.Cluster
  127. kubeResp, err := p.Client.GetKubeconfig(context.TODO(), pID, cID)
  128. if err != nil {
  129. return err
  130. }
  131. kubeBytes := kubeResp.Kubeconfig
  132. cmdConf, err := clientcmd.NewClientConfigFromBytes(kubeBytes)
  133. if err != nil {
  134. return err
  135. }
  136. restConf, err := cmdConf.ClientConfig()
  137. if err != nil {
  138. return err
  139. }
  140. restConf.GroupVersion = &schema.GroupVersion{
  141. Group: "api",
  142. Version: "v1",
  143. }
  144. restConf.NegotiatedSerializer = runtime.NewSimpleNegotiatedSerializer(runtime.SerializerInfo{})
  145. p.RestConf = restConf
  146. clientset, err := kubernetes.NewForConfig(restConf)
  147. if err != nil {
  148. return err
  149. }
  150. p.Clientset = clientset
  151. restClient, err := rest.RESTClientFor(restConf)
  152. if err != nil {
  153. return err
  154. }
  155. p.RestClient = restClient
  156. return nil
  157. }
  158. type podSimple struct {
  159. Name string
  160. ContainerNames []string
  161. }
  162. func getPods(client *api.Client, namespace, releaseName string) ([]podSimple, error) {
  163. pID := config.Project
  164. cID := config.Cluster
  165. resp, err := client.GetK8sAllPods(context.TODO(), pID, cID, namespace, releaseName)
  166. if err != nil {
  167. return nil, err
  168. }
  169. res := make([]podSimple, 0)
  170. for _, pod := range resp {
  171. containerNames := make([]string, 0)
  172. for _, container := range pod.Spec.Containers {
  173. containerNames = append(containerNames, container.Name)
  174. }
  175. res = append(res, podSimple{
  176. Name: pod.ObjectMeta.Name,
  177. ContainerNames: containerNames,
  178. })
  179. }
  180. return res, nil
  181. }
  182. func executeRun(config *PorterRunSharedConfig, namespace, name, container string, args []string) error {
  183. req := config.RestClient.Post().
  184. Resource("pods").
  185. Name(name).
  186. Namespace(namespace).
  187. SubResource("exec")
  188. for _, arg := range args {
  189. req.Param("command", arg)
  190. }
  191. req.Param("stdin", "true")
  192. req.Param("stdout", "true")
  193. req.Param("tty", "true")
  194. req.Param("container", container)
  195. t := term.TTY{
  196. In: os.Stdin,
  197. Out: os.Stdout,
  198. Raw: true,
  199. }
  200. size := t.GetSize()
  201. sizeQueue := t.MonitorSize(size)
  202. return t.Safe(func() error {
  203. exec, err := remotecommand.NewSPDYExecutor(config.RestConf, "POST", req.URL())
  204. if err != nil {
  205. return err
  206. }
  207. return exec.Stream(remotecommand.StreamOptions{
  208. Stdin: os.Stdin,
  209. Stdout: os.Stdout,
  210. Stderr: os.Stderr,
  211. Tty: true,
  212. TerminalSizeQueue: sizeQueue,
  213. })
  214. })
  215. }
  216. func executeRunEphemeral(config *PorterRunSharedConfig, namespace, name, container string, args []string) error {
  217. existing, err := getExistingPod(config, name, namespace)
  218. if err != nil {
  219. return err
  220. }
  221. newPod, err := createPodFromExisting(config, existing, args)
  222. podName := newPod.ObjectMeta.Name
  223. // delete the ephemeral pod no matter what
  224. defer deletePod(config, podName, namespace)
  225. color.New(color.FgYellow).Printf("Waiting for pod %s to be ready...", podName)
  226. if err = waitForPod(config, newPod); err != nil {
  227. color.New(color.FgRed).Println("failed")
  228. return handlePodAttachError(err, config, namespace, podName, container)
  229. }
  230. // refresh pod info for latest status
  231. newPod, err = config.Clientset.CoreV1().
  232. Pods(newPod.Namespace).
  233. Get(context.Background(), newPod.Name, metav1.GetOptions{})
  234. // pod exited while we were waiting. maybe an error maybe not.
  235. // we dont know if the user wanted an interactive shell or not.
  236. // if it was an error the logs hopefully say so.
  237. if isPodExited(newPod) {
  238. color.New(color.FgGreen).Println("complete!")
  239. var writtenBytes int64
  240. writtenBytes, _ = pipePodLogsToStdout(config, namespace, podName, container, false)
  241. if verbose || writtenBytes == 0 {
  242. color.New(color.FgYellow).Println("Could not get logs. Pod events:\n")
  243. pipeEventsToStdout(config, namespace, podName, container, false)
  244. }
  245. return nil
  246. }
  247. color.New(color.FgGreen).Println("ready!")
  248. color.New(color.FgYellow).Println("Attempting connection to the container. If you don't see a command prompt, try pressing enter.")
  249. req := config.RestClient.Post().
  250. Resource("pods").
  251. Name(podName).
  252. Namespace("default").
  253. SubResource("attach")
  254. req.Param("stdin", "true")
  255. req.Param("stdout", "true")
  256. req.Param("tty", "true")
  257. req.Param("container", container)
  258. t := term.TTY{
  259. In: os.Stdin,
  260. Out: os.Stdout,
  261. Raw: true,
  262. }
  263. size := t.GetSize()
  264. sizeQueue := t.MonitorSize(size)
  265. if err = t.Safe(func() error {
  266. exec, err := remotecommand.NewSPDYExecutor(config.RestConf, "POST", req.URL())
  267. if err != nil {
  268. return err
  269. }
  270. return exec.Stream(remotecommand.StreamOptions{
  271. Stdin: os.Stdin,
  272. Stdout: os.Stdout,
  273. Stderr: os.Stderr,
  274. Tty: true,
  275. TerminalSizeQueue: sizeQueue,
  276. })
  277. }); err != nil {
  278. // ugly way to catch no TTY errors, such as when running command "echo \"hello\""
  279. return handlePodAttachError(err, config, namespace, podName, container)
  280. }
  281. if verbose {
  282. color.New(color.FgYellow).Println("Pod events:\n")
  283. pipeEventsToStdout(config, namespace, podName, container, false)
  284. }
  285. return err
  286. }
  287. func waitForPod(config *PorterRunSharedConfig, pod *v1.Pod) error {
  288. var (
  289. w watch.Interface
  290. err error
  291. ok bool
  292. )
  293. // immediately after creating a pod, the API may return a 404. heuristically 1
  294. // second seems to be plenty.
  295. watchRetries := 3
  296. for i := 0; i < watchRetries; i++ {
  297. selector := fields.OneTermEqualSelector("metadata.name", pod.Name).String()
  298. w, err = config.Clientset.CoreV1().
  299. Pods(pod.Namespace).
  300. Watch(context.Background(), metav1.ListOptions{FieldSelector: selector})
  301. if err == nil {
  302. break
  303. }
  304. time.Sleep(time.Second)
  305. }
  306. if err != nil {
  307. return err
  308. }
  309. defer w.Stop()
  310. for {
  311. select {
  312. case <-time.Tick(time.Second):
  313. // poll every second in case we already missed the ready event while
  314. // creating the listener.
  315. pod, err = config.Clientset.CoreV1().
  316. Pods(pod.Namespace).
  317. Get(context.Background(), pod.Name, metav1.GetOptions{})
  318. if isPodReady(pod) || isPodExited(pod) {
  319. return nil
  320. }
  321. case evt := <-w.ResultChan():
  322. pod, ok = evt.Object.(*v1.Pod)
  323. if !ok {
  324. return fmt.Errorf("unexpected object type: %T", evt.Object)
  325. }
  326. if isPodReady(pod) || isPodExited(pod) {
  327. return nil
  328. }
  329. case <-time.After(time.Second * 10):
  330. return errors.New("timed out waiting for pod")
  331. }
  332. }
  333. }
  334. func isPodReady(pod *v1.Pod) bool {
  335. ready := false
  336. conditions := pod.Status.Conditions
  337. for i := range conditions {
  338. if conditions[i].Type == v1.PodReady {
  339. ready = pod.Status.Conditions[i].Status == v1.ConditionTrue
  340. }
  341. }
  342. return ready
  343. }
  344. func isPodExited(pod *v1.Pod) bool {
  345. return pod.Status.Phase == v1.PodSucceeded || pod.Status.Phase == v1.PodFailed
  346. }
  347. func handlePodAttachError(err error, config *PorterRunSharedConfig, namespace, podName, container string) error {
  348. if verbose {
  349. color.New(color.FgYellow).Printf("Error: %s\n", err)
  350. }
  351. color.New(color.FgYellow).Println("Could not open a shell to this container. Container logs:\n")
  352. var writtenBytes int64
  353. writtenBytes, _ = pipePodLogsToStdout(config, namespace, podName, container, false)
  354. if verbose || writtenBytes == 0 {
  355. color.New(color.FgYellow).Println("Could not get logs. Pod events:\n")
  356. pipeEventsToStdout(config, namespace, podName, container, false)
  357. }
  358. return err
  359. }
  360. func pipePodLogsToStdout(config *PorterRunSharedConfig, namespace, name, container string, follow bool) (int64, error) {
  361. podLogOpts := v1.PodLogOptions{
  362. Container: container,
  363. Follow: follow,
  364. }
  365. req := config.Clientset.CoreV1().Pods(namespace).GetLogs(name, &podLogOpts)
  366. podLogs, err := req.Stream(
  367. context.Background(),
  368. )
  369. if err != nil {
  370. return 0, err
  371. }
  372. defer podLogs.Close()
  373. return io.Copy(os.Stdout, podLogs)
  374. }
  375. func pipeEventsToStdout(config *PorterRunSharedConfig, namespace, name, container string, follow bool) error {
  376. // update the config in case the operation has taken longer than token expiry time
  377. config.setSharedConfig()
  378. // creates the clientset
  379. resp, err := config.Clientset.CoreV1().Events(namespace).List(
  380. context.TODO(),
  381. metav1.ListOptions{
  382. FieldSelector: fmt.Sprintf("involvedObject.name=%s,involvedObject.namespace=%s", name, namespace),
  383. },
  384. )
  385. if err != nil {
  386. return err
  387. }
  388. for _, event := range resp.Items {
  389. color.New(color.FgRed).Println(event.Message)
  390. }
  391. return nil
  392. }
  393. func getExistingPod(config *PorterRunSharedConfig, name, namespace string) (*v1.Pod, error) {
  394. return config.Clientset.CoreV1().Pods(namespace).Get(
  395. context.Background(),
  396. name,
  397. metav1.GetOptions{},
  398. )
  399. }
  400. func deletePod(config *PorterRunSharedConfig, name, namespace string) error {
  401. // update the config in case the operation has taken longer than token expiry time
  402. config.setSharedConfig()
  403. err := config.Clientset.CoreV1().Pods(namespace).Delete(
  404. context.Background(),
  405. name,
  406. metav1.DeleteOptions{},
  407. )
  408. if err != nil {
  409. color.New(color.FgRed).Println("Could not delete ephemeral pod: %s", err.Error())
  410. return err
  411. }
  412. color.New(color.FgGreen).Println("Sucessfully deleted ephemeral pod")
  413. return nil
  414. }
  415. func createPodFromExisting(config *PorterRunSharedConfig, existing *v1.Pod, args []string) (*v1.Pod, error) {
  416. newPod := existing.DeepCopy()
  417. // only copy the pod spec, overwrite metadata
  418. newPod.ObjectMeta = metav1.ObjectMeta{
  419. Name: strings.ToLower(fmt.Sprintf("%s-copy-%s", existing.ObjectMeta.Name, utils.String(4))),
  420. Namespace: existing.ObjectMeta.Namespace,
  421. }
  422. newPod.Status = v1.PodStatus{}
  423. // only use "primary" container
  424. newPod.Spec.Containers = newPod.Spec.Containers[0:1]
  425. // set restart policy to never
  426. newPod.Spec.RestartPolicy = v1.RestartPolicyNever
  427. // change the command in the pod to the passed in pod command
  428. cmdRoot := args[0]
  429. cmdArgs := make([]string, 0)
  430. if len(args) > 1 {
  431. cmdArgs = args[1:]
  432. }
  433. newPod.Spec.Containers[0].Command = []string{cmdRoot}
  434. newPod.Spec.Containers[0].Args = cmdArgs
  435. newPod.Spec.Containers[0].TTY = true
  436. newPod.Spec.Containers[0].Stdin = true
  437. newPod.Spec.Containers[0].StdinOnce = true
  438. newPod.Spec.NodeName = ""
  439. // remove health checks and probes
  440. newPod.Spec.Containers[0].LivenessProbe = nil
  441. newPod.Spec.Containers[0].ReadinessProbe = nil
  442. newPod.Spec.Containers[0].StartupProbe = nil
  443. // create the pod and return it
  444. return config.Clientset.CoreV1().Pods(existing.ObjectMeta.Namespace).Create(
  445. context.Background(),
  446. newPod,
  447. metav1.CreateOptions{},
  448. )
  449. }