portforward.go 6.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279
  1. package cmd
  2. import (
  3. "context"
  4. "fmt"
  5. "net/http"
  6. "net/url"
  7. "os"
  8. "os/signal"
  9. "strconv"
  10. "strings"
  11. "time"
  12. "github.com/briandowns/spinner"
  13. api "github.com/porter-dev/porter/api/client"
  14. "github.com/porter-dev/porter/api/types"
  15. "github.com/porter-dev/porter/cli/cmd/utils"
  16. "github.com/spf13/cobra"
  17. corev1 "k8s.io/api/core/v1"
  18. metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
  19. "k8s.io/apimachinery/pkg/util/sets"
  20. "k8s.io/client-go/rest"
  21. "k8s.io/client-go/tools/clientcmd"
  22. "k8s.io/client-go/tools/portforward"
  23. "k8s.io/client-go/transport/spdy"
  24. "k8s.io/kubectl/pkg/util"
  25. )
  26. var address []string
  27. var portForwardCmd = &cobra.Command{
  28. Use: "port-forward [release] [LOCAL_PORT:]REMOTE_PORT [...[LOCAL_PORT_N:]REMOTE_PORT_N]",
  29. Short: "Forward one or more local ports to a pod of a release",
  30. Args: cobra.MinimumNArgs(2),
  31. Run: func(cmd *cobra.Command, args []string) {
  32. err := checkLoginAndRun(args, portForward)
  33. if err != nil {
  34. os.Exit(1)
  35. }
  36. },
  37. }
  38. func init() {
  39. portForwardCmd.PersistentFlags().StringVar(
  40. &namespace,
  41. "namespace",
  42. "default",
  43. "namespace of the release whose pod you want to port-forward to",
  44. )
  45. portForwardCmd.Flags().StringSliceVar(
  46. &address,
  47. "address",
  48. []string{"localhost"},
  49. "Addresses to listen on (comma separated). Only accepts IP addresses or localhost as a value. "+
  50. "When localhost is supplied, kubectl will try to bind on both 127.0.0.1 and ::1 and will fail "+
  51. "if neither of these addresses are available to bind.")
  52. rootCmd.AddCommand(portForwardCmd)
  53. }
  54. func forwardPorts(
  55. method string,
  56. url *url.URL,
  57. kubeConfig *rest.Config,
  58. address, ports []string,
  59. stopChan <-chan struct{},
  60. readyChan chan struct{},
  61. ) error {
  62. transport, upgrader, err := spdy.RoundTripperFor(kubeConfig)
  63. if err != nil {
  64. return err
  65. }
  66. dialer := spdy.NewDialer(upgrader, &http.Client{Transport: transport}, method, url)
  67. fw, err := portforward.NewOnAddresses(
  68. dialer, address, ports, stopChan, readyChan, os.Stdout, os.Stderr)
  69. if err != nil {
  70. return err
  71. }
  72. return fw.ForwardPorts()
  73. }
  // splitPort breaks a port spec of the form [LOCAL PORT:]REMOTE PORT into its
  // local and remote halves.
  //
  // With exactly one colon the text before it is the local port and the text
  // after it is the remote port. With no colon, or with more than one, the
  // first colon-separated segment is returned for both values.
  func splitPort(port string) (local, remote string) {
  	first := strings.IndexByte(port, ':')
  	if first < 0 {
  		// No separator: the same port is used on both sides.
  		return port, port
  	}

  	head, tail := port[:first], port[first+1:]
  	if strings.Contains(tail, ":") {
  		// More than two segments: fall back to the first segment only.
  		return head, head
  	}

  	return head, tail
  }
  83. func portForward(user *types.GetAuthenticatedUserResponse, client *api.Client, args []string) error {
  84. var err error
  85. var pod corev1.Pod
  86. s := spinner.New(spinner.CharSets[9], 100*time.Millisecond)
  87. s.Color("cyan")
  88. s.Suffix = fmt.Sprintf(" Loading list of pods for %s", args[0])
  89. s.Start()
  90. podsResp, err := client.GetK8sAllPods(context.Background(), cliConf.Project, cliConf.Cluster, namespace, args[0])
  91. s.Stop()
  92. if err != nil {
  93. return err
  94. }
  95. pods := *podsResp
  96. if len(pods) > 1 {
  97. selectedPod, err := utils.PromptSelect("Select a pod to port-forward", func() []string {
  98. var names []string
  99. for i, pod := range pods {
  100. names = append(names, fmt.Sprintf("%d - %s", (i+1), pod.Name))
  101. }
  102. return names
  103. }())
  104. if err != nil {
  105. return err
  106. }
  107. podIdxStr := strings.Split(selectedPod, " - ")[0]
  108. podIdx, err := strconv.Atoi(podIdxStr)
  109. if err != nil {
  110. return err
  111. }
  112. pod = pods[podIdx]
  113. } else {
  114. pod = pods[0]
  115. }
  116. kubeResp, err := client.GetKubeconfig(context.Background(), cliConf.Project, cliConf.Cluster)
  117. if err != nil {
  118. return err
  119. }
  120. kubeBytes := kubeResp.Kubeconfig
  121. cmdConf, err := clientcmd.NewClientConfigFromBytes(kubeBytes)
  122. if err != nil {
  123. return err
  124. }
  125. restConf, err := cmdConf.ClientConfig()
  126. if err != nil {
  127. return err
  128. }
  129. err = checkUDPPortInPod(args[1:], &pod)
  130. if err != nil {
  131. return err
  132. }
  133. ports, err := convertPodNamedPortToNumber(args[1:], pod)
  134. if err != nil {
  135. return err
  136. }
  137. stopChannel := make(chan struct{}, 1)
  138. readyChannel := make(chan struct{})
  139. signals := make(chan os.Signal, 1)
  140. signal.Notify(signals, os.Interrupt)
  141. defer signal.Stop(signals)
  142. go func() {
  143. <-signals
  144. if stopChannel != nil {
  145. close(stopChannel)
  146. }
  147. }()
  148. restClient, err := rest.RESTClientFor(restConf)
  149. if err != nil {
  150. return err
  151. }
  152. req := restClient.Post().
  153. Resource("pods").
  154. Namespace(namespace).
  155. Name(pod.Name).
  156. SubResource("portforward")
  157. return forwardPorts("POST", req.URL(), restConf, address, ports, stopChannel, readyChannel)
  158. }
  159. func checkUDPPortInPod(ports []string, pod *corev1.Pod) error {
  160. udpPorts := sets.NewInt()
  161. tcpPorts := sets.NewInt()
  162. for _, ct := range pod.Spec.Containers {
  163. for _, ctPort := range ct.Ports {
  164. portNum := int(ctPort.ContainerPort)
  165. switch ctPort.Protocol {
  166. case corev1.ProtocolUDP:
  167. udpPorts.Insert(portNum)
  168. case corev1.ProtocolTCP:
  169. tcpPorts.Insert(portNum)
  170. }
  171. }
  172. }
  173. return checkUDPPorts(udpPorts.Difference(tcpPorts), ports, pod)
  174. }
  175. func checkUDPPorts(udpOnlyPorts sets.Int, ports []string, obj metav1.Object) error {
  176. for _, port := range ports {
  177. _, remotePort := splitPort(port)
  178. portNum, err := strconv.Atoi(remotePort)
  179. if err != nil {
  180. switch v := obj.(type) {
  181. case *corev1.Service:
  182. svcPort, err := util.LookupServicePortNumberByName(*v, remotePort)
  183. if err != nil {
  184. return err
  185. }
  186. portNum = int(svcPort)
  187. case *corev1.Pod:
  188. ctPort, err := util.LookupContainerPortNumberByName(*v, remotePort)
  189. if err != nil {
  190. return err
  191. }
  192. portNum = int(ctPort)
  193. default:
  194. return fmt.Errorf("unknown object: %v", obj)
  195. }
  196. }
  197. if udpOnlyPorts.Has(portNum) {
  198. return fmt.Errorf("UDP protocol is not supported for %s", remotePort)
  199. }
  200. }
  201. return nil
  202. }
  203. func convertPodNamedPortToNumber(ports []string, pod corev1.Pod) ([]string, error) {
  204. var converted []string
  205. for _, port := range ports {
  206. localPort, remotePort := splitPort(port)
  207. containerPortStr := remotePort
  208. _, err := strconv.Atoi(remotePort)
  209. if err != nil {
  210. containerPort, err := util.LookupContainerPortNumberByName(pod, remotePort)
  211. if err != nil {
  212. return nil, err
  213. }
  214. containerPortStr = strconv.Itoa(int(containerPort))
  215. }
  216. if localPort != remotePort {
  217. converted = append(converted, fmt.Sprintf("%s:%s", localPort, containerPortStr))
  218. } else {
  219. converted = append(converted, containerPortStr)
  220. }
  221. }
  222. return converted, nil
  223. }