run.go 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499
  1. package cmd
  2. import (
  3. "context"
  4. "fmt"
  5. "io"
  6. "os"
  7. "strings"
  8. "time"
  9. "github.com/fatih/color"
  10. "github.com/porter-dev/porter/cli/cmd/api"
  11. "github.com/porter-dev/porter/cli/cmd/utils"
  12. "github.com/spf13/cobra"
  13. v1 "k8s.io/api/core/v1"
  14. metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
  15. "k8s.io/apimachinery/pkg/fields"
  16. "k8s.io/kubectl/pkg/util/term"
  17. "k8s.io/apimachinery/pkg/runtime"
  18. "k8s.io/apimachinery/pkg/runtime/schema"
  19. "k8s.io/client-go/kubernetes"
  20. "k8s.io/client-go/rest"
  21. "k8s.io/client-go/tools/clientcmd"
  22. "k8s.io/client-go/tools/remotecommand"
  23. )
  24. var namespace string
  25. var verbose bool
  26. // runCmd represents the "porter run" base command when called
  27. // without any subcommands
  28. var runCmd = &cobra.Command{
  29. Use: "run [release] -- COMMAND [args...]",
  30. Args: cobra.MinimumNArgs(2),
  31. Short: "Runs a command inside a connected cluster container.",
  32. Run: func(cmd *cobra.Command, args []string) {
  33. err := checkLoginAndRun(args, run)
  34. if err != nil {
  35. os.Exit(1)
  36. }
  37. },
  38. }
  39. var existingPod bool
  40. func init() {
  41. rootCmd.AddCommand(runCmd)
  42. runCmd.PersistentFlags().StringVar(
  43. &namespace,
  44. "namespace",
  45. "default",
  46. "namespace of release to connect to",
  47. )
  48. runCmd.PersistentFlags().BoolVarP(
  49. &existingPod,
  50. "existing_pod",
  51. "e",
  52. false,
  53. "whether to connect to an existing pod",
  54. )
  55. runCmd.PersistentFlags().BoolVarP(
  56. &verbose,
  57. "verbose",
  58. "v",
  59. false,
  60. "whether to print verbose output",
  61. )
  62. }
  63. func run(_ *api.AuthCheckResponse, client *api.Client, args []string) error {
  64. color.New(color.FgGreen).Println("Running", strings.Join(args[1:], " "), "for release", args[0])
  65. podsSimple, err := getPods(client, namespace, args[0])
  66. if err != nil {
  67. return fmt.Errorf("Could not retrieve list of pods: %s", err.Error())
  68. }
  69. // if length of pods is 0, throw error
  70. var selectedPod podSimple
  71. if len(podsSimple) == 0 {
  72. return fmt.Errorf("At least one pod must exist in this deployment.")
  73. } else if len(podsSimple) == 1 || !existingPod {
  74. selectedPod = podsSimple[0]
  75. } else {
  76. podNames := make([]string, 0)
  77. for _, podSimple := range podsSimple {
  78. podNames = append(podNames, podSimple.Name)
  79. }
  80. selectedPodName, err := utils.PromptSelect("Select the pod:", podNames)
  81. if err != nil {
  82. return err
  83. }
  84. // find selected pod
  85. for _, podSimple := range podsSimple {
  86. if selectedPodName == podSimple.Name {
  87. selectedPod = podSimple
  88. }
  89. }
  90. }
  91. var selectedContainerName string
  92. // if the selected pod has multiple container, spawn selector
  93. if len(selectedPod.ContainerNames) == 0 {
  94. return fmt.Errorf("At least one pod must exist in this deployment.")
  95. } else if len(selectedPod.ContainerNames) == 1 {
  96. selectedContainerName = selectedPod.ContainerNames[0]
  97. } else {
  98. selectedContainer, err := utils.PromptSelect("Select the container:", selectedPod.ContainerNames)
  99. if err != nil {
  100. return err
  101. }
  102. selectedContainerName = selectedContainer
  103. }
  104. config := &PorterRunSharedConfig{
  105. Client: client,
  106. }
  107. err = config.setSharedConfig()
  108. if err != nil {
  109. return fmt.Errorf("Could not retrieve kube credentials: %s", err.Error())
  110. }
  111. if existingPod {
  112. return executeRun(config, namespace, selectedPod.Name, selectedContainerName, args[1:])
  113. }
  114. return executeRunEphemeral(config, namespace, selectedPod.Name, selectedContainerName, args[1:])
  115. }
  116. type PorterRunSharedConfig struct {
  117. Client *api.Client
  118. RestConf *rest.Config
  119. Clientset *kubernetes.Clientset
  120. RestClient *rest.RESTClient
  121. }
  122. func (p *PorterRunSharedConfig) setSharedConfig() error {
  123. pID := config.Project
  124. cID := config.Cluster
  125. kubeResp, err := p.Client.GetKubeconfig(context.TODO(), pID, cID)
  126. if err != nil {
  127. return err
  128. }
  129. kubeBytes := kubeResp.Kubeconfig
  130. cmdConf, err := clientcmd.NewClientConfigFromBytes(kubeBytes)
  131. if err != nil {
  132. return err
  133. }
  134. restConf, err := cmdConf.ClientConfig()
  135. if err != nil {
  136. return err
  137. }
  138. restConf.GroupVersion = &schema.GroupVersion{
  139. Group: "api",
  140. Version: "v1",
  141. }
  142. restConf.NegotiatedSerializer = runtime.NewSimpleNegotiatedSerializer(runtime.SerializerInfo{})
  143. p.RestConf = restConf
  144. clientset, err := kubernetes.NewForConfig(restConf)
  145. if err != nil {
  146. return err
  147. }
  148. p.Clientset = clientset
  149. restClient, err := rest.RESTClientFor(restConf)
  150. if err != nil {
  151. return err
  152. }
  153. p.RestClient = restClient
  154. return nil
  155. }
  156. type podSimple struct {
  157. Name string
  158. ContainerNames []string
  159. }
  160. func getPods(client *api.Client, namespace, releaseName string) ([]podSimple, error) {
  161. pID := config.Project
  162. cID := config.Cluster
  163. resp, err := client.GetK8sAllPods(context.TODO(), pID, cID, namespace, releaseName)
  164. if err != nil {
  165. return nil, err
  166. }
  167. res := make([]podSimple, 0)
  168. for _, pod := range resp {
  169. containerNames := make([]string, 0)
  170. for _, container := range pod.Spec.Containers {
  171. containerNames = append(containerNames, container.Name)
  172. }
  173. res = append(res, podSimple{
  174. Name: pod.ObjectMeta.Name,
  175. ContainerNames: containerNames,
  176. })
  177. }
  178. return res, nil
  179. }
  180. func executeRun(config *PorterRunSharedConfig, namespace, name, container string, args []string) error {
  181. req := config.RestClient.Post().
  182. Resource("pods").
  183. Name(name).
  184. Namespace(namespace).
  185. SubResource("exec")
  186. for _, arg := range args {
  187. req.Param("command", arg)
  188. }
  189. req.Param("stdin", "true")
  190. req.Param("stdout", "true")
  191. req.Param("tty", "true")
  192. req.Param("container", container)
  193. t := term.TTY{
  194. In: os.Stdin,
  195. Out: os.Stdout,
  196. Raw: true,
  197. }
  198. fn := func() error {
  199. exec, err := remotecommand.NewSPDYExecutor(config.RestConf, "POST", req.URL())
  200. if err != nil {
  201. return err
  202. }
  203. return exec.Stream(remotecommand.StreamOptions{
  204. Stdin: os.Stdin,
  205. Stdout: os.Stdout,
  206. Stderr: os.Stderr,
  207. Tty: true,
  208. })
  209. }
  210. if err := t.Safe(fn); err != nil {
  211. return err
  212. }
  213. return nil
  214. }
  215. func waitForPod(config *PorterRunSharedConfig, pod *v1.Pod) error {
  216. watch, err := config.Clientset.CoreV1().Pods(pod.Namespace).Watch(context.Background(), metav1.ListOptions{
  217. FieldSelector: fields.OneTermEqualSelector("metadata.name", pod.Name).String(),
  218. })
  219. if err != nil {
  220. return err
  221. }
  222. defer watch.Stop()
  223. for {
  224. select {
  225. case evt := <-watch.ResultChan():
  226. pod, ok := evt.Object.(*v1.Pod)
  227. if !ok {
  228. return fmt.Errorf("unexpected object type: %T", evt.Object)
  229. }
  230. ready := false
  231. conditions := pod.Status.Conditions
  232. for i := range conditions {
  233. if conditions[i].Type == v1.PodReady {
  234. ready = pod.Status.Conditions[i].Status == v1.ConditionTrue
  235. }
  236. }
  237. if ready {
  238. return nil
  239. }
  240. case <-time.After(time.Second * 30):
  241. return fmt.Errorf("timed out waiting for pod")
  242. }
  243. }
  244. }
  245. func executeRunEphemeral(config *PorterRunSharedConfig, namespace, name, container string, args []string) error {
  246. existing, err := getExistingPod(config, name, namespace)
  247. if err != nil {
  248. return err
  249. }
  250. newPod, err := createPodFromExisting(config, existing, args)
  251. podName := newPod.ObjectMeta.Name
  252. err = waitForPod(config, newPod)
  253. if err == nil {
  254. color.New(color.FgYellow).Println("Attempting connection to the container, this may take up to 10 seconds. If you don't see a command prompt, try pressing enter.")
  255. req := config.RestClient.Post().
  256. Resource("pods").
  257. Name(podName).
  258. Namespace("default").
  259. SubResource("attach")
  260. req.Param("stdin", "true")
  261. req.Param("stdout", "true")
  262. req.Param("tty", "true")
  263. req.Param("container", container)
  264. t := term.TTY{
  265. In: os.Stdin,
  266. Out: os.Stdout,
  267. Raw: true,
  268. }
  269. size := t.GetSize()
  270. sizeQueue := t.MonitorSize(size)
  271. err = t.Safe(func() error {
  272. exec, err := remotecommand.NewSPDYExecutor(config.RestConf, "POST", req.URL())
  273. if err != nil {
  274. return err
  275. }
  276. return exec.Stream(remotecommand.StreamOptions{
  277. Stdin: os.Stdin,
  278. Stdout: os.Stdout,
  279. Stderr: os.Stderr,
  280. Tty: true,
  281. TerminalSizeQueue: sizeQueue,
  282. })
  283. })
  284. }
  285. // ugly way to catch no TTY errors, such as when running command "echo \"hello\""
  286. if err != nil {
  287. color.New(color.FgYellow).Println("Could not open a shell to this container. Container logs:\n")
  288. var writtenBytes int64
  289. writtenBytes, err = pipePodLogsToStdout(config, namespace, podName, container, false)
  290. if verbose || writtenBytes == 0 {
  291. color.New(color.FgYellow).Println("Could not get logs. Pod events:\n")
  292. err = pipeEventsToStdout(config, namespace, podName, container, false)
  293. }
  294. } else if verbose {
  295. color.New(color.FgYellow).Println("Pod events:\n")
  296. pipeEventsToStdout(config, namespace, podName, container, false)
  297. }
  298. // delete the ephemeral pod
  299. deletePod(config, podName, namespace)
  300. return err
  301. }
  302. func pipePodLogsToStdout(config *PorterRunSharedConfig, namespace, name, container string, follow bool) (int64, error) {
  303. podLogOpts := v1.PodLogOptions{
  304. Container: container,
  305. Follow: follow,
  306. }
  307. req := config.Clientset.CoreV1().Pods(namespace).GetLogs(name, &podLogOpts)
  308. podLogs, err := req.Stream(
  309. context.Background(),
  310. )
  311. if err != nil {
  312. return 0, err
  313. }
  314. defer podLogs.Close()
  315. return io.Copy(os.Stdout, podLogs)
  316. }
  317. func pipeEventsToStdout(config *PorterRunSharedConfig, namespace, name, container string, follow bool) error {
  318. // update the config in case the operation has taken longer than token expiry time
  319. config.setSharedConfig()
  320. // creates the clientset
  321. resp, err := config.Clientset.CoreV1().Events(namespace).List(
  322. context.TODO(),
  323. metav1.ListOptions{
  324. FieldSelector: fmt.Sprintf("involvedObject.name=%s,involvedObject.namespace=%s", name, namespace),
  325. },
  326. )
  327. if err != nil {
  328. return err
  329. }
  330. for _, event := range resp.Items {
  331. color.New(color.FgRed).Println(event.Message)
  332. }
  333. return nil
  334. }
  335. func getExistingPod(config *PorterRunSharedConfig, name, namespace string) (*v1.Pod, error) {
  336. return config.Clientset.CoreV1().Pods(namespace).Get(
  337. context.Background(),
  338. name,
  339. metav1.GetOptions{},
  340. )
  341. }
  342. func deletePod(config *PorterRunSharedConfig, name, namespace string) error {
  343. // update the config in case the operation has taken longer than token expiry time
  344. config.setSharedConfig()
  345. err := config.Clientset.CoreV1().Pods(namespace).Delete(
  346. context.Background(),
  347. name,
  348. metav1.DeleteOptions{},
  349. )
  350. if err != nil {
  351. color.New(color.FgRed).Println("Could not delete ephemeral pod: %s", err.Error())
  352. return err
  353. }
  354. color.New(color.FgGreen).Println("Sucessfully deleted ephemeral pod")
  355. return nil
  356. }
  357. func createPodFromExisting(config *PorterRunSharedConfig, existing *v1.Pod, args []string) (*v1.Pod, error) {
  358. newPod := existing.DeepCopy()
  359. // only copy the pod spec, overwrite metadata
  360. newPod.ObjectMeta = metav1.ObjectMeta{
  361. Name: strings.ToLower(fmt.Sprintf("%s-copy-%s", existing.ObjectMeta.Name, utils.String(4))),
  362. Namespace: existing.ObjectMeta.Namespace,
  363. }
  364. newPod.Status = v1.PodStatus{}
  365. // only use "primary" container
  366. newPod.Spec.Containers = newPod.Spec.Containers[0:1]
  367. // set restart policy to never
  368. newPod.Spec.RestartPolicy = v1.RestartPolicyNever
  369. // change the command in the pod to the passed in pod command
  370. cmdRoot := args[0]
  371. cmdArgs := make([]string, 0)
  372. if len(args) > 1 {
  373. cmdArgs = args[1:]
  374. }
  375. newPod.Spec.Containers[0].Command = []string{cmdRoot}
  376. newPod.Spec.Containers[0].Args = cmdArgs
  377. newPod.Spec.Containers[0].TTY = true
  378. newPod.Spec.Containers[0].Stdin = true
  379. newPod.Spec.Containers[0].StdinOnce = true
  380. newPod.Spec.NodeName = ""
  381. // remove health checks and probes
  382. newPod.Spec.Containers[0].LivenessProbe = nil
  383. newPod.Spec.Containers[0].ReadinessProbe = nil
  384. newPod.Spec.Containers[0].StartupProbe = nil
  385. // create the pod and return it
  386. return config.Clientset.CoreV1().Pods(existing.ObjectMeta.Namespace).Create(
  387. context.Background(),
  388. newPod,
  389. metav1.CreateOptions{},
  390. )
  391. }