agent.go 34 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321132213231324132513261327132813291330133113321333133413351336133713381339134013411342134313441345134613471348134913501351135213531354135513561357135813591360136113621363136413651366136713681369137013711372137313741375137613771378137913801381138213831384138513861387138813891390139113921393139413951396139713981399140014011402140314041405140614071408140914101411141214131414141514161417141814191420
  1. package kubernetes
  2. import (
  3. "bufio"
  4. "bytes"
  5. "compress/gzip"
  6. "context"
  7. "encoding/base64"
  8. "encoding/json"
  9. "fmt"
  10. "io"
  11. "io/ioutil"
  12. "strings"
  13. "time"
  14. goerrors "errors"
  15. "github.com/porter-dev/porter/api/server/shared/config/env"
  16. "github.com/porter-dev/porter/api/server/shared/websocket"
  17. "github.com/porter-dev/porter/internal/kubernetes/provisioner"
  18. "github.com/porter-dev/porter/internal/kubernetes/provisioner/aws"
  19. "github.com/porter-dev/porter/internal/kubernetes/provisioner/aws/ecr"
  20. "github.com/porter-dev/porter/internal/kubernetes/provisioner/aws/eks"
  21. "github.com/porter-dev/porter/internal/kubernetes/provisioner/do"
  22. "github.com/porter-dev/porter/internal/kubernetes/provisioner/do/docr"
  23. "github.com/porter-dev/porter/internal/kubernetes/provisioner/do/doks"
  24. "github.com/porter-dev/porter/internal/kubernetes/provisioner/gcp"
  25. "github.com/porter-dev/porter/internal/kubernetes/provisioner/gcp/gke"
  26. "github.com/porter-dev/porter/internal/models"
  27. "github.com/porter-dev/porter/internal/models/integrations"
  28. "github.com/porter-dev/porter/internal/oauth"
  29. "github.com/porter-dev/porter/internal/registry"
  30. "github.com/porter-dev/porter/internal/repository"
  31. "golang.org/x/oauth2"
  32. errors2 "errors"
  33. "github.com/porter-dev/porter/internal/helm/grapher"
  34. appsv1 "k8s.io/api/apps/v1"
  35. batchv1 "k8s.io/api/batch/v1"
  36. batchv1beta1 "k8s.io/api/batch/v1beta1"
  37. v1 "k8s.io/api/core/v1"
  38. v1beta1 "k8s.io/api/extensions/v1beta1"
  39. "k8s.io/apimachinery/pkg/api/errors"
  40. metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
  41. "k8s.io/apimachinery/pkg/fields"
  42. "k8s.io/apimachinery/pkg/runtime"
  43. "k8s.io/apimachinery/pkg/runtime/schema"
  44. "k8s.io/apimachinery/pkg/types"
  45. "k8s.io/apimachinery/pkg/watch"
  46. "k8s.io/cli-runtime/pkg/genericclioptions"
  47. "k8s.io/client-go/informers"
  48. "k8s.io/client-go/kubernetes"
  49. "k8s.io/client-go/rest"
  50. "k8s.io/client-go/tools/cache"
  51. "k8s.io/client-go/tools/remotecommand"
  52. rspb "helm.sh/helm/v3/pkg/release"
  53. )
  54. // Agent is a Kubernetes agent for performing operations that interact with the
  55. // api server
  56. type Agent struct {
  57. RESTClientGetter genericclioptions.RESTClientGetter
  58. Clientset kubernetes.Interface
  59. }
  60. type Message struct {
  61. EventType string `json:"event_type"`
  62. Object interface{}
  63. Kind string
  64. }
  65. type ListOptions struct {
  66. FieldSelector string
  67. }
  68. type AuthError struct{}
  69. func (e *AuthError) Error() string {
  70. return "Unauthorized error"
  71. }
  72. // UpdateClientset updates the Agent's Clientset (this refreshes auth tokens)
  73. func (a *Agent) UpdateClientset() error {
  74. restConf, err := a.RESTClientGetter.ToRESTConfig()
  75. if err != nil {
  76. return err
  77. }
  78. clientset, err := kubernetes.NewForConfig(restConf)
  79. if err != nil {
  80. return err
  81. }
  82. a.Clientset = clientset
  83. return nil
  84. }
  85. // CreateConfigMap creates the configmap given the key-value pairs and namespace
  86. func (a *Agent) CreateConfigMap(name string, namespace string, configMap map[string]string) (*v1.ConfigMap, error) {
  87. return a.Clientset.CoreV1().ConfigMaps(namespace).Create(
  88. context.TODO(),
  89. &v1.ConfigMap{
  90. ObjectMeta: metav1.ObjectMeta{
  91. Name: name,
  92. Namespace: namespace,
  93. Labels: map[string]string{
  94. "porter": "true",
  95. },
  96. },
  97. Data: configMap,
  98. },
  99. metav1.CreateOptions{},
  100. )
  101. }
  102. // CreateLinkedSecret creates a secret given the key-value pairs and namespace. Values are
  103. // base64 encoded
  104. func (a *Agent) CreateLinkedSecret(name, namespace, cmName string, data map[string][]byte) (*v1.Secret, error) {
  105. return a.Clientset.CoreV1().Secrets(namespace).Create(
  106. context.TODO(),
  107. &v1.Secret{
  108. ObjectMeta: metav1.ObjectMeta{
  109. Name: name,
  110. Namespace: namespace,
  111. Labels: map[string]string{
  112. "porter": "true",
  113. "configmap": cmName,
  114. },
  115. },
  116. Data: data,
  117. },
  118. metav1.CreateOptions{},
  119. )
  120. }
  121. type mergeConfigMapData struct {
  122. Data map[string]*string `json:"data"`
  123. }
  124. // UpdateConfigMap updates the configmap given its name and namespace
  125. func (a *Agent) UpdateConfigMap(name string, namespace string, configMap map[string]string) (*v1.ConfigMap, error) {
  126. cmData := make(map[string]*string)
  127. for key, val := range configMap {
  128. valCopy := val
  129. cmData[key] = &valCopy
  130. if len(val) == 0 {
  131. cmData[key] = nil
  132. }
  133. }
  134. mergeCM := &mergeConfigMapData{
  135. Data: cmData,
  136. }
  137. patchBytes, err := json.Marshal(mergeCM)
  138. if err != nil {
  139. return nil, err
  140. }
  141. return a.Clientset.CoreV1().ConfigMaps(namespace).Patch(
  142. context.Background(),
  143. name,
  144. types.MergePatchType,
  145. patchBytes,
  146. metav1.PatchOptions{},
  147. )
  148. }
  149. type mergeLinkedSecretData struct {
  150. Data map[string]*[]byte `json:"data"`
  151. }
  152. // UpdateLinkedSecret updates the secret given its name and namespace
  153. func (a *Agent) UpdateLinkedSecret(name, namespace, cmName string, data map[string][]byte) error {
  154. secretData := make(map[string]*[]byte)
  155. for key, val := range data {
  156. valCopy := val
  157. secretData[key] = &valCopy
  158. if len(val) == 0 {
  159. secretData[key] = nil
  160. }
  161. }
  162. mergeSecret := &mergeLinkedSecretData{
  163. Data: secretData,
  164. }
  165. patchBytes, err := json.Marshal(mergeSecret)
  166. if err != nil {
  167. return err
  168. }
  169. _, err = a.Clientset.CoreV1().Secrets(namespace).Patch(
  170. context.TODO(),
  171. name,
  172. types.MergePatchType,
  173. patchBytes,
  174. metav1.PatchOptions{},
  175. )
  176. return err
  177. }
  178. // DeleteConfigMap deletes the configmap given its name and namespace
  179. func (a *Agent) DeleteConfigMap(name string, namespace string) error {
  180. return a.Clientset.CoreV1().ConfigMaps(namespace).Delete(
  181. context.TODO(),
  182. name,
  183. metav1.DeleteOptions{},
  184. )
  185. }
  186. // DeleteLinkedSecret deletes the secret given its name and namespace
  187. func (a *Agent) DeleteLinkedSecret(name, namespace string) error {
  188. return a.Clientset.CoreV1().Secrets(namespace).Delete(
  189. context.TODO(),
  190. name,
  191. metav1.DeleteOptions{},
  192. )
  193. }
  194. // GetConfigMap retrieves the configmap given its name and namespace
  195. func (a *Agent) GetConfigMap(name string, namespace string) (*v1.ConfigMap, error) {
  196. return a.Clientset.CoreV1().ConfigMaps(namespace).Get(
  197. context.TODO(),
  198. name,
  199. metav1.GetOptions{},
  200. )
  201. }
  202. // GetSecret retrieves the secret given its name and namespace
  203. func (a *Agent) GetSecret(name string, namespace string) (*v1.Secret, error) {
  204. return a.Clientset.CoreV1().Secrets(namespace).Get(
  205. context.TODO(),
  206. name,
  207. metav1.GetOptions{},
  208. )
  209. }
  210. // ListConfigMaps simply lists namespaces
  211. func (a *Agent) ListConfigMaps(namespace string) (*v1.ConfigMapList, error) {
  212. return a.Clientset.CoreV1().ConfigMaps(namespace).List(
  213. context.TODO(),
  214. metav1.ListOptions{
  215. LabelSelector: "porter=true",
  216. },
  217. )
  218. }
  219. // ListEvents lists the events of a given object.
  220. func (a *Agent) ListEvents(name string, namespace string) (*v1.EventList, error) {
  221. return a.Clientset.CoreV1().Events(namespace).List(
  222. context.TODO(),
  223. metav1.ListOptions{
  224. FieldSelector: fmt.Sprintf("involvedObject.name=%s,involvedObject.namespace=%s", name, namespace),
  225. },
  226. )
  227. }
  228. // ListNamespaces simply lists namespaces
  229. func (a *Agent) ListNamespaces() (*v1.NamespaceList, error) {
  230. return a.Clientset.CoreV1().Namespaces().List(
  231. context.TODO(),
  232. metav1.ListOptions{},
  233. )
  234. }
  235. // CreateNamespace creates a namespace with the given name.
  236. func (a *Agent) CreateNamespace(name string) (*v1.Namespace, error) {
  237. namespace := v1.Namespace{
  238. ObjectMeta: metav1.ObjectMeta{
  239. Name: name,
  240. },
  241. }
  242. return a.Clientset.CoreV1().Namespaces().Create(
  243. context.TODO(),
  244. &namespace,
  245. metav1.CreateOptions{},
  246. )
  247. }
  248. // DeleteNamespace deletes the namespace given the name.
  249. func (a *Agent) DeleteNamespace(name string) error {
  250. return a.Clientset.CoreV1().Namespaces().Delete(
  251. context.TODO(),
  252. name,
  253. metav1.DeleteOptions{},
  254. )
  255. }
  256. // ListJobsByLabel lists jobs in a namespace matching a label
  257. type Label struct {
  258. Key string
  259. Val string
  260. }
  261. func (a *Agent) ListJobsByLabel(namespace string, labels ...Label) ([]batchv1.Job, error) {
  262. selectors := make([]string, 0)
  263. for _, label := range labels {
  264. selectors = append(selectors, fmt.Sprintf("%s=%s", label.Key, label.Val))
  265. }
  266. resp, err := a.Clientset.BatchV1().Jobs(namespace).List(
  267. context.TODO(),
  268. metav1.ListOptions{
  269. LabelSelector: strings.Join(selectors, ","),
  270. },
  271. )
  272. if err != nil {
  273. return nil, err
  274. }
  275. return resp.Items, nil
  276. }
  277. // DeleteJob deletes the job in the given name and namespace.
  278. func (a *Agent) DeleteJob(name, namespace string) error {
  279. return a.Clientset.BatchV1().Jobs(namespace).Delete(
  280. context.TODO(),
  281. name,
  282. metav1.DeleteOptions{},
  283. )
  284. }
  285. // GetJobPods lists all pods belonging to a job in a namespace
  286. func (a *Agent) GetJobPods(namespace, jobName string) ([]v1.Pod, error) {
  287. resp, err := a.Clientset.CoreV1().Pods(namespace).List(
  288. context.TODO(),
  289. metav1.ListOptions{
  290. LabelSelector: fmt.Sprintf("%s=%s", "job-name", jobName),
  291. },
  292. )
  293. if err != nil {
  294. return nil, err
  295. }
  296. return resp.Items, nil
  297. }
  298. // GetIngress gets ingress given the name and namespace
  299. func (a *Agent) GetIngress(namespace string, name string) (*v1beta1.Ingress, error) {
  300. resp, err := a.Clientset.ExtensionsV1beta1().Ingresses(namespace).Get(
  301. context.TODO(),
  302. name,
  303. metav1.GetOptions{},
  304. )
  305. if err != nil && errors.IsNotFound(err) {
  306. return nil, IsNotFoundError
  307. } else if err != nil {
  308. return nil, err
  309. }
  310. return resp, nil
  311. }
  312. var IsNotFoundError = fmt.Errorf("not found")
  313. type BadRequestError struct {
  314. msg string
  315. }
  316. func (e *BadRequestError) Error() string {
  317. return e.msg
  318. }
  319. // GetDeployment gets the deployment given the name and namespace
  320. func (a *Agent) GetDeployment(c grapher.Object) (*appsv1.Deployment, error) {
  321. res, err := a.Clientset.AppsV1().Deployments(c.Namespace).Get(
  322. context.TODO(),
  323. c.Name,
  324. metav1.GetOptions{},
  325. )
  326. if err != nil && errors.IsNotFound(err) {
  327. return nil, IsNotFoundError
  328. } else if err != nil {
  329. return nil, err
  330. }
  331. res.Kind = c.Kind
  332. return res, nil
  333. }
  334. // GetStatefulSet gets the statefulset given the name and namespace
  335. func (a *Agent) GetStatefulSet(c grapher.Object) (*appsv1.StatefulSet, error) {
  336. res, err := a.Clientset.AppsV1().StatefulSets(c.Namespace).Get(
  337. context.TODO(),
  338. c.Name,
  339. metav1.GetOptions{},
  340. )
  341. if err != nil && errors.IsNotFound(err) {
  342. return nil, IsNotFoundError
  343. } else if err != nil {
  344. return nil, err
  345. }
  346. res.Kind = c.Kind
  347. return res, nil
  348. }
  349. // GetReplicaSet gets the replicaset given the name and namespace
  350. func (a *Agent) GetReplicaSet(c grapher.Object) (*appsv1.ReplicaSet, error) {
  351. res, err := a.Clientset.AppsV1().ReplicaSets(c.Namespace).Get(
  352. context.TODO(),
  353. c.Name,
  354. metav1.GetOptions{},
  355. )
  356. if err != nil && errors.IsNotFound(err) {
  357. return nil, IsNotFoundError
  358. } else if err != nil {
  359. return nil, err
  360. }
  361. res.Kind = c.Kind
  362. return res, nil
  363. }
  364. // GetDaemonSet gets the daemonset by name and namespace
  365. func (a *Agent) GetDaemonSet(c grapher.Object) (*appsv1.DaemonSet, error) {
  366. res, err := a.Clientset.AppsV1().DaemonSets(c.Namespace).Get(
  367. context.TODO(),
  368. c.Name,
  369. metav1.GetOptions{},
  370. )
  371. if err != nil && errors.IsNotFound(err) {
  372. return nil, IsNotFoundError
  373. } else if err != nil {
  374. return nil, err
  375. }
  376. res.Kind = c.Kind
  377. return res, nil
  378. }
  379. // GetJob gets the job by name and namespace
  380. func (a *Agent) GetJob(c grapher.Object) (*batchv1.Job, error) {
  381. res, err := a.Clientset.BatchV1().Jobs(c.Namespace).Get(
  382. context.TODO(),
  383. c.Name,
  384. metav1.GetOptions{},
  385. )
  386. if err != nil && errors.IsNotFound(err) {
  387. return nil, IsNotFoundError
  388. } else if err != nil {
  389. return nil, err
  390. }
  391. res.Kind = c.Kind
  392. return res, nil
  393. }
  394. // GetCronJob gets the CronJob by name and namespace
  395. func (a *Agent) GetCronJob(c grapher.Object) (*batchv1beta1.CronJob, error) {
  396. res, err := a.Clientset.BatchV1beta1().CronJobs(c.Namespace).Get(
  397. context.TODO(),
  398. c.Name,
  399. metav1.GetOptions{},
  400. )
  401. if err != nil && errors.IsNotFound(err) {
  402. return nil, IsNotFoundError
  403. } else if err != nil {
  404. return nil, err
  405. }
  406. res.Kind = c.Kind
  407. return res, nil
  408. }
  409. // GetPodsByLabel retrieves pods with matching labels
  410. func (a *Agent) GetPodsByLabel(selector string, namespace string) (*v1.PodList, error) {
  411. // Search in all namespaces for matching pods
  412. return a.Clientset.CoreV1().Pods(namespace).List(
  413. context.TODO(),
  414. metav1.ListOptions{
  415. LabelSelector: selector,
  416. },
  417. )
  418. }
  419. // DeletePod deletes a pod by name and namespace
  420. func (a *Agent) DeletePod(namespace string, name string) error {
  421. err := a.Clientset.CoreV1().Pods(namespace).Delete(
  422. context.TODO(),
  423. name,
  424. metav1.DeleteOptions{},
  425. )
  426. if err != nil && errors.IsNotFound(err) {
  427. return IsNotFoundError
  428. }
  429. return err
  430. }
  431. // GetPodLogs streams real-time logs from a given pod.
  432. func (a *Agent) GetPodLogs(namespace string, name string, rw *websocket.WebsocketSafeReadWriter) error {
  433. // get the pod to read in the list of contains
  434. pod, err := a.Clientset.CoreV1().Pods(namespace).Get(
  435. context.Background(),
  436. name,
  437. metav1.GetOptions{},
  438. )
  439. if err != nil && errors.IsNotFound(err) {
  440. return IsNotFoundError
  441. } else if err != nil {
  442. return fmt.Errorf("Cannot get logs from pod %s: %s", name, err.Error())
  443. }
  444. // see if container is ready and able to open a stream. If not, wait for container
  445. // to be ready.
  446. err, _ = a.waitForPod(pod)
  447. if err != nil && goerrors.Is(err, IsNotFoundError) {
  448. return IsNotFoundError
  449. } else if err != nil {
  450. return fmt.Errorf("Cannot get logs from pod %s: %s", name, err.Error())
  451. }
  452. container := pod.Spec.Containers[0].Name
  453. tails := int64(400)
  454. // follow logs
  455. podLogOpts := v1.PodLogOptions{
  456. Follow: true,
  457. TailLines: &tails,
  458. Container: container,
  459. }
  460. req := a.Clientset.CoreV1().Pods(namespace).GetLogs(name, &podLogOpts)
  461. podLogs, err := req.Stream(context.TODO())
  462. // in the case of bad request errors, such as if the pod is stuck in "ContainerCreating",
  463. // we'd like to pass this through to the client.
  464. if err != nil && errors.IsBadRequest(err) {
  465. return &BadRequestError{err.Error()}
  466. } else if err != nil {
  467. return fmt.Errorf("Cannot open log stream for pod %s: %s", name, err.Error())
  468. }
  469. defer podLogs.Close()
  470. r := bufio.NewReader(podLogs)
  471. errorchan := make(chan error)
  472. go func() {
  473. // listens for websocket closing handshake
  474. for {
  475. if _, _, err := rw.ReadMessage(); err != nil {
  476. errorchan <- nil
  477. return
  478. }
  479. }
  480. }()
  481. go func() {
  482. for {
  483. select {
  484. case <-errorchan:
  485. defer close(errorchan)
  486. return
  487. default:
  488. }
  489. bytes, err := r.ReadBytes('\n')
  490. if _, writeErr := rw.Write(bytes); writeErr != nil {
  491. errorchan <- writeErr
  492. return
  493. }
  494. if err != nil {
  495. if err != io.EOF {
  496. errorchan <- err
  497. return
  498. }
  499. errorchan <- nil
  500. return
  501. }
  502. }
  503. }()
  504. for {
  505. select {
  506. case err = <-errorchan:
  507. return err
  508. }
  509. }
  510. }
  511. // StopJobWithJobSidecar sends a termination signal to a job running with a sidecar
  512. func (a *Agent) StopJobWithJobSidecar(namespace, name string) error {
  513. jobPods, err := a.GetJobPods(namespace, name)
  514. if err != nil {
  515. return err
  516. }
  517. podName := jobPods[0].ObjectMeta.Name
  518. restConf, err := a.RESTClientGetter.ToRESTConfig()
  519. restConf.GroupVersion = &schema.GroupVersion{
  520. Group: "api",
  521. Version: "v1",
  522. }
  523. restConf.NegotiatedSerializer = runtime.NewSimpleNegotiatedSerializer(runtime.SerializerInfo{})
  524. restClient, err := rest.RESTClientFor(restConf)
  525. if err != nil {
  526. return err
  527. }
  528. req := restClient.Post().
  529. Resource("pods").
  530. Name(podName).
  531. Namespace(namespace).
  532. SubResource("exec")
  533. req.Param("command", "./signal.sh")
  534. req.Param("container", "sidecar")
  535. req.Param("stdin", "true")
  536. req.Param("stdout", "false")
  537. req.Param("tty", "false")
  538. exec, err := remotecommand.NewSPDYExecutor(restConf, "POST", req.URL())
  539. if err != nil {
  540. return err
  541. }
  542. return exec.Stream(remotecommand.StreamOptions{
  543. Tty: false,
  544. Stdin: strings.NewReader("./signal.sh"),
  545. })
  546. }
  547. // RunWebsocketTask will run a websocket task. If the websocket returns an anauthorized error, it will restart
  548. // the task some number of times until failing
  549. func (a *Agent) RunWebsocketTask(task func() error) error {
  550. lastTime := int64(0)
  551. for {
  552. if err := a.UpdateClientset(); err != nil {
  553. return err
  554. }
  555. err := task()
  556. if err == nil {
  557. return nil
  558. }
  559. if !errors2.Is(err, &AuthError{}) {
  560. return err
  561. }
  562. if time.Now().Unix()-lastTime < 60 { // don't regenerate connection if too many unauthorized errors
  563. return err
  564. }
  565. lastTime = time.Now().Unix()
  566. }
  567. }
  568. // StreamControllerStatus streams controller status. Supports Deployment, StatefulSet, ReplicaSet, and DaemonSet
  569. // TODO: Support Jobs
  570. func (a *Agent) StreamControllerStatus(kind string, selectors string, rw *websocket.WebsocketSafeReadWriter) error {
  571. run := func() error {
  572. // selectors is an array of max length 1. StreamControllerStatus accepts calls without the selectors argument.
  573. // selectors argument is a single string with comma separated key=value pairs. (e.g. "app=porter,porter=true")
  574. tweakListOptionsFunc := func(options *metav1.ListOptions) {
  575. options.LabelSelector = selectors
  576. }
  577. factory := informers.NewSharedInformerFactoryWithOptions(
  578. a.Clientset,
  579. 0,
  580. informers.WithTweakListOptions(tweakListOptionsFunc),
  581. )
  582. var informer cache.SharedInformer
  583. // Spins up an informer depending on kind. Convert to lowercase for robustness
  584. switch strings.ToLower(kind) {
  585. case "deployment":
  586. informer = factory.Apps().V1().Deployments().Informer()
  587. case "statefulset":
  588. informer = factory.Apps().V1().StatefulSets().Informer()
  589. case "replicaset":
  590. informer = factory.Apps().V1().ReplicaSets().Informer()
  591. case "daemonset":
  592. informer = factory.Apps().V1().DaemonSets().Informer()
  593. case "job":
  594. informer = factory.Batch().V1().Jobs().Informer()
  595. case "cronjob":
  596. informer = factory.Batch().V1beta1().CronJobs().Informer()
  597. case "namespace":
  598. informer = factory.Core().V1().Namespaces().Informer()
  599. case "pod":
  600. informer = factory.Core().V1().Pods().Informer()
  601. }
  602. stopper := make(chan struct{})
  603. errorchan := make(chan error)
  604. defer close(stopper)
  605. informer.SetWatchErrorHandler(func(r *cache.Reflector, err error) {
  606. if strings.HasSuffix(err.Error(), ": Unauthorized") {
  607. errorchan <- &AuthError{}
  608. }
  609. })
  610. informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
  611. UpdateFunc: func(oldObj, newObj interface{}) {
  612. msg := Message{
  613. EventType: "UPDATE",
  614. Object: newObj,
  615. Kind: strings.ToLower(kind),
  616. }
  617. rw.WriteJSONWithChannel(msg, errorchan)
  618. },
  619. AddFunc: func(obj interface{}) {
  620. msg := Message{
  621. EventType: "ADD",
  622. Object: obj,
  623. Kind: strings.ToLower(kind),
  624. }
  625. rw.WriteJSONWithChannel(msg, errorchan)
  626. },
  627. DeleteFunc: func(obj interface{}) {
  628. msg := Message{
  629. EventType: "DELETE",
  630. Object: obj,
  631. Kind: strings.ToLower(kind),
  632. }
  633. rw.WriteJSONWithChannel(msg, errorchan)
  634. },
  635. })
  636. go func() {
  637. // listens for websocket closing handshake
  638. for {
  639. if _, _, err := rw.ReadMessage(); err != nil {
  640. errorchan <- nil
  641. return
  642. }
  643. }
  644. }()
  645. go informer.Run(stopper)
  646. for {
  647. select {
  648. case err := <-errorchan:
  649. return err
  650. }
  651. }
  652. }
  653. return a.RunWebsocketTask(run)
  654. }
  655. var b64 = base64.StdEncoding
  656. var magicGzip = []byte{0x1f, 0x8b, 0x08}
  657. func decodeRelease(data string) (*rspb.Release, error) {
  658. // base64 decode string
  659. b, err := b64.DecodeString(data)
  660. if err != nil {
  661. return nil, err
  662. }
  663. // For backwards compatibility with releases that were stored before
  664. // compression was introduced we skip decompression if the
  665. // gzip magic header is not found
  666. if bytes.Equal(b[0:3], magicGzip) {
  667. r, err := gzip.NewReader(bytes.NewReader(b))
  668. if err != nil {
  669. return nil, err
  670. }
  671. defer r.Close()
  672. b2, err := ioutil.ReadAll(r)
  673. if err != nil {
  674. return nil, err
  675. }
  676. b = b2
  677. }
  678. var rls rspb.Release
  679. // unmarshal release object bytes
  680. if err := json.Unmarshal(b, &rls); err != nil {
  681. return nil, err
  682. }
  683. return &rls, nil
  684. }
  685. func contains(s []string, str string) bool {
  686. for _, v := range s {
  687. if v == str {
  688. return true
  689. }
  690. }
  691. return false
  692. }
  693. func parseSecretToHelmRelease(secret v1.Secret, chartList []string) (*rspb.Release, bool, error) {
  694. if secret.Type != "helm.sh/release.v1" {
  695. return nil, true, nil
  696. }
  697. releaseData, ok := secret.Data["release"]
  698. if !ok {
  699. return nil, true, fmt.Errorf("release field not found")
  700. }
  701. helm_object, err := decodeRelease(string(releaseData))
  702. if err != nil {
  703. return nil, true, err
  704. }
  705. if len(chartList) > 0 && !contains(chartList, helm_object.Name) {
  706. return nil, true, nil
  707. }
  708. return helm_object, false, nil
  709. }
  710. func (a *Agent) StreamHelmReleases(namespace string, chartList []string, selectors string, rw *websocket.WebsocketSafeReadWriter) error {
  711. run := func() error {
  712. tweakListOptionsFunc := func(options *metav1.ListOptions) {
  713. options.LabelSelector = selectors
  714. }
  715. factory := informers.NewSharedInformerFactoryWithOptions(
  716. a.Clientset,
  717. 0,
  718. informers.WithTweakListOptions(tweakListOptionsFunc),
  719. informers.WithNamespace(namespace),
  720. )
  721. informer := factory.Core().V1().Secrets().Informer()
  722. stopper := make(chan struct{})
  723. errorchan := make(chan error)
  724. defer close(stopper)
  725. informer.SetWatchErrorHandler(func(r *cache.Reflector, err error) {
  726. if strings.HasSuffix(err.Error(), ": Unauthorized") {
  727. errorchan <- &AuthError{}
  728. }
  729. })
  730. informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
  731. UpdateFunc: func(oldObj, newObj interface{}) {
  732. secretObj, ok := newObj.(*v1.Secret)
  733. if !ok {
  734. errorchan <- fmt.Errorf("could not cast to secret")
  735. return
  736. }
  737. helm_object, isNotHelmRelease, err := parseSecretToHelmRelease(*secretObj, chartList)
  738. if isNotHelmRelease && err == nil {
  739. return
  740. }
  741. if err != nil {
  742. errorchan <- err
  743. return
  744. }
  745. msg := Message{
  746. EventType: "UPDATE",
  747. Object: helm_object,
  748. }
  749. rw.WriteJSONWithChannel(msg, errorchan)
  750. },
  751. AddFunc: func(obj interface{}) {
  752. secretObj, ok := obj.(*v1.Secret)
  753. if !ok {
  754. errorchan <- fmt.Errorf("could not cast to secret")
  755. return
  756. }
  757. helm_object, isNotHelmRelease, err := parseSecretToHelmRelease(*secretObj, chartList)
  758. if isNotHelmRelease && err == nil {
  759. return
  760. }
  761. if err != nil {
  762. errorchan <- err
  763. return
  764. }
  765. msg := Message{
  766. EventType: "ADD",
  767. Object: helm_object,
  768. }
  769. rw.WriteJSONWithChannel(msg, errorchan)
  770. },
  771. DeleteFunc: func(obj interface{}) {
  772. secretObj, ok := obj.(*v1.Secret)
  773. if !ok {
  774. errorchan <- fmt.Errorf("could not cast to secret")
  775. return
  776. }
  777. helm_object, isNotHelmRelease, err := parseSecretToHelmRelease(*secretObj, chartList)
  778. if isNotHelmRelease && err == nil {
  779. return
  780. }
  781. if err != nil {
  782. errorchan <- err
  783. return
  784. }
  785. msg := Message{
  786. EventType: "DELETE",
  787. Object: helm_object,
  788. }
  789. rw.WriteJSONWithChannel(msg, errorchan)
  790. },
  791. })
  792. go func() {
  793. // listens for websocket closing handshake
  794. for {
  795. if _, _, err := rw.ReadMessage(); err != nil {
  796. errorchan <- nil
  797. return
  798. }
  799. }
  800. }()
  801. go informer.Run(stopper)
  802. for {
  803. select {
  804. case err := <-errorchan:
  805. return err
  806. }
  807. }
  808. }
  809. return a.RunWebsocketTask(run)
  810. }
  811. type SharedProvisionOpts struct {
  812. ProjectID uint
  813. Repo repository.Repository
  814. Infra *models.Infra
  815. Operation provisioner.ProvisionerOperation
  816. PGConf *env.DBConf
  817. RedisConf *env.RedisConf
  818. ProvImageTag string
  819. ProvImagePullSecret string
  820. }
  821. // ProvisionECR spawns a new provisioning pod that creates an ECR instance
  822. func (a *Agent) ProvisionECR(
  823. opts *SharedProvisionOpts,
  824. awsConf *integrations.AWSIntegration,
  825. ecrName string,
  826. ) (*batchv1.Job, error) {
  827. id := opts.Infra.GetUniqueName()
  828. prov := &provisioner.Conf{
  829. ID: id,
  830. Name: fmt.Sprintf("prov-%s-%s", id, string(opts.Operation)),
  831. Kind: provisioner.ECR,
  832. Operation: opts.Operation,
  833. Redis: opts.RedisConf,
  834. Postgres: opts.PGConf,
  835. ProvisionerImageTag: opts.ProvImageTag,
  836. ImagePullSecret: opts.ProvImagePullSecret,
  837. LastApplied: opts.Infra.LastApplied,
  838. AWS: &aws.Conf{
  839. AWSRegion: awsConf.AWSRegion,
  840. AWSAccessKeyID: string(awsConf.AWSAccessKeyID),
  841. AWSSecretAccessKey: string(awsConf.AWSSecretAccessKey),
  842. },
  843. ECR: &ecr.Conf{
  844. ECRName: ecrName,
  845. },
  846. }
  847. return a.provision(prov, opts.Infra, opts.Repo)
  848. }
  849. // ProvisionEKS spawns a new provisioning pod that creates an EKS instance
  850. func (a *Agent) ProvisionEKS(
  851. opts *SharedProvisionOpts,
  852. awsConf *integrations.AWSIntegration,
  853. eksName, machineType string,
  854. ) (*batchv1.Job, error) {
  855. id := opts.Infra.GetUniqueName()
  856. prov := &provisioner.Conf{
  857. ID: id,
  858. Name: fmt.Sprintf("prov-%s-%s", id, string(opts.Operation)),
  859. Kind: provisioner.EKS,
  860. Operation: opts.Operation,
  861. Redis: opts.RedisConf,
  862. Postgres: opts.PGConf,
  863. ProvisionerImageTag: opts.ProvImageTag,
  864. ImagePullSecret: opts.ProvImagePullSecret,
  865. LastApplied: opts.Infra.LastApplied,
  866. AWS: &aws.Conf{
  867. AWSRegion: awsConf.AWSRegion,
  868. AWSAccessKeyID: string(awsConf.AWSAccessKeyID),
  869. AWSSecretAccessKey: string(awsConf.AWSSecretAccessKey),
  870. },
  871. EKS: &eks.Conf{
  872. ClusterName: eksName,
  873. MachineType: machineType,
  874. },
  875. }
  876. return a.provision(prov, opts.Infra, opts.Repo)
  877. }
  878. // ProvisionGCR spawns a new provisioning pod that creates a GCR instance
  879. func (a *Agent) ProvisionGCR(
  880. opts *SharedProvisionOpts,
  881. gcpConf *integrations.GCPIntegration,
  882. ) (*batchv1.Job, error) {
  883. id := opts.Infra.GetUniqueName()
  884. prov := &provisioner.Conf{
  885. ID: id,
  886. Name: fmt.Sprintf("prov-%s-%s", id, string(opts.Operation)),
  887. Kind: provisioner.GCR,
  888. Operation: opts.Operation,
  889. Redis: opts.RedisConf,
  890. Postgres: opts.PGConf,
  891. ProvisionerImageTag: opts.ProvImageTag,
  892. ImagePullSecret: opts.ProvImagePullSecret,
  893. LastApplied: opts.Infra.LastApplied,
  894. GCP: &gcp.Conf{
  895. GCPRegion: gcpConf.GCPRegion,
  896. GCPProjectID: gcpConf.GCPProjectID,
  897. GCPKeyData: string(gcpConf.GCPKeyData),
  898. },
  899. }
  900. return a.provision(prov, opts.Infra, opts.Repo)
  901. }
  902. // ProvisionGKE spawns a new provisioning pod that creates a GKE instance
  903. func (a *Agent) ProvisionGKE(
  904. opts *SharedProvisionOpts,
  905. gcpConf *integrations.GCPIntegration,
  906. gkeName string,
  907. ) (*batchv1.Job, error) {
  908. id := opts.Infra.GetUniqueName()
  909. prov := &provisioner.Conf{
  910. ID: id,
  911. Name: fmt.Sprintf("prov-%s-%s", id, string(opts.Operation)),
  912. Kind: provisioner.GKE,
  913. Operation: opts.Operation,
  914. Redis: opts.RedisConf,
  915. Postgres: opts.PGConf,
  916. ProvisionerImageTag: opts.ProvImageTag,
  917. ImagePullSecret: opts.ProvImagePullSecret,
  918. LastApplied: opts.Infra.LastApplied,
  919. GCP: &gcp.Conf{
  920. GCPRegion: gcpConf.GCPRegion,
  921. GCPProjectID: gcpConf.GCPProjectID,
  922. GCPKeyData: string(gcpConf.GCPKeyData),
  923. },
  924. GKE: &gke.Conf{
  925. ClusterName: gkeName,
  926. },
  927. }
  928. return a.provision(prov, opts.Infra, opts.Repo)
  929. }
  930. // ProvisionDOCR spawns a new provisioning pod that creates a DOCR instance
  931. func (a *Agent) ProvisionDOCR(
  932. opts *SharedProvisionOpts,
  933. doConf *integrations.OAuthIntegration,
  934. doAuth *oauth2.Config,
  935. docrName, docrSubscriptionTier string,
  936. ) (*batchv1.Job, error) {
  937. // get the token
  938. oauthInt, err := opts.Repo.OAuthIntegration().ReadOAuthIntegration(
  939. opts.ProjectID,
  940. opts.Infra.DOIntegrationID,
  941. )
  942. if err != nil {
  943. return nil, err
  944. }
  945. tok, _, err := oauth.GetAccessToken(oauthInt.SharedOAuthModel, doAuth, oauth.MakeUpdateOAuthIntegrationTokenFunction(oauthInt, opts.Repo))
  946. if err != nil {
  947. return nil, err
  948. }
  949. id := opts.Infra.GetUniqueName()
  950. prov := &provisioner.Conf{
  951. ID: id,
  952. Name: fmt.Sprintf("prov-%s-%s", id, string(opts.Operation)),
  953. Kind: provisioner.DOCR,
  954. Operation: opts.Operation,
  955. Redis: opts.RedisConf,
  956. Postgres: opts.PGConf,
  957. ProvisionerImageTag: opts.ProvImageTag,
  958. ImagePullSecret: opts.ProvImagePullSecret,
  959. LastApplied: opts.Infra.LastApplied,
  960. DO: &do.Conf{
  961. DOToken: tok,
  962. },
  963. DOCR: &docr.Conf{
  964. DOCRName: docrName,
  965. DOCRSubscriptionTier: docrSubscriptionTier,
  966. },
  967. }
  968. return a.provision(prov, opts.Infra, opts.Repo)
  969. }
  970. // ProvisionDOKS spawns a new provisioning pod that creates a DOKS instance
  971. func (a *Agent) ProvisionDOKS(
  972. opts *SharedProvisionOpts,
  973. doConf *integrations.OAuthIntegration,
  974. doAuth *oauth2.Config,
  975. doRegion, doksClusterName string,
  976. ) (*batchv1.Job, error) {
  977. // get the token
  978. oauthInt, err := opts.Repo.OAuthIntegration().ReadOAuthIntegration(
  979. opts.ProjectID,
  980. opts.Infra.DOIntegrationID,
  981. )
  982. if err != nil {
  983. return nil, err
  984. }
  985. tok, _, err := oauth.GetAccessToken(oauthInt.SharedOAuthModel, doAuth, oauth.MakeUpdateOAuthIntegrationTokenFunction(oauthInt, opts.Repo))
  986. if err != nil {
  987. return nil, err
  988. }
  989. id := opts.Infra.GetUniqueName()
  990. prov := &provisioner.Conf{
  991. ID: id,
  992. Name: fmt.Sprintf("prov-%s-%s", id, string(opts.Operation)),
  993. Kind: provisioner.DOKS,
  994. Operation: opts.Operation,
  995. Redis: opts.RedisConf,
  996. Postgres: opts.PGConf,
  997. LastApplied: opts.Infra.LastApplied,
  998. ProvisionerImageTag: opts.ProvImageTag,
  999. ImagePullSecret: opts.ProvImagePullSecret,
  1000. DO: &do.Conf{
  1001. DOToken: tok,
  1002. },
  1003. DOKS: &doks.Conf{
  1004. DORegion: doRegion,
  1005. DOKSClusterName: doksClusterName,
  1006. },
  1007. }
  1008. return a.provision(prov, opts.Infra, opts.Repo)
  1009. }
  1010. // ProvisionTest spawns a new provisioning pod that tests provisioning
  1011. func (a *Agent) ProvisionTest(
  1012. opts *SharedProvisionOpts,
  1013. ) (*batchv1.Job, error) {
  1014. id := opts.Infra.GetUniqueName()
  1015. prov := &provisioner.Conf{
  1016. ID: id,
  1017. Name: fmt.Sprintf("prov-%s-%s", id, string(opts.Operation)),
  1018. Operation: opts.Operation,
  1019. Kind: provisioner.Test,
  1020. Redis: opts.RedisConf,
  1021. Postgres: opts.PGConf,
  1022. ProvisionerImageTag: opts.ProvImageTag,
  1023. ImagePullSecret: opts.ProvImagePullSecret,
  1024. }
  1025. return a.provision(prov, opts.Infra, opts.Repo)
  1026. }
  1027. func (a *Agent) provision(
  1028. prov *provisioner.Conf,
  1029. infra *models.Infra,
  1030. repo repository.Repository,
  1031. ) (*batchv1.Job, error) {
  1032. prov.Namespace = "default"
  1033. job, err := prov.GetProvisionerJobTemplate()
  1034. if err != nil {
  1035. return nil, err
  1036. }
  1037. job, err = a.Clientset.BatchV1().Jobs(prov.Namespace).Create(
  1038. context.TODO(),
  1039. job,
  1040. metav1.CreateOptions{},
  1041. )
  1042. if err != nil {
  1043. return nil, err
  1044. }
  1045. infra.LastApplied = prov.LastApplied
  1046. infra, err = repo.Infra().UpdateInfra(infra)
  1047. if err != nil {
  1048. return nil, err
  1049. }
  1050. return job, nil
  1051. }
  1052. // CreateImagePullSecrets will create the required image pull secrets and
  1053. // return a map from the registry name to the name of the secret.
  1054. func (a *Agent) CreateImagePullSecrets(
  1055. repo repository.Repository,
  1056. namespace string,
  1057. linkedRegs map[string]*models.Registry,
  1058. doAuth *oauth2.Config,
  1059. ) (map[string]string, error) {
  1060. res := make(map[string]string)
  1061. for key, val := range linkedRegs {
  1062. _reg := registry.Registry(*val)
  1063. data, err := _reg.GetDockerConfigJSON(repo, doAuth)
  1064. if err != nil {
  1065. return nil, err
  1066. }
  1067. secretName := fmt.Sprintf("porter-%s-%d", val.ToRegistryType().Service, val.ID)
  1068. secret, err := a.Clientset.CoreV1().Secrets(namespace).Get(
  1069. context.TODO(),
  1070. secretName,
  1071. metav1.GetOptions{},
  1072. )
  1073. // if not found, create the secret
  1074. if err != nil && errors.IsNotFound(err) {
  1075. _, err = a.Clientset.CoreV1().Secrets(namespace).Create(
  1076. context.TODO(),
  1077. &v1.Secret{
  1078. ObjectMeta: metav1.ObjectMeta{
  1079. Name: secretName,
  1080. },
  1081. Data: map[string][]byte{
  1082. string(v1.DockerConfigJsonKey): data,
  1083. },
  1084. Type: v1.SecretTypeDockerConfigJson,
  1085. },
  1086. metav1.CreateOptions{},
  1087. )
  1088. if err != nil {
  1089. return nil, err
  1090. }
  1091. // add secret name to the map
  1092. res[key] = secretName
  1093. continue
  1094. } else if err != nil {
  1095. return nil, err
  1096. }
  1097. // otherwise, check that the secret contains the correct data: if
  1098. // if doesn't, update it
  1099. if !bytes.Equal(secret.Data[v1.DockerConfigJsonKey], data) {
  1100. _, err := a.Clientset.CoreV1().Secrets(namespace).Update(
  1101. context.TODO(),
  1102. &v1.Secret{
  1103. ObjectMeta: metav1.ObjectMeta{
  1104. Name: secretName,
  1105. },
  1106. Data: map[string][]byte{
  1107. string(v1.DockerConfigJsonKey): data,
  1108. },
  1109. Type: v1.SecretTypeDockerConfigJson,
  1110. },
  1111. metav1.UpdateOptions{},
  1112. )
  1113. if err != nil {
  1114. return nil, err
  1115. }
  1116. }
  1117. // add secret name to the map
  1118. res[key] = secretName
  1119. }
  1120. return res, nil
  1121. }
  1122. // helper that waits for pod to be ready
  1123. func (a *Agent) waitForPod(pod *v1.Pod) (error, bool) {
  1124. var (
  1125. w watch.Interface
  1126. err error
  1127. ok bool
  1128. )
  1129. // immediately after creating a pod, the API may return a 404. heuristically 1
  1130. // second seems to be plenty.
  1131. watchRetries := 3
  1132. for i := 0; i < watchRetries; i++ {
  1133. selector := fields.OneTermEqualSelector("metadata.name", pod.Name).String()
  1134. w, err = a.Clientset.CoreV1().
  1135. Pods(pod.Namespace).
  1136. Watch(context.Background(), metav1.ListOptions{FieldSelector: selector})
  1137. if err == nil {
  1138. break
  1139. }
  1140. time.Sleep(time.Second)
  1141. }
  1142. if err != nil {
  1143. return err, false
  1144. }
  1145. defer w.Stop()
  1146. for {
  1147. select {
  1148. case <-time.After(time.Second * 30):
  1149. return goerrors.New("timed out waiting for pod"), false
  1150. case <-time.Tick(time.Second):
  1151. // poll every second in case we already missed the ready event while
  1152. // creating the listener.
  1153. pod, err = a.Clientset.CoreV1().
  1154. Pods(pod.Namespace).
  1155. Get(context.Background(), pod.Name, metav1.GetOptions{})
  1156. if err != nil && errors.IsNotFound(err) {
  1157. return IsNotFoundError, false
  1158. } else if err != nil {
  1159. return err, false
  1160. }
  1161. if isExited := isPodExited(pod); isExited || isPodReady(pod) {
  1162. return nil, isExited
  1163. }
  1164. case evt := <-w.ResultChan():
  1165. pod, ok = evt.Object.(*v1.Pod)
  1166. if !ok {
  1167. return fmt.Errorf("unexpected object type: %T", evt.Object), false
  1168. }
  1169. if isExited := isPodExited(pod); isExited || isPodReady(pod) {
  1170. return nil, isExited
  1171. }
  1172. }
  1173. }
  1174. }
  1175. func isPodReady(pod *v1.Pod) bool {
  1176. ready := false
  1177. conditions := pod.Status.Conditions
  1178. for i := range conditions {
  1179. if conditions[i].Type == v1.PodReady {
  1180. ready = pod.Status.Conditions[i].Status == v1.ConditionTrue
  1181. }
  1182. }
  1183. return ready
  1184. }
  1185. func isPodExited(pod *v1.Pod) bool {
  1186. return pod.Status.Phase == v1.PodSucceeded || pod.Status.Phase == v1.PodFailed
  1187. }