exec.go 9.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360
  1. /*
  2. Copyright 2018 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package exec
  14. import (
  15. "bytes"
  16. "context"
  17. "crypto/tls"
  18. "errors"
  19. "fmt"
  20. "io"
  21. "net"
  22. "net/http"
  23. "os"
  24. "os/exec"
  25. "reflect"
  26. "sync"
  27. "time"
  28. "github.com/davecgh/go-spew/spew"
  29. "golang.org/x/crypto/ssh/terminal"
  30. v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
  31. "k8s.io/apimachinery/pkg/runtime"
  32. "k8s.io/apimachinery/pkg/runtime/schema"
  33. "k8s.io/apimachinery/pkg/runtime/serializer"
  34. utilruntime "k8s.io/apimachinery/pkg/util/runtime"
  35. "k8s.io/client-go/pkg/apis/clientauthentication"
  36. "k8s.io/client-go/pkg/apis/clientauthentication/v1alpha1"
  37. "k8s.io/client-go/pkg/apis/clientauthentication/v1beta1"
  38. "k8s.io/client-go/tools/clientcmd/api"
  39. "k8s.io/client-go/transport"
  40. "k8s.io/client-go/util/connrotation"
  41. "k8s.io/klog"
  42. )
  43. const execInfoEnv = "KUBERNETES_EXEC_INFO"
  44. var scheme = runtime.NewScheme()
  45. var codecs = serializer.NewCodecFactory(scheme)
  46. func init() {
  47. v1.AddToGroupVersion(scheme, schema.GroupVersion{Version: "v1"})
  48. utilruntime.Must(v1alpha1.AddToScheme(scheme))
  49. utilruntime.Must(v1beta1.AddToScheme(scheme))
  50. utilruntime.Must(clientauthentication.AddToScheme(scheme))
  51. }
  52. var (
  53. // Since transports can be constantly re-initialized by programs like kubectl,
  54. // keep a cache of initialized authenticators keyed by a hash of their config.
  55. globalCache = newCache()
  56. // The list of API versions we accept.
  57. apiVersions = map[string]schema.GroupVersion{
  58. v1alpha1.SchemeGroupVersion.String(): v1alpha1.SchemeGroupVersion,
  59. v1beta1.SchemeGroupVersion.String(): v1beta1.SchemeGroupVersion,
  60. }
  61. )
  62. func newCache() *cache {
  63. return &cache{m: make(map[string]*Authenticator)}
  64. }
  65. var spewConfig = &spew.ConfigState{DisableMethods: true, Indent: " "}
  66. func cacheKey(c *api.ExecConfig) string {
  67. return spewConfig.Sprint(c)
  68. }
  69. type cache struct {
  70. mu sync.Mutex
  71. m map[string]*Authenticator
  72. }
  73. func (c *cache) get(s string) (*Authenticator, bool) {
  74. c.mu.Lock()
  75. defer c.mu.Unlock()
  76. a, ok := c.m[s]
  77. return a, ok
  78. }
  79. // put inserts an authenticator into the cache. If an authenticator is already
  80. // associated with the key, the first one is returned instead.
  81. func (c *cache) put(s string, a *Authenticator) *Authenticator {
  82. c.mu.Lock()
  83. defer c.mu.Unlock()
  84. existing, ok := c.m[s]
  85. if ok {
  86. return existing
  87. }
  88. c.m[s] = a
  89. return a
  90. }
  91. // GetAuthenticator returns an exec-based plugin for providing client credentials.
  92. func GetAuthenticator(config *api.ExecConfig) (*Authenticator, error) {
  93. return newAuthenticator(globalCache, config)
  94. }
  95. func newAuthenticator(c *cache, config *api.ExecConfig) (*Authenticator, error) {
  96. key := cacheKey(config)
  97. if a, ok := c.get(key); ok {
  98. return a, nil
  99. }
  100. gv, ok := apiVersions[config.APIVersion]
  101. if !ok {
  102. return nil, fmt.Errorf("exec plugin: invalid apiVersion %q", config.APIVersion)
  103. }
  104. a := &Authenticator{
  105. cmd: config.Command,
  106. args: config.Args,
  107. group: gv,
  108. stdin: os.Stdin,
  109. stderr: os.Stderr,
  110. interactive: terminal.IsTerminal(int(os.Stdout.Fd())),
  111. now: time.Now,
  112. environ: os.Environ,
  113. }
  114. for _, env := range config.Env {
  115. a.env = append(a.env, env.Name+"="+env.Value)
  116. }
  117. return c.put(key, a), nil
  118. }
  119. // Authenticator is a client credential provider that rotates credentials by executing a plugin.
  120. // The plugin input and output are defined by the API group client.authentication.k8s.io.
  121. type Authenticator struct {
  122. // Set by the config
  123. cmd string
  124. args []string
  125. group schema.GroupVersion
  126. env []string
  127. // Stubbable for testing
  128. stdin io.Reader
  129. stderr io.Writer
  130. interactive bool
  131. now func() time.Time
  132. environ func() []string
  133. // Cached results.
  134. //
  135. // The mutex also guards calling the plugin. Since the plugin could be
  136. // interactive we want to make sure it's only called once.
  137. mu sync.Mutex
  138. cachedCreds *credentials
  139. exp time.Time
  140. onRotate func()
  141. }
  142. type credentials struct {
  143. token string
  144. cert *tls.Certificate
  145. }
  146. // UpdateTransportConfig updates the transport.Config to use credentials
  147. // returned by the plugin.
  148. func (a *Authenticator) UpdateTransportConfig(c *transport.Config) error {
  149. c.Wrap(func(rt http.RoundTripper) http.RoundTripper {
  150. return &roundTripper{a, rt}
  151. })
  152. if c.TLS.GetCert != nil {
  153. return errors.New("can't add TLS certificate callback: transport.Config.TLS.GetCert already set")
  154. }
  155. c.TLS.GetCert = a.cert
  156. var dial func(ctx context.Context, network, addr string) (net.Conn, error)
  157. if c.Dial != nil {
  158. dial = c.Dial
  159. } else {
  160. dial = (&net.Dialer{Timeout: 30 * time.Second, KeepAlive: 30 * time.Second}).DialContext
  161. }
  162. d := connrotation.NewDialer(dial)
  163. a.onRotate = d.CloseAll
  164. c.Dial = d.DialContext
  165. return nil
  166. }
  167. type roundTripper struct {
  168. a *Authenticator
  169. base http.RoundTripper
  170. }
  171. func (r *roundTripper) RoundTrip(req *http.Request) (*http.Response, error) {
  172. // If a user has already set credentials, use that. This makes commands like
  173. // "kubectl get --token (token) pods" work.
  174. if req.Header.Get("Authorization") != "" {
  175. return r.base.RoundTrip(req)
  176. }
  177. creds, err := r.a.getCreds()
  178. if err != nil {
  179. return nil, fmt.Errorf("getting credentials: %v", err)
  180. }
  181. if creds.token != "" {
  182. req.Header.Set("Authorization", "Bearer "+creds.token)
  183. }
  184. res, err := r.base.RoundTrip(req)
  185. if err != nil {
  186. return nil, err
  187. }
  188. if res.StatusCode == http.StatusUnauthorized {
  189. resp := &clientauthentication.Response{
  190. Header: res.Header,
  191. Code: int32(res.StatusCode),
  192. }
  193. if err := r.a.maybeRefreshCreds(creds, resp); err != nil {
  194. klog.Errorf("refreshing credentials: %v", err)
  195. }
  196. }
  197. return res, nil
  198. }
  199. func (a *Authenticator) credsExpired() bool {
  200. if a.exp.IsZero() {
  201. return false
  202. }
  203. return a.now().After(a.exp)
  204. }
  205. func (a *Authenticator) cert() (*tls.Certificate, error) {
  206. creds, err := a.getCreds()
  207. if err != nil {
  208. return nil, err
  209. }
  210. return creds.cert, nil
  211. }
  212. func (a *Authenticator) getCreds() (*credentials, error) {
  213. a.mu.Lock()
  214. defer a.mu.Unlock()
  215. if a.cachedCreds != nil && !a.credsExpired() {
  216. return a.cachedCreds, nil
  217. }
  218. if err := a.refreshCredsLocked(nil); err != nil {
  219. return nil, err
  220. }
  221. return a.cachedCreds, nil
  222. }
  223. // maybeRefreshCreds executes the plugin to force a rotation of the
  224. // credentials, unless they were rotated already.
  225. func (a *Authenticator) maybeRefreshCreds(creds *credentials, r *clientauthentication.Response) error {
  226. a.mu.Lock()
  227. defer a.mu.Unlock()
  228. // Since we're not making a new pointer to a.cachedCreds in getCreds, no
  229. // need to do deep comparison.
  230. if creds != a.cachedCreds {
  231. // Credentials already rotated.
  232. return nil
  233. }
  234. return a.refreshCredsLocked(r)
  235. }
  236. // refreshCredsLocked executes the plugin and reads the credentials from
  237. // stdout. It must be called while holding the Authenticator's mutex.
  238. func (a *Authenticator) refreshCredsLocked(r *clientauthentication.Response) error {
  239. cred := &clientauthentication.ExecCredential{
  240. Spec: clientauthentication.ExecCredentialSpec{
  241. Response: r,
  242. Interactive: a.interactive,
  243. },
  244. }
  245. env := append(a.environ(), a.env...)
  246. if a.group == v1alpha1.SchemeGroupVersion {
  247. // Input spec disabled for beta due to lack of use. Possibly re-enable this later if
  248. // someone wants it back.
  249. //
  250. // See: https://github.com/kubernetes/kubernetes/issues/61796
  251. data, err := runtime.Encode(codecs.LegacyCodec(a.group), cred)
  252. if err != nil {
  253. return fmt.Errorf("encode ExecCredentials: %v", err)
  254. }
  255. env = append(env, fmt.Sprintf("%s=%s", execInfoEnv, data))
  256. }
  257. stdout := &bytes.Buffer{}
  258. cmd := exec.Command(a.cmd, a.args...)
  259. cmd.Env = env
  260. cmd.Stderr = a.stderr
  261. cmd.Stdout = stdout
  262. if a.interactive {
  263. cmd.Stdin = a.stdin
  264. }
  265. if err := cmd.Run(); err != nil {
  266. return fmt.Errorf("exec: %v", err)
  267. }
  268. _, gvk, err := codecs.UniversalDecoder(a.group).Decode(stdout.Bytes(), nil, cred)
  269. if err != nil {
  270. return fmt.Errorf("decoding stdout: %v", err)
  271. }
  272. if gvk.Group != a.group.Group || gvk.Version != a.group.Version {
  273. return fmt.Errorf("exec plugin is configured to use API version %s, plugin returned version %s",
  274. a.group, schema.GroupVersion{Group: gvk.Group, Version: gvk.Version})
  275. }
  276. if cred.Status == nil {
  277. return fmt.Errorf("exec plugin didn't return a status field")
  278. }
  279. if cred.Status.Token == "" && cred.Status.ClientCertificateData == "" && cred.Status.ClientKeyData == "" {
  280. return fmt.Errorf("exec plugin didn't return a token or cert/key pair")
  281. }
  282. if (cred.Status.ClientCertificateData == "") != (cred.Status.ClientKeyData == "") {
  283. return fmt.Errorf("exec plugin returned only certificate or key, not both")
  284. }
  285. if cred.Status.ExpirationTimestamp != nil {
  286. a.exp = cred.Status.ExpirationTimestamp.Time
  287. } else {
  288. a.exp = time.Time{}
  289. }
  290. newCreds := &credentials{
  291. token: cred.Status.Token,
  292. }
  293. if cred.Status.ClientKeyData != "" && cred.Status.ClientCertificateData != "" {
  294. cert, err := tls.X509KeyPair([]byte(cred.Status.ClientCertificateData), []byte(cred.Status.ClientKeyData))
  295. if err != nil {
  296. return fmt.Errorf("failed parsing client key/certificate: %v", err)
  297. }
  298. newCreds.cert = &cert
  299. }
  300. oldCreds := a.cachedCreds
  301. a.cachedCreds = newCreds
  302. // Only close all connections when TLS cert rotates. Token rotation doesn't
  303. // need the extra noise.
  304. if a.onRotate != nil && oldCreds != nil && !reflect.DeepEqual(oldCreds.cert, a.cachedCreds.cert) {
  305. a.onRotate()
  306. }
  307. return nil
  308. }