exec.go 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545
  1. /*
  2. Copyright 2018 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package exec
  14. import (
  15. "bytes"
  16. "crypto/tls"
  17. "crypto/x509"
  18. "errors"
  19. "fmt"
  20. "io"
  21. "net"
  22. "net/http"
  23. "os"
  24. "os/exec"
  25. "reflect"
  26. "strings"
  27. "sync"
  28. "time"
  29. "github.com/davecgh/go-spew/spew"
  30. "golang.org/x/term"
  31. "k8s.io/apimachinery/pkg/runtime"
  32. "k8s.io/apimachinery/pkg/runtime/schema"
  33. "k8s.io/apimachinery/pkg/runtime/serializer"
  34. utilnet "k8s.io/apimachinery/pkg/util/net"
  35. "k8s.io/client-go/pkg/apis/clientauthentication"
  36. "k8s.io/client-go/pkg/apis/clientauthentication/install"
  37. clientauthenticationv1 "k8s.io/client-go/pkg/apis/clientauthentication/v1"
  38. clientauthenticationv1alpha1 "k8s.io/client-go/pkg/apis/clientauthentication/v1alpha1"
  39. clientauthenticationv1beta1 "k8s.io/client-go/pkg/apis/clientauthentication/v1beta1"
  40. "k8s.io/client-go/tools/clientcmd/api"
  41. "k8s.io/client-go/tools/metrics"
  42. "k8s.io/client-go/transport"
  43. "k8s.io/client-go/util/connrotation"
  44. "k8s.io/klog/v2"
  45. "k8s.io/utils/clock"
  46. )
  const (
  	// execInfoEnv names the environment variable that carries the encoded
  	// ExecCredential request to the plugin process.
  	execInfoEnv = "KUBERNETES_EXEC_INFO"

  	// installHintVerboseHelp is appended to the "executable not found" error
  	// so users are pointed at the credential plugin documentation.
  	installHintVerboseHelp = `
It looks like you are trying to use a client-go credential plugin that is not installed.
To learn more about this feature, consult the documentation available at:
https://kubernetes.io/docs/reference/access-authn-authz/authentication/#client-go-credential-plugins`
  )
  52. var scheme = runtime.NewScheme()
  53. var codecs = serializer.NewCodecFactory(scheme)
  54. func init() {
  55. install.Install(scheme)
  56. }
  57. var (
  58. // Since transports can be constantly re-initialized by programs like kubectl,
  59. // keep a cache of initialized authenticators keyed by a hash of their config.
  60. globalCache = newCache()
  61. // The list of API versions we accept.
  62. apiVersions = map[string]schema.GroupVersion{
  63. clientauthenticationv1alpha1.SchemeGroupVersion.String(): clientauthenticationv1alpha1.SchemeGroupVersion,
  64. clientauthenticationv1beta1.SchemeGroupVersion.String(): clientauthenticationv1beta1.SchemeGroupVersion,
  65. clientauthenticationv1.SchemeGroupVersion.String(): clientauthenticationv1.SchemeGroupVersion,
  66. }
  67. )
  68. func newCache() *cache {
  69. return &cache{m: make(map[string]*Authenticator)}
  70. }
  71. var spewConfig = &spew.ConfigState{DisableMethods: true, Indent: " "}
  72. func cacheKey(conf *api.ExecConfig, cluster *clientauthentication.Cluster) string {
  73. key := struct {
  74. conf *api.ExecConfig
  75. cluster *clientauthentication.Cluster
  76. }{
  77. conf: conf,
  78. cluster: cluster,
  79. }
  80. return spewConfig.Sprint(key)
  81. }
  82. type cache struct {
  83. mu sync.Mutex
  84. m map[string]*Authenticator
  85. }
  86. func (c *cache) get(s string) (*Authenticator, bool) {
  87. c.mu.Lock()
  88. defer c.mu.Unlock()
  89. a, ok := c.m[s]
  90. return a, ok
  91. }
  92. // put inserts an authenticator into the cache. If an authenticator is already
  93. // associated with the key, the first one is returned instead.
  94. func (c *cache) put(s string, a *Authenticator) *Authenticator {
  95. c.mu.Lock()
  96. defer c.mu.Unlock()
  97. existing, ok := c.m[s]
  98. if ok {
  99. return existing
  100. }
  101. c.m[s] = a
  102. return a
  103. }
  104. // sometimes rate limits how often a function f() is called. Specifically, Do()
  105. // will run the provided function f() up to threshold times every interval
  106. // duration.
  107. type sometimes struct {
  108. threshold int
  109. interval time.Duration
  110. clock clock.Clock
  111. mu sync.Mutex
  112. count int // times we have called f() in this window
  113. window time.Time // beginning of current window of length interval
  114. }
  115. func (s *sometimes) Do(f func()) {
  116. s.mu.Lock()
  117. defer s.mu.Unlock()
  118. now := s.clock.Now()
  119. if s.window.IsZero() {
  120. s.window = now
  121. }
  122. // If we are no longer in our saved time window, then we get to reset our run
  123. // count back to 0 and start increasing towards the threshold again.
  124. if inWindow := now.Sub(s.window) < s.interval; !inWindow {
  125. s.window = now
  126. s.count = 0
  127. }
  128. // If we have not run the function more than threshold times in this current
  129. // time window, we get to run it now!
  130. if underThreshold := s.count < s.threshold; underThreshold {
  131. s.count++
  132. f()
  133. }
  134. }
  135. // GetAuthenticator returns an exec-based plugin for providing client credentials.
  136. func GetAuthenticator(config *api.ExecConfig, cluster *clientauthentication.Cluster) (*Authenticator, error) {
  137. return newAuthenticator(globalCache, term.IsTerminal, config, cluster)
  138. }
  139. func newAuthenticator(c *cache, isTerminalFunc func(int) bool, config *api.ExecConfig, cluster *clientauthentication.Cluster) (*Authenticator, error) {
  140. key := cacheKey(config, cluster)
  141. if a, ok := c.get(key); ok {
  142. return a, nil
  143. }
  144. gv, ok := apiVersions[config.APIVersion]
  145. if !ok {
  146. return nil, fmt.Errorf("exec plugin: invalid apiVersion %q", config.APIVersion)
  147. }
  148. connTracker := connrotation.NewConnectionTracker()
  149. defaultDialer := connrotation.NewDialerWithTracker(
  150. (&net.Dialer{Timeout: 30 * time.Second, KeepAlive: 30 * time.Second}).DialContext,
  151. connTracker,
  152. )
  153. a := &Authenticator{
  154. cmd: config.Command,
  155. args: config.Args,
  156. group: gv,
  157. cluster: cluster,
  158. provideClusterInfo: config.ProvideClusterInfo,
  159. installHint: config.InstallHint,
  160. sometimes: &sometimes{
  161. threshold: 10,
  162. interval: time.Hour,
  163. clock: clock.RealClock{},
  164. },
  165. stdin: os.Stdin,
  166. stderr: os.Stderr,
  167. interactiveFunc: func() (bool, error) { return isInteractive(isTerminalFunc, config) },
  168. now: time.Now,
  169. environ: os.Environ,
  170. defaultDialer: defaultDialer,
  171. connTracker: connTracker,
  172. }
  173. for _, env := range config.Env {
  174. a.env = append(a.env, env.Name+"="+env.Value)
  175. }
  176. return c.put(key, a), nil
  177. }
  178. func isInteractive(isTerminalFunc func(int) bool, config *api.ExecConfig) (bool, error) {
  179. var shouldBeInteractive bool
  180. switch config.InteractiveMode {
  181. case api.NeverExecInteractiveMode:
  182. shouldBeInteractive = false
  183. case api.IfAvailableExecInteractiveMode:
  184. shouldBeInteractive = !config.StdinUnavailable && isTerminalFunc(int(os.Stdin.Fd()))
  185. case api.AlwaysExecInteractiveMode:
  186. if !isTerminalFunc(int(os.Stdin.Fd())) {
  187. return false, errors.New("standard input is not a terminal")
  188. }
  189. if config.StdinUnavailable {
  190. suffix := ""
  191. if len(config.StdinUnavailableMessage) > 0 {
  192. // only print extra ": <message>" if the user actually specified a message
  193. suffix = fmt.Sprintf(": %s", config.StdinUnavailableMessage)
  194. }
  195. return false, fmt.Errorf("standard input is unavailable%s", suffix)
  196. }
  197. shouldBeInteractive = true
  198. default:
  199. return false, fmt.Errorf("unknown interactiveMode: %q", config.InteractiveMode)
  200. }
  201. return shouldBeInteractive, nil
  202. }
  203. // Authenticator is a client credential provider that rotates credentials by executing a plugin.
  204. // The plugin input and output are defined by the API group client.authentication.k8s.io.
  205. type Authenticator struct {
  206. // Set by the config
  207. cmd string
  208. args []string
  209. group schema.GroupVersion
  210. env []string
  211. cluster *clientauthentication.Cluster
  212. provideClusterInfo bool
  213. // Used to avoid log spew by rate limiting install hint printing. We didn't do
  214. // this by interval based rate limiting alone since that way may have prevented
  215. // the install hint from showing up for kubectl users.
  216. sometimes *sometimes
  217. installHint string
  218. // Stubbable for testing
  219. stdin io.Reader
  220. stderr io.Writer
  221. interactiveFunc func() (bool, error)
  222. now func() time.Time
  223. environ func() []string
  224. // defaultDialer is used for clients which don't specify a custom dialer
  225. defaultDialer *connrotation.Dialer
  226. // connTracker tracks all connections opened that we need to close when rotating a client certificate
  227. connTracker *connrotation.ConnectionTracker
  228. // Cached results.
  229. //
  230. // The mutex also guards calling the plugin. Since the plugin could be
  231. // interactive we want to make sure it's only called once.
  232. mu sync.Mutex
  233. cachedCreds *credentials
  234. exp time.Time
  235. }
  // credentials is what a successful plugin run yields: a bearer token, a
  // client certificate, or both.
  type credentials struct {
  	// token is sent as a bearer token when non-empty.
  	token string `datapolicy:"token"`
  	// cert is the parsed client key pair; nil when the plugin returned none.
  	cert *tls.Certificate `datapolicy:"secret-key"`
  }
  240. // UpdateTransportConfig updates the transport.Config to use credentials
  241. // returned by the plugin.
  242. func (a *Authenticator) UpdateTransportConfig(c *transport.Config) error {
  243. // If a bearer token is present in the request - avoid the GetCert callback when
  244. // setting up the transport, as that triggers the exec action if the server is
  245. // also configured to allow client certificates for authentication. For requests
  246. // like "kubectl get --token (token) pods" we should assume the intention is to
  247. // use the provided token for authentication. The same can be said for when the
  248. // user specifies basic auth.
  249. if c.HasTokenAuth() || c.HasBasicAuth() {
  250. return nil
  251. }
  252. c.Wrap(func(rt http.RoundTripper) http.RoundTripper {
  253. return &roundTripper{a, rt}
  254. })
  255. if c.TLS.GetCert != nil {
  256. return errors.New("can't add TLS certificate callback: transport.Config.TLS.GetCert already set")
  257. }
  258. c.TLS.GetCert = a.cert
  259. var d *connrotation.Dialer
  260. if c.Dial != nil {
  261. // if c has a custom dialer, we have to wrap it
  262. d = connrotation.NewDialerWithTracker(c.Dial, a.connTracker)
  263. } else {
  264. d = a.defaultDialer
  265. }
  266. c.Dial = d.DialContext
  267. return nil
  268. }
  269. var _ utilnet.RoundTripperWrapper = &roundTripper{}
  270. type roundTripper struct {
  271. a *Authenticator
  272. base http.RoundTripper
  273. }
  274. func (r *roundTripper) WrappedRoundTripper() http.RoundTripper {
  275. return r.base
  276. }
  277. func (r *roundTripper) RoundTrip(req *http.Request) (*http.Response, error) {
  278. // If a user has already set credentials, use that. This makes commands like
  279. // "kubectl get --token (token) pods" work.
  280. if req.Header.Get("Authorization") != "" {
  281. return r.base.RoundTrip(req)
  282. }
  283. creds, err := r.a.getCreds()
  284. if err != nil {
  285. return nil, fmt.Errorf("getting credentials: %v", err)
  286. }
  287. if creds.token != "" {
  288. req.Header.Set("Authorization", "Bearer "+creds.token)
  289. }
  290. res, err := r.base.RoundTrip(req)
  291. if err != nil {
  292. return nil, err
  293. }
  294. if res.StatusCode == http.StatusUnauthorized {
  295. resp := &clientauthentication.Response{
  296. Header: res.Header,
  297. Code: int32(res.StatusCode),
  298. }
  299. if err := r.a.maybeRefreshCreds(creds, resp); err != nil {
  300. klog.Errorf("refreshing credentials: %v", err)
  301. }
  302. }
  303. return res, nil
  304. }
  305. func (a *Authenticator) credsExpired() bool {
  306. if a.exp.IsZero() {
  307. return false
  308. }
  309. return a.now().After(a.exp)
  310. }
  311. func (a *Authenticator) cert() (*tls.Certificate, error) {
  312. creds, err := a.getCreds()
  313. if err != nil {
  314. return nil, err
  315. }
  316. return creds.cert, nil
  317. }
  318. func (a *Authenticator) getCreds() (*credentials, error) {
  319. a.mu.Lock()
  320. defer a.mu.Unlock()
  321. if a.cachedCreds != nil && !a.credsExpired() {
  322. return a.cachedCreds, nil
  323. }
  324. if err := a.refreshCredsLocked(nil); err != nil {
  325. return nil, err
  326. }
  327. return a.cachedCreds, nil
  328. }
  329. // maybeRefreshCreds executes the plugin to force a rotation of the
  330. // credentials, unless they were rotated already.
  331. func (a *Authenticator) maybeRefreshCreds(creds *credentials, r *clientauthentication.Response) error {
  332. a.mu.Lock()
  333. defer a.mu.Unlock()
  334. // Since we're not making a new pointer to a.cachedCreds in getCreds, no
  335. // need to do deep comparison.
  336. if creds != a.cachedCreds {
  337. // Credentials already rotated.
  338. return nil
  339. }
  340. return a.refreshCredsLocked(r)
  341. }
  342. // refreshCredsLocked executes the plugin and reads the credentials from
  343. // stdout. It must be called while holding the Authenticator's mutex.
  344. func (a *Authenticator) refreshCredsLocked(r *clientauthentication.Response) error {
  345. interactive, err := a.interactiveFunc()
  346. if err != nil {
  347. return fmt.Errorf("exec plugin cannot support interactive mode: %w", err)
  348. }
  349. cred := &clientauthentication.ExecCredential{
  350. Spec: clientauthentication.ExecCredentialSpec{
  351. Response: r,
  352. Interactive: interactive,
  353. },
  354. }
  355. if a.provideClusterInfo {
  356. cred.Spec.Cluster = a.cluster
  357. }
  358. env := append(a.environ(), a.env...)
  359. data, err := runtime.Encode(codecs.LegacyCodec(a.group), cred)
  360. if err != nil {
  361. return fmt.Errorf("encode ExecCredentials: %v", err)
  362. }
  363. env = append(env, fmt.Sprintf("%s=%s", execInfoEnv, data))
  364. stdout := &bytes.Buffer{}
  365. cmd := exec.Command(a.cmd, a.args...)
  366. cmd.Env = env
  367. cmd.Stderr = a.stderr
  368. cmd.Stdout = stdout
  369. if interactive {
  370. cmd.Stdin = a.stdin
  371. }
  372. err = cmd.Run()
  373. incrementCallsMetric(err)
  374. if err != nil {
  375. return a.wrapCmdRunErrorLocked(err)
  376. }
  377. _, gvk, err := codecs.UniversalDecoder(a.group).Decode(stdout.Bytes(), nil, cred)
  378. if err != nil {
  379. return fmt.Errorf("decoding stdout: %v", err)
  380. }
  381. if gvk.Group != a.group.Group || gvk.Version != a.group.Version {
  382. return fmt.Errorf("exec plugin is configured to use API version %s, plugin returned version %s",
  383. a.group, schema.GroupVersion{Group: gvk.Group, Version: gvk.Version})
  384. }
  385. if cred.Status == nil {
  386. return fmt.Errorf("exec plugin didn't return a status field")
  387. }
  388. if cred.Status.Token == "" && cred.Status.ClientCertificateData == "" && cred.Status.ClientKeyData == "" {
  389. return fmt.Errorf("exec plugin didn't return a token or cert/key pair")
  390. }
  391. if (cred.Status.ClientCertificateData == "") != (cred.Status.ClientKeyData == "") {
  392. return fmt.Errorf("exec plugin returned only certificate or key, not both")
  393. }
  394. if cred.Status.ExpirationTimestamp != nil {
  395. a.exp = cred.Status.ExpirationTimestamp.Time
  396. } else {
  397. a.exp = time.Time{}
  398. }
  399. newCreds := &credentials{
  400. token: cred.Status.Token,
  401. }
  402. if cred.Status.ClientKeyData != "" && cred.Status.ClientCertificateData != "" {
  403. cert, err := tls.X509KeyPair([]byte(cred.Status.ClientCertificateData), []byte(cred.Status.ClientKeyData))
  404. if err != nil {
  405. return fmt.Errorf("failed parsing client key/certificate: %v", err)
  406. }
  407. // Leaf is initialized to be nil:
  408. // https://golang.org/pkg/crypto/tls/#X509KeyPair
  409. // Leaf certificate is the first certificate:
  410. // https://golang.org/pkg/crypto/tls/#Certificate
  411. // Populating leaf is useful for quickly accessing the underlying x509
  412. // certificate values.
  413. cert.Leaf, err = x509.ParseCertificate(cert.Certificate[0])
  414. if err != nil {
  415. return fmt.Errorf("failed parsing client leaf certificate: %v", err)
  416. }
  417. newCreds.cert = &cert
  418. }
  419. oldCreds := a.cachedCreds
  420. a.cachedCreds = newCreds
  421. // Only close all connections when TLS cert rotates. Token rotation doesn't
  422. // need the extra noise.
  423. if oldCreds != nil && !reflect.DeepEqual(oldCreds.cert, a.cachedCreds.cert) {
  424. // Can be nil if the exec auth plugin only returned token auth.
  425. if oldCreds.cert != nil && oldCreds.cert.Leaf != nil {
  426. metrics.ClientCertRotationAge.Observe(time.Since(oldCreds.cert.Leaf.NotBefore))
  427. }
  428. a.connTracker.CloseAll()
  429. }
  430. expiry := time.Time{}
  431. if a.cachedCreds.cert != nil && a.cachedCreds.cert.Leaf != nil {
  432. expiry = a.cachedCreds.cert.Leaf.NotAfter
  433. }
  434. expirationMetrics.set(a, expiry)
  435. return nil
  436. }
  437. // wrapCmdRunErrorLocked pulls out the code to construct a helpful error message
  438. // for when the exec plugin's binary fails to Run().
  439. //
  440. // It must be called while holding the Authenticator's mutex.
  441. func (a *Authenticator) wrapCmdRunErrorLocked(err error) error {
  442. switch err.(type) {
  443. case *exec.Error: // Binary does not exist (see exec.Error).
  444. builder := strings.Builder{}
  445. fmt.Fprintf(&builder, "exec: executable %s not found", a.cmd)
  446. a.sometimes.Do(func() {
  447. fmt.Fprint(&builder, installHintVerboseHelp)
  448. if a.installHint != "" {
  449. fmt.Fprintf(&builder, "\n\n%s", a.installHint)
  450. }
  451. })
  452. return errors.New(builder.String())
  453. case *exec.ExitError: // Binary execution failed (see exec.Cmd.Run()).
  454. e := err.(*exec.ExitError)
  455. return fmt.Errorf(
  456. "exec: executable %s failed with exit code %d",
  457. a.cmd,
  458. e.ProcessState.ExitCode(),
  459. )
  460. default:
  461. return fmt.Errorf("exec: %v", err)
  462. }
  463. }