2
0

portforward.go 6.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288
  1. package cmd
  2. import (
  3. "context"
  4. "fmt"
  5. "net/http"
  6. "net/url"
  7. "os"
  8. "os/signal"
  9. "strconv"
  10. "strings"
  11. "time"
  12. "github.com/briandowns/spinner"
  13. api "github.com/porter-dev/porter/api/client"
  14. "github.com/porter-dev/porter/api/types"
  15. "github.com/porter-dev/porter/cli/cmd/utils"
  16. "github.com/spf13/cobra"
  17. corev1 "k8s.io/api/core/v1"
  18. metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
  19. "k8s.io/apimachinery/pkg/runtime"
  20. "k8s.io/apimachinery/pkg/runtime/schema"
  21. "k8s.io/apimachinery/pkg/util/sets"
  22. "k8s.io/client-go/rest"
  23. "k8s.io/client-go/tools/clientcmd"
  24. "k8s.io/client-go/tools/portforward"
  25. "k8s.io/client-go/transport/spdy"
  26. "k8s.io/kubectl/pkg/util"
  27. )
  28. var address []string
  29. var portForwardCmd = &cobra.Command{
  30. Use: "port-forward [release] [LOCAL_PORT:]REMOTE_PORT [...[LOCAL_PORT_N:]REMOTE_PORT_N]",
  31. Short: "Forward one or more local ports to a pod of a release",
  32. Args: cobra.MinimumNArgs(2),
  33. Run: func(cmd *cobra.Command, args []string) {
  34. err := checkLoginAndRun(args, portForward)
  35. if err != nil {
  36. os.Exit(1)
  37. }
  38. },
  39. }
  40. func init() {
  41. portForwardCmd.PersistentFlags().StringVar(
  42. &namespace,
  43. "namespace",
  44. "default",
  45. "namespace of the release whose pod you want to port-forward to",
  46. )
  47. portForwardCmd.Flags().StringSliceVar(
  48. &address,
  49. "address",
  50. []string{"localhost"},
  51. "Addresses to listen on (comma separated). Only accepts IP addresses or localhost as a value. "+
  52. "When localhost is supplied, kubectl will try to bind on both 127.0.0.1 and ::1 and will fail "+
  53. "if neither of these addresses are available to bind.")
  54. rootCmd.AddCommand(portForwardCmd)
  55. }
  56. func forwardPorts(
  57. method string,
  58. url *url.URL,
  59. kubeConfig *rest.Config,
  60. address, ports []string,
  61. stopChan <-chan struct{},
  62. readyChan chan struct{},
  63. ) error {
  64. transport, upgrader, err := spdy.RoundTripperFor(kubeConfig)
  65. if err != nil {
  66. return err
  67. }
  68. dialer := spdy.NewDialer(upgrader, &http.Client{Transport: transport}, method, url)
  69. fw, err := portforward.NewOnAddresses(
  70. dialer, address, ports, stopChan, readyChan, os.Stdout, os.Stderr)
  71. if err != nil {
  72. return err
  73. }
  74. return fw.ForwardPorts()
  75. }
  76. // splitPort splits port string which is in form of [LOCAL PORT]:REMOTE PORT
  77. // and returns local and remote ports separately
  78. func splitPort(port string) (local, remote string) {
  79. parts := strings.Split(port, ":")
  80. if len(parts) == 2 {
  81. return parts[0], parts[1]
  82. }
  83. return parts[0], parts[0]
  84. }
  85. func portForward(user *types.GetAuthenticatedUserResponse, client *api.Client, args []string) error {
  86. var err error
  87. var pod corev1.Pod
  88. s := spinner.New(spinner.CharSets[9], 100*time.Millisecond)
  89. s.Color("cyan")
  90. s.Suffix = fmt.Sprintf(" Loading list of pods for %s", args[0])
  91. s.Start()
  92. podsResp, err := client.GetK8sAllPods(context.Background(), cliConf.Project, cliConf.Cluster, namespace, args[0])
  93. s.Stop()
  94. if err != nil {
  95. return err
  96. }
  97. pods := *podsResp
  98. if len(pods) > 1 {
  99. selectedPod, err := utils.PromptSelect("Select a pod to port-forward", func() []string {
  100. var names []string
  101. for i, pod := range pods {
  102. names = append(names, fmt.Sprintf("%d - %s", (i+1), pod.Name))
  103. }
  104. return names
  105. }())
  106. if err != nil {
  107. return err
  108. }
  109. podIdxStr := strings.Split(selectedPod, " - ")[0]
  110. podIdx, err := strconv.Atoi(podIdxStr)
  111. if err != nil {
  112. return err
  113. }
  114. pod = pods[podIdx-1]
  115. } else {
  116. pod = pods[0]
  117. }
  118. kubeResp, err := client.GetKubeconfig(context.Background(), cliConf.Project, cliConf.Cluster, cliConf.Kubeconfig)
  119. if err != nil {
  120. return err
  121. }
  122. kubeBytes := kubeResp.Kubeconfig
  123. cmdConf, err := clientcmd.NewClientConfigFromBytes(kubeBytes)
  124. if err != nil {
  125. return err
  126. }
  127. restConf, err := cmdConf.ClientConfig()
  128. if err != nil {
  129. return err
  130. }
  131. restConf.GroupVersion = &schema.GroupVersion{
  132. Group: "api",
  133. Version: "v1",
  134. }
  135. restConf.NegotiatedSerializer = runtime.NewSimpleNegotiatedSerializer(runtime.SerializerInfo{})
  136. restClient, err := rest.RESTClientFor(restConf)
  137. if err != nil {
  138. return err
  139. }
  140. err = checkUDPPortInPod(args[1:], &pod)
  141. if err != nil {
  142. return err
  143. }
  144. ports, err := convertPodNamedPortToNumber(args[1:], pod)
  145. if err != nil {
  146. return err
  147. }
  148. stopChannel := make(chan struct{}, 1)
  149. readyChannel := make(chan struct{})
  150. signals := make(chan os.Signal, 1)
  151. signal.Notify(signals, os.Interrupt)
  152. defer signal.Stop(signals)
  153. go func() {
  154. <-signals
  155. if stopChannel != nil {
  156. close(stopChannel)
  157. }
  158. }()
  159. req := restClient.Post().
  160. Resource("pods").
  161. Namespace(namespace).
  162. Name(pod.Name).
  163. SubResource("portforward")
  164. return forwardPorts("POST", req.URL(), restConf, address, ports, stopChannel, readyChannel)
  165. }
  166. func checkUDPPortInPod(ports []string, pod *corev1.Pod) error {
  167. udpPorts := sets.NewInt()
  168. tcpPorts := sets.NewInt()
  169. for _, ct := range pod.Spec.Containers {
  170. for _, ctPort := range ct.Ports {
  171. portNum := int(ctPort.ContainerPort)
  172. switch ctPort.Protocol {
  173. case corev1.ProtocolUDP:
  174. udpPorts.Insert(portNum)
  175. case corev1.ProtocolTCP:
  176. tcpPorts.Insert(portNum)
  177. }
  178. }
  179. }
  180. return checkUDPPorts(udpPorts.Difference(tcpPorts), ports, pod)
  181. }
  182. func checkUDPPorts(udpOnlyPorts sets.Int, ports []string, obj metav1.Object) error {
  183. for _, port := range ports {
  184. _, remotePort := splitPort(port)
  185. portNum, err := strconv.Atoi(remotePort)
  186. if err != nil {
  187. switch v := obj.(type) {
  188. case *corev1.Service:
  189. svcPort, err := util.LookupServicePortNumberByName(*v, remotePort)
  190. if err != nil {
  191. return err
  192. }
  193. portNum = int(svcPort)
  194. case *corev1.Pod:
  195. ctPort, err := util.LookupContainerPortNumberByName(*v, remotePort)
  196. if err != nil {
  197. return err
  198. }
  199. portNum = int(ctPort)
  200. default:
  201. return fmt.Errorf("unknown object: %v", obj)
  202. }
  203. }
  204. if udpOnlyPorts.Has(portNum) {
  205. return fmt.Errorf("UDP protocol is not supported for %s", remotePort)
  206. }
  207. }
  208. return nil
  209. }
  210. func convertPodNamedPortToNumber(ports []string, pod corev1.Pod) ([]string, error) {
  211. var converted []string
  212. for _, port := range ports {
  213. localPort, remotePort := splitPort(port)
  214. containerPortStr := remotePort
  215. _, err := strconv.Atoi(remotePort)
  216. if err != nil {
  217. containerPort, err := util.LookupContainerPortNumberByName(pod, remotePort)
  218. if err != nil {
  219. return nil, err
  220. }
  221. containerPortStr = strconv.Itoa(int(containerPort))
  222. }
  223. if localPort != remotePort {
  224. converted = append(converted, fmt.Sprintf("%s:%s", localPort, containerPortStr))
  225. } else {
  226. converted = append(converted, containerPortStr)
  227. }
  228. }
  229. return converted, nil
  230. }