run.go 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565
  1. package cmd
  2. import (
  3. "context"
  4. "errors"
  5. "fmt"
  6. "io"
  7. "os"
  8. "strings"
  9. "time"
  10. "github.com/fatih/color"
  11. api "github.com/porter-dev/porter/api/client"
  12. "github.com/porter-dev/porter/api/types"
  13. "github.com/porter-dev/porter/cli/cmd/utils"
  14. "github.com/spf13/cobra"
  15. v1 "k8s.io/api/core/v1"
  16. metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
  17. "k8s.io/apimachinery/pkg/fields"
  18. "k8s.io/apimachinery/pkg/watch"
  19. "k8s.io/kubectl/pkg/util/term"
  20. "k8s.io/apimachinery/pkg/runtime"
  21. "k8s.io/apimachinery/pkg/runtime/schema"
  22. "k8s.io/client-go/kubernetes"
  23. "k8s.io/client-go/rest"
  24. "k8s.io/client-go/tools/clientcmd"
  25. "k8s.io/client-go/tools/remotecommand"
  26. )
  27. var namespace string
  28. var verbose bool
  29. // runCmd represents the "porter run" base command when called
  30. // without any subcommands
  31. var runCmd = &cobra.Command{
  32. Use: "run [release] -- COMMAND [args...]",
  33. Args: cobra.MinimumNArgs(2),
  34. Short: "Runs a command inside a connected cluster container.",
  35. Run: func(cmd *cobra.Command, args []string) {
  36. err := checkLoginAndRun(args, run)
  37. if err != nil {
  38. os.Exit(1)
  39. }
  40. },
  41. }
  42. var existingPod bool
  43. func init() {
  44. rootCmd.AddCommand(runCmd)
  45. runCmd.PersistentFlags().StringVar(
  46. &namespace,
  47. "namespace",
  48. "default",
  49. "namespace of release to connect to",
  50. )
  51. runCmd.PersistentFlags().BoolVarP(
  52. &existingPod,
  53. "existing_pod",
  54. "e",
  55. false,
  56. "whether to connect to an existing pod",
  57. )
  58. runCmd.PersistentFlags().BoolVarP(
  59. &verbose,
  60. "verbose",
  61. "v",
  62. false,
  63. "whether to print verbose output",
  64. )
  65. }
  66. func run(_ *types.GetAuthenticatedUserResponse, client *api.Client, args []string) error {
  67. color.New(color.FgGreen).Println("Running", strings.Join(args[1:], " "), "for release", args[0])
  68. podsSimple, err := getPods(client, namespace, args[0])
  69. if err != nil {
  70. return fmt.Errorf("Could not retrieve list of pods: %s", err.Error())
  71. }
  72. // if length of pods is 0, throw error
  73. var selectedPod podSimple
  74. if len(podsSimple) == 0 {
  75. return fmt.Errorf("At least one pod must exist in this deployment.")
  76. } else if len(podsSimple) == 1 || !existingPod {
  77. selectedPod = podsSimple[0]
  78. } else {
  79. podNames := make([]string, 0)
  80. for _, podSimple := range podsSimple {
  81. podNames = append(podNames, podSimple.Name)
  82. }
  83. selectedPodName, err := utils.PromptSelect("Select the pod:", podNames)
  84. if err != nil {
  85. return err
  86. }
  87. // find selected pod
  88. for _, podSimple := range podsSimple {
  89. if selectedPodName == podSimple.Name {
  90. selectedPod = podSimple
  91. }
  92. }
  93. }
  94. var selectedContainerName string
  95. // if the selected pod has multiple container, spawn selector
  96. if len(selectedPod.ContainerNames) == 0 {
  97. return fmt.Errorf("At least one pod must exist in this deployment.")
  98. } else if len(selectedPod.ContainerNames) == 1 {
  99. selectedContainerName = selectedPod.ContainerNames[0]
  100. } else {
  101. selectedContainer, err := utils.PromptSelect("Select the container:", selectedPod.ContainerNames)
  102. if err != nil {
  103. return err
  104. }
  105. selectedContainerName = selectedContainer
  106. }
  107. config := &PorterRunSharedConfig{
  108. Client: client,
  109. }
  110. err = config.setSharedConfig()
  111. if err != nil {
  112. return fmt.Errorf("Could not retrieve kube credentials: %s", err.Error())
  113. }
  114. if existingPod {
  115. return executeRun(config, namespace, selectedPod.Name, selectedContainerName, args[1:])
  116. }
  117. return executeRunEphemeral(config, namespace, selectedPod.Name, selectedContainerName, args[1:])
  118. }
  119. type PorterRunSharedConfig struct {
  120. Client *api.Client
  121. RestConf *rest.Config
  122. Clientset *kubernetes.Clientset
  123. RestClient *rest.RESTClient
  124. }
  125. func (p *PorterRunSharedConfig) setSharedConfig() error {
  126. pID := config.Project
  127. cID := config.Cluster
  128. kubeResp, err := p.Client.GetKubeconfig(context.TODO(), pID, cID)
  129. if err != nil {
  130. return err
  131. }
  132. kubeBytes := kubeResp.Kubeconfig
  133. cmdConf, err := clientcmd.NewClientConfigFromBytes(kubeBytes)
  134. if err != nil {
  135. return err
  136. }
  137. restConf, err := cmdConf.ClientConfig()
  138. if err != nil {
  139. return err
  140. }
  141. restConf.GroupVersion = &schema.GroupVersion{
  142. Group: "api",
  143. Version: "v1",
  144. }
  145. restConf.NegotiatedSerializer = runtime.NewSimpleNegotiatedSerializer(runtime.SerializerInfo{})
  146. p.RestConf = restConf
  147. clientset, err := kubernetes.NewForConfig(restConf)
  148. if err != nil {
  149. return err
  150. }
  151. p.Clientset = clientset
  152. restClient, err := rest.RESTClientFor(restConf)
  153. if err != nil {
  154. return err
  155. }
  156. p.RestClient = restClient
  157. return nil
  158. }
  159. type podSimple struct {
  160. Name string
  161. ContainerNames []string
  162. }
  163. func getPods(client *api.Client, namespace, releaseName string) ([]podSimple, error) {
  164. pID := config.Project
  165. cID := config.Cluster
  166. resp, err := client.GetK8sAllPods(context.TODO(), pID, cID, namespace, releaseName)
  167. if err != nil {
  168. return nil, err
  169. }
  170. pods := *resp
  171. res := make([]podSimple, 0)
  172. for _, pod := range pods {
  173. containerNames := make([]string, 0)
  174. for _, container := range pod.Spec.Containers {
  175. containerNames = append(containerNames, container.Name)
  176. }
  177. res = append(res, podSimple{
  178. Name: pod.ObjectMeta.Name,
  179. ContainerNames: containerNames,
  180. })
  181. }
  182. return res, nil
  183. }
  184. func executeRun(config *PorterRunSharedConfig, namespace, name, container string, args []string) error {
  185. req := config.RestClient.Post().
  186. Resource("pods").
  187. Name(name).
  188. Namespace(namespace).
  189. SubResource("exec")
  190. for _, arg := range args {
  191. req.Param("command", arg)
  192. }
  193. req.Param("stdin", "true")
  194. req.Param("stdout", "true")
  195. req.Param("tty", "true")
  196. req.Param("container", container)
  197. t := term.TTY{
  198. In: os.Stdin,
  199. Out: os.Stdout,
  200. Raw: true,
  201. }
  202. size := t.GetSize()
  203. sizeQueue := t.MonitorSize(size)
  204. return t.Safe(func() error {
  205. exec, err := remotecommand.NewSPDYExecutor(config.RestConf, "POST", req.URL())
  206. if err != nil {
  207. return err
  208. }
  209. return exec.Stream(remotecommand.StreamOptions{
  210. Stdin: os.Stdin,
  211. Stdout: os.Stdout,
  212. Stderr: os.Stderr,
  213. Tty: true,
  214. TerminalSizeQueue: sizeQueue,
  215. })
  216. })
  217. }
  218. func executeRunEphemeral(config *PorterRunSharedConfig, namespace, name, container string, args []string) error {
  219. existing, err := getExistingPod(config, name, namespace)
  220. if err != nil {
  221. return err
  222. }
  223. newPod, err := createPodFromExisting(config, existing, args)
  224. podName := newPod.ObjectMeta.Name
  225. // delete the ephemeral pod no matter what
  226. defer deletePod(config, podName, namespace)
  227. color.New(color.FgYellow).Printf("Waiting for pod %s to be ready...", podName)
  228. if err = waitForPod(config, newPod); err != nil {
  229. color.New(color.FgRed).Println("failed")
  230. return handlePodAttachError(err, config, namespace, podName, container)
  231. }
  232. // refresh pod info for latest status
  233. newPod, err = config.Clientset.CoreV1().
  234. Pods(newPod.Namespace).
  235. Get(context.Background(), newPod.Name, metav1.GetOptions{})
  236. // pod exited while we were waiting. maybe an error maybe not.
  237. // we dont know if the user wanted an interactive shell or not.
  238. // if it was an error the logs hopefully say so.
  239. if isPodExited(newPod) {
  240. color.New(color.FgGreen).Println("complete!")
  241. var writtenBytes int64
  242. writtenBytes, _ = pipePodLogsToStdout(config, namespace, podName, container, false)
  243. if verbose || writtenBytes == 0 {
  244. color.New(color.FgYellow).Println("Could not get logs. Pod events:")
  245. pipeEventsToStdout(config, namespace, podName, container, false)
  246. }
  247. return nil
  248. }
  249. color.New(color.FgGreen).Println("ready!")
  250. color.New(color.FgYellow).Println("Attempting connection to the container. If you don't see a command prompt, try pressing enter.")
  251. req := config.RestClient.Post().
  252. Resource("pods").
  253. Name(podName).
  254. Namespace(namespace).
  255. SubResource("attach")
  256. req.Param("stdin", "true")
  257. req.Param("stdout", "true")
  258. req.Param("tty", "true")
  259. req.Param("container", container)
  260. t := term.TTY{
  261. In: os.Stdin,
  262. Out: os.Stdout,
  263. Raw: true,
  264. }
  265. size := t.GetSize()
  266. sizeQueue := t.MonitorSize(size)
  267. if err = t.Safe(func() error {
  268. exec, err := remotecommand.NewSPDYExecutor(config.RestConf, "POST", req.URL())
  269. if err != nil {
  270. return err
  271. }
  272. return exec.Stream(remotecommand.StreamOptions{
  273. Stdin: os.Stdin,
  274. Stdout: os.Stdout,
  275. Stderr: os.Stderr,
  276. Tty: true,
  277. TerminalSizeQueue: sizeQueue,
  278. })
  279. }); err != nil {
  280. // ugly way to catch no TTY errors, such as when running command "echo \"hello\""
  281. return handlePodAttachError(err, config, namespace, podName, container)
  282. }
  283. if verbose {
  284. color.New(color.FgYellow).Println("Pod events:")
  285. pipeEventsToStdout(config, namespace, podName, container, false)
  286. }
  287. return err
  288. }
  289. func waitForPod(config *PorterRunSharedConfig, pod *v1.Pod) error {
  290. var (
  291. w watch.Interface
  292. err error
  293. ok bool
  294. )
  295. // immediately after creating a pod, the API may return a 404. heuristically 1
  296. // second seems to be plenty.
  297. watchRetries := 3
  298. for i := 0; i < watchRetries; i++ {
  299. selector := fields.OneTermEqualSelector("metadata.name", pod.Name).String()
  300. w, err = config.Clientset.CoreV1().
  301. Pods(pod.Namespace).
  302. Watch(context.Background(), metav1.ListOptions{FieldSelector: selector})
  303. if err == nil {
  304. break
  305. }
  306. time.Sleep(time.Second)
  307. }
  308. if err != nil {
  309. return err
  310. }
  311. defer w.Stop()
  312. for {
  313. select {
  314. case <-time.Tick(time.Second):
  315. // poll every second in case we already missed the ready event while
  316. // creating the listener.
  317. pod, err = config.Clientset.CoreV1().
  318. Pods(pod.Namespace).
  319. Get(context.Background(), pod.Name, metav1.GetOptions{})
  320. if isPodReady(pod) || isPodExited(pod) {
  321. return nil
  322. }
  323. case evt := <-w.ResultChan():
  324. pod, ok = evt.Object.(*v1.Pod)
  325. if !ok {
  326. return fmt.Errorf("unexpected object type: %T", evt.Object)
  327. }
  328. if isPodReady(pod) || isPodExited(pod) {
  329. return nil
  330. }
  331. case <-time.After(time.Second * 10):
  332. return errors.New("timed out waiting for pod")
  333. }
  334. }
  335. }
  336. func isPodReady(pod *v1.Pod) bool {
  337. ready := false
  338. conditions := pod.Status.Conditions
  339. for i := range conditions {
  340. if conditions[i].Type == v1.PodReady {
  341. ready = pod.Status.Conditions[i].Status == v1.ConditionTrue
  342. }
  343. }
  344. return ready
  345. }
  346. func isPodExited(pod *v1.Pod) bool {
  347. return pod.Status.Phase == v1.PodSucceeded || pod.Status.Phase == v1.PodFailed
  348. }
  349. func handlePodAttachError(err error, config *PorterRunSharedConfig, namespace, podName, container string) error {
  350. if verbose {
  351. color.New(color.FgYellow).Printf("Error: %s\n", err)
  352. }
  353. color.New(color.FgYellow).Println("Could not open a shell to this container. Container logs:")
  354. var writtenBytes int64
  355. writtenBytes, _ = pipePodLogsToStdout(config, namespace, podName, container, false)
  356. if verbose || writtenBytes == 0 {
  357. color.New(color.FgYellow).Println("Could not get logs. Pod events:")
  358. pipeEventsToStdout(config, namespace, podName, container, false)
  359. }
  360. return err
  361. }
  362. func pipePodLogsToStdout(config *PorterRunSharedConfig, namespace, name, container string, follow bool) (int64, error) {
  363. podLogOpts := v1.PodLogOptions{
  364. Container: container,
  365. Follow: follow,
  366. }
  367. req := config.Clientset.CoreV1().Pods(namespace).GetLogs(name, &podLogOpts)
  368. podLogs, err := req.Stream(
  369. context.Background(),
  370. )
  371. if err != nil {
  372. return 0, err
  373. }
  374. defer podLogs.Close()
  375. return io.Copy(os.Stdout, podLogs)
  376. }
  377. func pipeEventsToStdout(config *PorterRunSharedConfig, namespace, name, container string, follow bool) error {
  378. // update the config in case the operation has taken longer than token expiry time
  379. config.setSharedConfig()
  380. // creates the clientset
  381. resp, err := config.Clientset.CoreV1().Events(namespace).List(
  382. context.TODO(),
  383. metav1.ListOptions{
  384. FieldSelector: fmt.Sprintf("involvedObject.name=%s,involvedObject.namespace=%s", name, namespace),
  385. },
  386. )
  387. if err != nil {
  388. return err
  389. }
  390. for _, event := range resp.Items {
  391. color.New(color.FgRed).Println(event.Message)
  392. }
  393. return nil
  394. }
  395. func getExistingPod(config *PorterRunSharedConfig, name, namespace string) (*v1.Pod, error) {
  396. return config.Clientset.CoreV1().Pods(namespace).Get(
  397. context.Background(),
  398. name,
  399. metav1.GetOptions{},
  400. )
  401. }
  402. func deletePod(config *PorterRunSharedConfig, name, namespace string) error {
  403. // update the config in case the operation has taken longer than token expiry time
  404. config.setSharedConfig()
  405. err := config.Clientset.CoreV1().Pods(namespace).Delete(
  406. context.Background(),
  407. name,
  408. metav1.DeleteOptions{},
  409. )
  410. if err != nil {
  411. color.New(color.FgRed).Printf("Could not delete ephemeral pod: %s\n", err.Error())
  412. return err
  413. }
  414. color.New(color.FgGreen).Println("Sucessfully deleted ephemeral pod")
  415. return nil
  416. }
  417. func createPodFromExisting(config *PorterRunSharedConfig, existing *v1.Pod, args []string) (*v1.Pod, error) {
  418. newPod := existing.DeepCopy()
  419. // only copy the pod spec, overwrite metadata
  420. newPod.ObjectMeta = metav1.ObjectMeta{
  421. Name: strings.ToLower(fmt.Sprintf("%s-copy-%s", existing.ObjectMeta.Name, utils.String(4))),
  422. Namespace: existing.ObjectMeta.Namespace,
  423. }
  424. newPod.Status = v1.PodStatus{}
  425. // only use "primary" container
  426. newPod.Spec.Containers = newPod.Spec.Containers[0:1]
  427. // set restart policy to never
  428. newPod.Spec.RestartPolicy = v1.RestartPolicyNever
  429. // change the command in the pod to the passed in pod command
  430. cmdRoot := args[0]
  431. cmdArgs := make([]string, 0)
  432. if len(args) > 1 {
  433. cmdArgs = args[1:]
  434. }
  435. newPod.Spec.Containers[0].Command = []string{cmdRoot}
  436. newPod.Spec.Containers[0].Args = cmdArgs
  437. newPod.Spec.Containers[0].TTY = true
  438. newPod.Spec.Containers[0].Stdin = true
  439. newPod.Spec.Containers[0].StdinOnce = true
  440. newPod.Spec.NodeName = ""
  441. // remove health checks and probes
  442. newPod.Spec.Containers[0].LivenessProbe = nil
  443. newPod.Spec.Containers[0].ReadinessProbe = nil
  444. newPod.Spec.Containers[0].StartupProbe = nil
  445. // create the pod and return it
  446. return config.Clientset.CoreV1().Pods(existing.ObjectMeta.Namespace).Create(
  447. context.Background(),
  448. newPod,
  449. metav1.CreateOptions{},
  450. )
  451. }