exec.go 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507
  1. /*
  2. Copyright 2018 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package exec
  14. import (
  15. "bytes"
  16. "crypto/tls"
  17. "crypto/x509"
  18. "errors"
  19. "fmt"
  20. "io"
  21. "net"
  22. "net/http"
  23. "os"
  24. "os/exec"
  25. "reflect"
  26. "strings"
  27. "sync"
  28. "time"
  29. "github.com/davecgh/go-spew/spew"
  30. "golang.org/x/term"
  31. v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
  32. "k8s.io/apimachinery/pkg/runtime"
  33. "k8s.io/apimachinery/pkg/runtime/schema"
  34. "k8s.io/apimachinery/pkg/runtime/serializer"
  35. "k8s.io/apimachinery/pkg/util/clock"
  36. utilruntime "k8s.io/apimachinery/pkg/util/runtime"
  37. "k8s.io/client-go/pkg/apis/clientauthentication"
  38. "k8s.io/client-go/pkg/apis/clientauthentication/v1alpha1"
  39. "k8s.io/client-go/pkg/apis/clientauthentication/v1beta1"
  40. "k8s.io/client-go/tools/clientcmd/api"
  41. "k8s.io/client-go/tools/metrics"
  42. "k8s.io/client-go/transport"
  43. "k8s.io/client-go/util/connrotation"
  44. "k8s.io/klog/v2"
  45. )
  const (
  	// execInfoEnv is the name of the environment variable through which the
  	// encoded ExecCredential object is handed to the plugin process.
  	execInfoEnv = "KUBERNETES_EXEC_INFO"

  	// installHintVerboseHelp is the generic help text added to the error
  	// message when the plugin executable cannot be found.
  	installHintVerboseHelp = `
It looks like you are trying to use a client-go credential plugin that is not installed.
To learn more about this feature, consult the documentation available at:
https://kubernetes.io/docs/reference/access-authn-authz/authentication/#client-go-credential-plugins`
  )
  51. var scheme = runtime.NewScheme()
  52. var codecs = serializer.NewCodecFactory(scheme)
  53. func init() {
  54. v1.AddToGroupVersion(scheme, schema.GroupVersion{Version: "v1"})
  55. utilruntime.Must(v1alpha1.AddToScheme(scheme))
  56. utilruntime.Must(v1beta1.AddToScheme(scheme))
  57. utilruntime.Must(clientauthentication.AddToScheme(scheme))
  58. }
  59. var (
  60. // Since transports can be constantly re-initialized by programs like kubectl,
  61. // keep a cache of initialized authenticators keyed by a hash of their config.
  62. globalCache = newCache()
  63. // The list of API versions we accept.
  64. apiVersions = map[string]schema.GroupVersion{
  65. v1alpha1.SchemeGroupVersion.String(): v1alpha1.SchemeGroupVersion,
  66. v1beta1.SchemeGroupVersion.String(): v1beta1.SchemeGroupVersion,
  67. }
  68. )
  69. func newCache() *cache {
  70. return &cache{m: make(map[string]*Authenticator)}
  71. }
  72. var spewConfig = &spew.ConfigState{DisableMethods: true, Indent: " "}
  73. func cacheKey(conf *api.ExecConfig, cluster *clientauthentication.Cluster) string {
  74. key := struct {
  75. conf *api.ExecConfig
  76. cluster *clientauthentication.Cluster
  77. }{
  78. conf: conf,
  79. cluster: cluster,
  80. }
  81. return spewConfig.Sprint(key)
  82. }
  83. type cache struct {
  84. mu sync.Mutex
  85. m map[string]*Authenticator
  86. }
  87. func (c *cache) get(s string) (*Authenticator, bool) {
  88. c.mu.Lock()
  89. defer c.mu.Unlock()
  90. a, ok := c.m[s]
  91. return a, ok
  92. }
  93. // put inserts an authenticator into the cache. If an authenticator is already
  94. // associated with the key, the first one is returned instead.
  95. func (c *cache) put(s string, a *Authenticator) *Authenticator {
  96. c.mu.Lock()
  97. defer c.mu.Unlock()
  98. existing, ok := c.m[s]
  99. if ok {
  100. return existing
  101. }
  102. c.m[s] = a
  103. return a
  104. }
  105. // sometimes rate limits how often a function f() is called. Specifically, Do()
  106. // will run the provided function f() up to threshold times every interval
  107. // duration.
  108. type sometimes struct {
  109. threshold int
  110. interval time.Duration
  111. clock clock.Clock
  112. mu sync.Mutex
  113. count int // times we have called f() in this window
  114. window time.Time // beginning of current window of length interval
  115. }
  116. func (s *sometimes) Do(f func()) {
  117. s.mu.Lock()
  118. defer s.mu.Unlock()
  119. now := s.clock.Now()
  120. if s.window.IsZero() {
  121. s.window = now
  122. }
  123. // If we are no longer in our saved time window, then we get to reset our run
  124. // count back to 0 and start increasing towards the threshold again.
  125. if inWindow := now.Sub(s.window) < s.interval; !inWindow {
  126. s.window = now
  127. s.count = 0
  128. }
  129. // If we have not run the function more than threshold times in this current
  130. // time window, we get to run it now!
  131. if underThreshold := s.count < s.threshold; underThreshold {
  132. s.count++
  133. f()
  134. }
  135. }
  136. // GetAuthenticator returns an exec-based plugin for providing client credentials.
  137. func GetAuthenticator(config *api.ExecConfig, cluster *clientauthentication.Cluster) (*Authenticator, error) {
  138. return newAuthenticator(globalCache, config, cluster)
  139. }
  140. func newAuthenticator(c *cache, config *api.ExecConfig, cluster *clientauthentication.Cluster) (*Authenticator, error) {
  141. key := cacheKey(config, cluster)
  142. if a, ok := c.get(key); ok {
  143. return a, nil
  144. }
  145. gv, ok := apiVersions[config.APIVersion]
  146. if !ok {
  147. return nil, fmt.Errorf("exec plugin: invalid apiVersion %q", config.APIVersion)
  148. }
  149. connTracker := connrotation.NewConnectionTracker()
  150. defaultDialer := connrotation.NewDialerWithTracker(
  151. (&net.Dialer{Timeout: 30 * time.Second, KeepAlive: 30 * time.Second}).DialContext,
  152. connTracker,
  153. )
  154. a := &Authenticator{
  155. cmd: config.Command,
  156. args: config.Args,
  157. group: gv,
  158. cluster: cluster,
  159. provideClusterInfo: config.ProvideClusterInfo,
  160. installHint: config.InstallHint,
  161. sometimes: &sometimes{
  162. threshold: 10,
  163. interval: time.Hour,
  164. clock: clock.RealClock{},
  165. },
  166. stdin: os.Stdin,
  167. stderr: os.Stderr,
  168. interactive: term.IsTerminal(int(os.Stdin.Fd())),
  169. now: time.Now,
  170. environ: os.Environ,
  171. defaultDialer: defaultDialer,
  172. connTracker: connTracker,
  173. }
  174. for _, env := range config.Env {
  175. a.env = append(a.env, env.Name+"="+env.Value)
  176. }
  177. return c.put(key, a), nil
  178. }
  179. // Authenticator is a client credential provider that rotates credentials by executing a plugin.
  180. // The plugin input and output are defined by the API group client.authentication.k8s.io.
  181. type Authenticator struct {
  182. // Set by the config
  183. cmd string
  184. args []string
  185. group schema.GroupVersion
  186. env []string
  187. cluster *clientauthentication.Cluster
  188. provideClusterInfo bool
  189. // Used to avoid log spew by rate limiting install hint printing. We didn't do
  190. // this by interval based rate limiting alone since that way may have prevented
  191. // the install hint from showing up for kubectl users.
  192. sometimes *sometimes
  193. installHint string
  194. // Stubbable for testing
  195. stdin io.Reader
  196. stderr io.Writer
  197. interactive bool
  198. now func() time.Time
  199. environ func() []string
  200. // defaultDialer is used for clients which don't specify a custom dialer
  201. defaultDialer *connrotation.Dialer
  202. // connTracker tracks all connections opened that we need to close when rotating a client certificate
  203. connTracker *connrotation.ConnectionTracker
  204. // Cached results.
  205. //
  206. // The mutex also guards calling the plugin. Since the plugin could be
  207. // interactive we want to make sure it's only called once.
  208. mu sync.Mutex
  209. cachedCreds *credentials
  210. exp time.Time
  211. }
  // credentials holds what the plugin returned: a bearer token and/or a TLS
  // client certificate. Either field may be empty.
  type credentials struct {
  	token string           `datapolicy:"token"`
  	cert  *tls.Certificate `datapolicy:"secret-key"`
  }
  216. // UpdateTransportConfig updates the transport.Config to use credentials
  217. // returned by the plugin.
  218. func (a *Authenticator) UpdateTransportConfig(c *transport.Config) error {
  219. // If a bearer token is present in the request - avoid the GetCert callback when
  220. // setting up the transport, as that triggers the exec action if the server is
  221. // also configured to allow client certificates for authentication. For requests
  222. // like "kubectl get --token (token) pods" we should assume the intention is to
  223. // use the provided token for authentication.
  224. if c.HasTokenAuth() {
  225. return nil
  226. }
  227. c.Wrap(func(rt http.RoundTripper) http.RoundTripper {
  228. return &roundTripper{a, rt}
  229. })
  230. if c.TLS.GetCert != nil {
  231. return errors.New("can't add TLS certificate callback: transport.Config.TLS.GetCert already set")
  232. }
  233. c.TLS.GetCert = a.cert
  234. var d *connrotation.Dialer
  235. if c.Dial != nil {
  236. // if c has a custom dialer, we have to wrap it
  237. d = connrotation.NewDialerWithTracker(c.Dial, a.connTracker)
  238. } else {
  239. d = a.defaultDialer
  240. }
  241. c.Dial = d.DialContext
  242. return nil
  243. }
  244. type roundTripper struct {
  245. a *Authenticator
  246. base http.RoundTripper
  247. }
  248. func (r *roundTripper) RoundTrip(req *http.Request) (*http.Response, error) {
  249. // If a user has already set credentials, use that. This makes commands like
  250. // "kubectl get --token (token) pods" work.
  251. if req.Header.Get("Authorization") != "" {
  252. return r.base.RoundTrip(req)
  253. }
  254. creds, err := r.a.getCreds()
  255. if err != nil {
  256. return nil, fmt.Errorf("getting credentials: %v", err)
  257. }
  258. if creds.token != "" {
  259. req.Header.Set("Authorization", "Bearer "+creds.token)
  260. }
  261. res, err := r.base.RoundTrip(req)
  262. if err != nil {
  263. return nil, err
  264. }
  265. if res.StatusCode == http.StatusUnauthorized {
  266. resp := &clientauthentication.Response{
  267. Header: res.Header,
  268. Code: int32(res.StatusCode),
  269. }
  270. if err := r.a.maybeRefreshCreds(creds, resp); err != nil {
  271. klog.Errorf("refreshing credentials: %v", err)
  272. }
  273. }
  274. return res, nil
  275. }
  276. func (a *Authenticator) credsExpired() bool {
  277. if a.exp.IsZero() {
  278. return false
  279. }
  280. return a.now().After(a.exp)
  281. }
  282. func (a *Authenticator) cert() (*tls.Certificate, error) {
  283. creds, err := a.getCreds()
  284. if err != nil {
  285. return nil, err
  286. }
  287. return creds.cert, nil
  288. }
  289. func (a *Authenticator) getCreds() (*credentials, error) {
  290. a.mu.Lock()
  291. defer a.mu.Unlock()
  292. if a.cachedCreds != nil && !a.credsExpired() {
  293. return a.cachedCreds, nil
  294. }
  295. if err := a.refreshCredsLocked(nil); err != nil {
  296. return nil, err
  297. }
  298. return a.cachedCreds, nil
  299. }
  300. // maybeRefreshCreds executes the plugin to force a rotation of the
  301. // credentials, unless they were rotated already.
  302. func (a *Authenticator) maybeRefreshCreds(creds *credentials, r *clientauthentication.Response) error {
  303. a.mu.Lock()
  304. defer a.mu.Unlock()
  305. // Since we're not making a new pointer to a.cachedCreds in getCreds, no
  306. // need to do deep comparison.
  307. if creds != a.cachedCreds {
  308. // Credentials already rotated.
  309. return nil
  310. }
  311. return a.refreshCredsLocked(r)
  312. }
  313. // refreshCredsLocked executes the plugin and reads the credentials from
  314. // stdout. It must be called while holding the Authenticator's mutex.
  315. func (a *Authenticator) refreshCredsLocked(r *clientauthentication.Response) error {
  316. cred := &clientauthentication.ExecCredential{
  317. Spec: clientauthentication.ExecCredentialSpec{
  318. Response: r,
  319. Interactive: a.interactive,
  320. },
  321. }
  322. if a.provideClusterInfo {
  323. cred.Spec.Cluster = a.cluster
  324. }
  325. env := append(a.environ(), a.env...)
  326. data, err := runtime.Encode(codecs.LegacyCodec(a.group), cred)
  327. if err != nil {
  328. return fmt.Errorf("encode ExecCredentials: %v", err)
  329. }
  330. env = append(env, fmt.Sprintf("%s=%s", execInfoEnv, data))
  331. stdout := &bytes.Buffer{}
  332. cmd := exec.Command(a.cmd, a.args...)
  333. cmd.Env = env
  334. cmd.Stderr = a.stderr
  335. cmd.Stdout = stdout
  336. if a.interactive {
  337. cmd.Stdin = a.stdin
  338. }
  339. err = cmd.Run()
  340. incrementCallsMetric(err)
  341. if err != nil {
  342. return a.wrapCmdRunErrorLocked(err)
  343. }
  344. _, gvk, err := codecs.UniversalDecoder(a.group).Decode(stdout.Bytes(), nil, cred)
  345. if err != nil {
  346. return fmt.Errorf("decoding stdout: %v", err)
  347. }
  348. if gvk.Group != a.group.Group || gvk.Version != a.group.Version {
  349. return fmt.Errorf("exec plugin is configured to use API version %s, plugin returned version %s",
  350. a.group, schema.GroupVersion{Group: gvk.Group, Version: gvk.Version})
  351. }
  352. if cred.Status == nil {
  353. return fmt.Errorf("exec plugin didn't return a status field")
  354. }
  355. if cred.Status.Token == "" && cred.Status.ClientCertificateData == "" && cred.Status.ClientKeyData == "" {
  356. return fmt.Errorf("exec plugin didn't return a token or cert/key pair")
  357. }
  358. if (cred.Status.ClientCertificateData == "") != (cred.Status.ClientKeyData == "") {
  359. return fmt.Errorf("exec plugin returned only certificate or key, not both")
  360. }
  361. if cred.Status.ExpirationTimestamp != nil {
  362. a.exp = cred.Status.ExpirationTimestamp.Time
  363. } else {
  364. a.exp = time.Time{}
  365. }
  366. newCreds := &credentials{
  367. token: cred.Status.Token,
  368. }
  369. if cred.Status.ClientKeyData != "" && cred.Status.ClientCertificateData != "" {
  370. cert, err := tls.X509KeyPair([]byte(cred.Status.ClientCertificateData), []byte(cred.Status.ClientKeyData))
  371. if err != nil {
  372. return fmt.Errorf("failed parsing client key/certificate: %v", err)
  373. }
  374. // Leaf is initialized to be nil:
  375. // https://golang.org/pkg/crypto/tls/#X509KeyPair
  376. // Leaf certificate is the first certificate:
  377. // https://golang.org/pkg/crypto/tls/#Certificate
  378. // Populating leaf is useful for quickly accessing the underlying x509
  379. // certificate values.
  380. cert.Leaf, err = x509.ParseCertificate(cert.Certificate[0])
  381. if err != nil {
  382. return fmt.Errorf("failed parsing client leaf certificate: %v", err)
  383. }
  384. newCreds.cert = &cert
  385. }
  386. oldCreds := a.cachedCreds
  387. a.cachedCreds = newCreds
  388. // Only close all connections when TLS cert rotates. Token rotation doesn't
  389. // need the extra noise.
  390. if oldCreds != nil && !reflect.DeepEqual(oldCreds.cert, a.cachedCreds.cert) {
  391. // Can be nil if the exec auth plugin only returned token auth.
  392. if oldCreds.cert != nil && oldCreds.cert.Leaf != nil {
  393. metrics.ClientCertRotationAge.Observe(time.Now().Sub(oldCreds.cert.Leaf.NotBefore))
  394. }
  395. a.connTracker.CloseAll()
  396. }
  397. expiry := time.Time{}
  398. if a.cachedCreds.cert != nil && a.cachedCreds.cert.Leaf != nil {
  399. expiry = a.cachedCreds.cert.Leaf.NotAfter
  400. }
  401. expirationMetrics.set(a, expiry)
  402. return nil
  403. }
  404. // wrapCmdRunErrorLocked pulls out the code to construct a helpful error message
  405. // for when the exec plugin's binary fails to Run().
  406. //
  407. // It must be called while holding the Authenticator's mutex.
  408. func (a *Authenticator) wrapCmdRunErrorLocked(err error) error {
  409. switch err.(type) {
  410. case *exec.Error: // Binary does not exist (see exec.Error).
  411. builder := strings.Builder{}
  412. fmt.Fprintf(&builder, "exec: executable %s not found", a.cmd)
  413. a.sometimes.Do(func() {
  414. fmt.Fprint(&builder, installHintVerboseHelp)
  415. if a.installHint != "" {
  416. fmt.Fprintf(&builder, "\n\n%s", a.installHint)
  417. }
  418. })
  419. return errors.New(builder.String())
  420. case *exec.ExitError: // Binary execution failed (see exec.Cmd.Run()).
  421. e := err.(*exec.ExitError)
  422. return fmt.Errorf(
  423. "exec: executable %s failed with exit code %d",
  424. a.cmd,
  425. e.ProcessState.ExitCode(),
  426. )
  427. default:
  428. return fmt.Errorf("exec: %v", err)
  429. }
  430. }