agent.go 34 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321132213231324132513261327132813291330133113321333133413351336133713381339134013411342134313441345134613471348134913501351135213531354135513561357135813591360136113621363136413651366136713681369137013711372137313741375137613771378137913801381138213831384138513861387138813891390139113921393139413951396139713981399140014011402140314041405140614071408140914101411141214131414141514161417141814191420142114221423
  1. package kubernetes
  2. import (
  3. "bufio"
  4. "bytes"
  5. "compress/gzip"
  6. "context"
  7. "encoding/base64"
  8. "encoding/json"
  9. "fmt"
  10. "io"
  11. "io/ioutil"
  12. "strings"
  13. "time"
  14. goerrors "errors"
  15. "github.com/porter-dev/porter/api/server/shared/config/env"
  16. "github.com/porter-dev/porter/api/server/shared/websocket"
  17. "github.com/porter-dev/porter/internal/kubernetes/provisioner"
  18. "github.com/porter-dev/porter/internal/kubernetes/provisioner/aws"
  19. "github.com/porter-dev/porter/internal/kubernetes/provisioner/aws/ecr"
  20. "github.com/porter-dev/porter/internal/kubernetes/provisioner/aws/eks"
  21. "github.com/porter-dev/porter/internal/kubernetes/provisioner/do"
  22. "github.com/porter-dev/porter/internal/kubernetes/provisioner/do/docr"
  23. "github.com/porter-dev/porter/internal/kubernetes/provisioner/do/doks"
  24. "github.com/porter-dev/porter/internal/kubernetes/provisioner/gcp"
  25. "github.com/porter-dev/porter/internal/kubernetes/provisioner/gcp/gke"
  26. "github.com/porter-dev/porter/internal/models"
  27. "github.com/porter-dev/porter/internal/models/integrations"
  28. "github.com/porter-dev/porter/internal/oauth"
  29. "github.com/porter-dev/porter/internal/registry"
  30. "github.com/porter-dev/porter/internal/repository"
  31. "golang.org/x/oauth2"
  32. errors2 "errors"
  33. "github.com/porter-dev/porter/internal/helm/grapher"
  34. appsv1 "k8s.io/api/apps/v1"
  35. batchv1 "k8s.io/api/batch/v1"
  36. batchv1beta1 "k8s.io/api/batch/v1beta1"
  37. v1 "k8s.io/api/core/v1"
  38. v1beta1 "k8s.io/api/extensions/v1beta1"
  39. "k8s.io/apimachinery/pkg/api/errors"
  40. metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
  41. "k8s.io/apimachinery/pkg/fields"
  42. "k8s.io/apimachinery/pkg/runtime"
  43. "k8s.io/apimachinery/pkg/runtime/schema"
  44. "k8s.io/apimachinery/pkg/types"
  45. "k8s.io/apimachinery/pkg/watch"
  46. "k8s.io/cli-runtime/pkg/genericclioptions"
  47. "k8s.io/client-go/informers"
  48. "k8s.io/client-go/kubernetes"
  49. "k8s.io/client-go/rest"
  50. "k8s.io/client-go/tools/cache"
  51. "k8s.io/client-go/tools/remotecommand"
  52. rspb "helm.sh/helm/v3/pkg/release"
  53. )
  54. // Agent is a Kubernetes agent for performing operations that interact with the
  55. // api server
  56. type Agent struct {
  57. RESTClientGetter genericclioptions.RESTClientGetter
  58. Clientset kubernetes.Interface
  59. }
  60. type Message struct {
  61. EventType string `json:"event_type"`
  62. Object interface{}
  63. Kind string
  64. }
  65. type ListOptions struct {
  66. FieldSelector string
  67. }
  68. type AuthError struct{}
  69. func (e *AuthError) Error() string {
  70. return "Unauthorized error"
  71. }
  72. // UpdateClientset updates the Agent's Clientset (this refreshes auth tokens)
  73. func (a *Agent) UpdateClientset() error {
  74. restConf, err := a.RESTClientGetter.ToRESTConfig()
  75. if err != nil {
  76. return err
  77. }
  78. clientset, err := kubernetes.NewForConfig(restConf)
  79. if err != nil {
  80. return err
  81. }
  82. a.Clientset = clientset
  83. return nil
  84. }
  85. // CreateConfigMap creates the configmap given the key-value pairs and namespace
  86. func (a *Agent) CreateConfigMap(name string, namespace string, configMap map[string]string) (*v1.ConfigMap, error) {
  87. return a.Clientset.CoreV1().ConfigMaps(namespace).Create(
  88. context.TODO(),
  89. &v1.ConfigMap{
  90. ObjectMeta: metav1.ObjectMeta{
  91. Name: name,
  92. Namespace: namespace,
  93. Labels: map[string]string{
  94. "porter": "true",
  95. },
  96. },
  97. Data: configMap,
  98. },
  99. metav1.CreateOptions{},
  100. )
  101. }
  102. // CreateLinkedSecret creates a secret given the key-value pairs and namespace. Values are
  103. // base64 encoded
  104. func (a *Agent) CreateLinkedSecret(name, namespace, cmName string, data map[string][]byte) (*v1.Secret, error) {
  105. return a.Clientset.CoreV1().Secrets(namespace).Create(
  106. context.TODO(),
  107. &v1.Secret{
  108. ObjectMeta: metav1.ObjectMeta{
  109. Name: name,
  110. Namespace: namespace,
  111. Labels: map[string]string{
  112. "porter": "true",
  113. "configmap": cmName,
  114. },
  115. },
  116. Data: data,
  117. },
  118. metav1.CreateOptions{},
  119. )
  120. }
  121. type mergeConfigMapData struct {
  122. Data map[string]*string `json:"data"`
  123. }
  124. // UpdateConfigMap updates the configmap given its name and namespace
  125. func (a *Agent) UpdateConfigMap(name string, namespace string, configMap map[string]string) (*v1.ConfigMap, error) {
  126. cmData := make(map[string]*string)
  127. for key, val := range configMap {
  128. valCopy := val
  129. cmData[key] = &valCopy
  130. if len(val) == 0 {
  131. cmData[key] = nil
  132. }
  133. }
  134. mergeCM := &mergeConfigMapData{
  135. Data: cmData,
  136. }
  137. patchBytes, err := json.Marshal(mergeCM)
  138. if err != nil {
  139. return nil, err
  140. }
  141. return a.Clientset.CoreV1().ConfigMaps(namespace).Patch(
  142. context.Background(),
  143. name,
  144. types.MergePatchType,
  145. patchBytes,
  146. metav1.PatchOptions{},
  147. )
  148. }
  149. type mergeLinkedSecretData struct {
  150. Data map[string]*[]byte `json:"data"`
  151. }
  152. // UpdateLinkedSecret updates the secret given its name and namespace
  153. func (a *Agent) UpdateLinkedSecret(name, namespace, cmName string, data map[string][]byte) error {
  154. secretData := make(map[string]*[]byte)
  155. for key, val := range data {
  156. valCopy := val
  157. secretData[key] = &valCopy
  158. if len(val) == 0 {
  159. secretData[key] = nil
  160. }
  161. }
  162. mergeSecret := &mergeLinkedSecretData{
  163. Data: secretData,
  164. }
  165. patchBytes, err := json.Marshal(mergeSecret)
  166. if err != nil {
  167. return err
  168. }
  169. _, err = a.Clientset.CoreV1().Secrets(namespace).Patch(
  170. context.TODO(),
  171. name,
  172. types.MergePatchType,
  173. patchBytes,
  174. metav1.PatchOptions{},
  175. )
  176. return err
  177. }
  178. // DeleteConfigMap deletes the configmap given its name and namespace
  179. func (a *Agent) DeleteConfigMap(name string, namespace string) error {
  180. return a.Clientset.CoreV1().ConfigMaps(namespace).Delete(
  181. context.TODO(),
  182. name,
  183. metav1.DeleteOptions{},
  184. )
  185. }
  186. // DeleteLinkedSecret deletes the secret given its name and namespace
  187. func (a *Agent) DeleteLinkedSecret(name, namespace string) error {
  188. return a.Clientset.CoreV1().Secrets(namespace).Delete(
  189. context.TODO(),
  190. name,
  191. metav1.DeleteOptions{},
  192. )
  193. }
  194. // GetConfigMap retrieves the configmap given its name and namespace
  195. func (a *Agent) GetConfigMap(name string, namespace string) (*v1.ConfigMap, error) {
  196. return a.Clientset.CoreV1().ConfigMaps(namespace).Get(
  197. context.TODO(),
  198. name,
  199. metav1.GetOptions{},
  200. )
  201. }
  202. // GetSecret retrieves the secret given its name and namespace
  203. func (a *Agent) GetSecret(name string, namespace string) (*v1.Secret, error) {
  204. return a.Clientset.CoreV1().Secrets(namespace).Get(
  205. context.TODO(),
  206. name,
  207. metav1.GetOptions{},
  208. )
  209. }
  210. // ListConfigMaps simply lists namespaces
  211. func (a *Agent) ListConfigMaps(namespace string) (*v1.ConfigMapList, error) {
  212. return a.Clientset.CoreV1().ConfigMaps(namespace).List(
  213. context.TODO(),
  214. metav1.ListOptions{
  215. LabelSelector: "porter=true",
  216. },
  217. )
  218. }
  219. // ListEvents lists the events of a given object.
  220. func (a *Agent) ListEvents(name string, namespace string) (*v1.EventList, error) {
  221. return a.Clientset.CoreV1().Events(namespace).List(
  222. context.TODO(),
  223. metav1.ListOptions{
  224. FieldSelector: fmt.Sprintf("involvedObject.name=%s,involvedObject.namespace=%s", name, namespace),
  225. },
  226. )
  227. }
  228. // ListNamespaces simply lists namespaces
  229. func (a *Agent) ListNamespaces() (*v1.NamespaceList, error) {
  230. return a.Clientset.CoreV1().Namespaces().List(
  231. context.TODO(),
  232. metav1.ListOptions{},
  233. )
  234. }
  235. // CreateNamespace creates a namespace with the given name.
  236. func (a *Agent) CreateNamespace(name string) (*v1.Namespace, error) {
  237. namespace := v1.Namespace{
  238. ObjectMeta: metav1.ObjectMeta{
  239. Name: name,
  240. },
  241. }
  242. return a.Clientset.CoreV1().Namespaces().Create(
  243. context.TODO(),
  244. &namespace,
  245. metav1.CreateOptions{},
  246. )
  247. }
  248. // DeleteNamespace deletes the namespace given the name.
  249. func (a *Agent) DeleteNamespace(name string) error {
  250. return a.Clientset.CoreV1().Namespaces().Delete(
  251. context.TODO(),
  252. name,
  253. metav1.DeleteOptions{},
  254. )
  255. }
  256. // ListJobsByLabel lists jobs in a namespace matching a label
  257. type Label struct {
  258. Key string
  259. Val string
  260. }
  261. func (a *Agent) ListJobsByLabel(namespace string, labels ...Label) ([]batchv1.Job, error) {
  262. selectors := make([]string, 0)
  263. for _, label := range labels {
  264. selectors = append(selectors, fmt.Sprintf("%s=%s", label.Key, label.Val))
  265. }
  266. resp, err := a.Clientset.BatchV1().Jobs(namespace).List(
  267. context.TODO(),
  268. metav1.ListOptions{
  269. LabelSelector: strings.Join(selectors, ","),
  270. },
  271. )
  272. if err != nil {
  273. return nil, err
  274. }
  275. return resp.Items, nil
  276. }
  277. // DeleteJob deletes the job in the given name and namespace.
  278. func (a *Agent) DeleteJob(name, namespace string) error {
  279. return a.Clientset.BatchV1().Jobs(namespace).Delete(
  280. context.TODO(),
  281. name,
  282. metav1.DeleteOptions{},
  283. )
  284. }
  285. // GetJobPods lists all pods belonging to a job in a namespace
  286. func (a *Agent) GetJobPods(namespace, jobName string) ([]v1.Pod, error) {
  287. resp, err := a.Clientset.CoreV1().Pods(namespace).List(
  288. context.TODO(),
  289. metav1.ListOptions{
  290. LabelSelector: fmt.Sprintf("%s=%s", "job-name", jobName),
  291. },
  292. )
  293. if err != nil {
  294. return nil, err
  295. }
  296. return resp.Items, nil
  297. }
  298. // GetIngress gets ingress given the name and namespace
  299. func (a *Agent) GetIngress(namespace string, name string) (*v1beta1.Ingress, error) {
  300. resp, err := a.Clientset.ExtensionsV1beta1().Ingresses(namespace).Get(
  301. context.TODO(),
  302. name,
  303. metav1.GetOptions{},
  304. )
  305. if err != nil && errors.IsNotFound(err) {
  306. return nil, IsNotFoundError
  307. } else if err != nil {
  308. return nil, err
  309. }
  310. return resp, nil
  311. }
  312. var IsNotFoundError = fmt.Errorf("not found")
  313. type BadRequestError struct {
  314. msg string
  315. }
  316. func (e *BadRequestError) Error() string {
  317. return e.msg
  318. }
  319. // GetDeployment gets the deployment given the name and namespace
  320. func (a *Agent) GetDeployment(c grapher.Object) (*appsv1.Deployment, error) {
  321. res, err := a.Clientset.AppsV1().Deployments(c.Namespace).Get(
  322. context.TODO(),
  323. c.Name,
  324. metav1.GetOptions{},
  325. )
  326. if err != nil && errors.IsNotFound(err) {
  327. return nil, IsNotFoundError
  328. } else if err != nil {
  329. return nil, err
  330. }
  331. res.Kind = c.Kind
  332. return res, nil
  333. }
  334. // GetStatefulSet gets the statefulset given the name and namespace
  335. func (a *Agent) GetStatefulSet(c grapher.Object) (*appsv1.StatefulSet, error) {
  336. res, err := a.Clientset.AppsV1().StatefulSets(c.Namespace).Get(
  337. context.TODO(),
  338. c.Name,
  339. metav1.GetOptions{},
  340. )
  341. if err != nil && errors.IsNotFound(err) {
  342. return nil, IsNotFoundError
  343. } else if err != nil {
  344. return nil, err
  345. }
  346. res.Kind = c.Kind
  347. return res, nil
  348. }
  349. // GetReplicaSet gets the replicaset given the name and namespace
  350. func (a *Agent) GetReplicaSet(c grapher.Object) (*appsv1.ReplicaSet, error) {
  351. res, err := a.Clientset.AppsV1().ReplicaSets(c.Namespace).Get(
  352. context.TODO(),
  353. c.Name,
  354. metav1.GetOptions{},
  355. )
  356. if err != nil && errors.IsNotFound(err) {
  357. return nil, IsNotFoundError
  358. } else if err != nil {
  359. return nil, err
  360. }
  361. res.Kind = c.Kind
  362. return res, nil
  363. }
  364. // GetDaemonSet gets the daemonset by name and namespace
  365. func (a *Agent) GetDaemonSet(c grapher.Object) (*appsv1.DaemonSet, error) {
  366. res, err := a.Clientset.AppsV1().DaemonSets(c.Namespace).Get(
  367. context.TODO(),
  368. c.Name,
  369. metav1.GetOptions{},
  370. )
  371. if err != nil && errors.IsNotFound(err) {
  372. return nil, IsNotFoundError
  373. } else if err != nil {
  374. return nil, err
  375. }
  376. res.Kind = c.Kind
  377. return res, nil
  378. }
  379. // GetJob gets the job by name and namespace
  380. func (a *Agent) GetJob(c grapher.Object) (*batchv1.Job, error) {
  381. res, err := a.Clientset.BatchV1().Jobs(c.Namespace).Get(
  382. context.TODO(),
  383. c.Name,
  384. metav1.GetOptions{},
  385. )
  386. if err != nil && errors.IsNotFound(err) {
  387. return nil, IsNotFoundError
  388. } else if err != nil {
  389. return nil, err
  390. }
  391. res.Kind = c.Kind
  392. return res, nil
  393. }
  394. // GetCronJob gets the CronJob by name and namespace
  395. func (a *Agent) GetCronJob(c grapher.Object) (*batchv1beta1.CronJob, error) {
  396. res, err := a.Clientset.BatchV1beta1().CronJobs(c.Namespace).Get(
  397. context.TODO(),
  398. c.Name,
  399. metav1.GetOptions{},
  400. )
  401. if err != nil && errors.IsNotFound(err) {
  402. return nil, IsNotFoundError
  403. } else if err != nil {
  404. return nil, err
  405. }
  406. res.Kind = c.Kind
  407. return res, nil
  408. }
  409. // GetPodsByLabel retrieves pods with matching labels
  410. func (a *Agent) GetPodsByLabel(selector string, namespace string) (*v1.PodList, error) {
  411. // Search in all namespaces for matching pods
  412. return a.Clientset.CoreV1().Pods(namespace).List(
  413. context.TODO(),
  414. metav1.ListOptions{
  415. LabelSelector: selector,
  416. },
  417. )
  418. }
  419. // DeletePod deletes a pod by name and namespace
  420. func (a *Agent) DeletePod(namespace string, name string) error {
  421. err := a.Clientset.CoreV1().Pods(namespace).Delete(
  422. context.TODO(),
  423. name,
  424. metav1.DeleteOptions{},
  425. )
  426. if err != nil && errors.IsNotFound(err) {
  427. return IsNotFoundError
  428. }
  429. return err
  430. }
  431. // GetPodLogs streams real-time logs from a given pod.
  432. func (a *Agent) GetPodLogs(namespace string, name string, rw *websocket.WebsocketSafeReadWriter) error {
  433. // get the pod to read in the list of contains
  434. pod, err := a.Clientset.CoreV1().Pods(namespace).Get(
  435. context.Background(),
  436. name,
  437. metav1.GetOptions{},
  438. )
  439. if err != nil && errors.IsNotFound(err) {
  440. return IsNotFoundError
  441. } else if err != nil {
  442. return fmt.Errorf("Cannot get logs from pod %s: %s", name, err.Error())
  443. }
  444. // see if container is ready and able to open a stream. If not, wait for container
  445. // to be ready.
  446. err, isExited := a.waitForPod(pod)
  447. if err != nil && goerrors.Is(err, IsNotFoundError) {
  448. return IsNotFoundError
  449. } else if err != nil {
  450. return fmt.Errorf("Cannot get logs from pod %s: %s", name, err.Error())
  451. } else if isExited {
  452. // if exited, we return nil and simply close the stream
  453. return nil
  454. }
  455. container := pod.Spec.Containers[0].Name
  456. tails := int64(400)
  457. // follow logs
  458. podLogOpts := v1.PodLogOptions{
  459. Follow: true,
  460. TailLines: &tails,
  461. Container: container,
  462. }
  463. req := a.Clientset.CoreV1().Pods(namespace).GetLogs(name, &podLogOpts)
  464. podLogs, err := req.Stream(context.TODO())
  465. // in the case of bad request errors, such as if the pod is stuck in "ContainerCreating",
  466. // we'd like to pass this through to the client.
  467. if err != nil && errors.IsBadRequest(err) {
  468. return &BadRequestError{err.Error()}
  469. } else if err != nil {
  470. return fmt.Errorf("Cannot open log stream for pod %s: %s", name, err.Error())
  471. }
  472. defer podLogs.Close()
  473. r := bufio.NewReader(podLogs)
  474. errorchan := make(chan error)
  475. go func() {
  476. // listens for websocket closing handshake
  477. for {
  478. if _, _, err := rw.ReadMessage(); err != nil {
  479. errorchan <- nil
  480. return
  481. }
  482. }
  483. }()
  484. go func() {
  485. for {
  486. select {
  487. case <-errorchan:
  488. defer close(errorchan)
  489. return
  490. default:
  491. }
  492. bytes, err := r.ReadBytes('\n')
  493. if _, writeErr := rw.Write(bytes); writeErr != nil {
  494. errorchan <- writeErr
  495. return
  496. }
  497. if err != nil {
  498. if err != io.EOF {
  499. errorchan <- err
  500. return
  501. }
  502. errorchan <- nil
  503. return
  504. }
  505. }
  506. }()
  507. for {
  508. select {
  509. case err = <-errorchan:
  510. return err
  511. }
  512. }
  513. }
  514. // StopJobWithJobSidecar sends a termination signal to a job running with a sidecar
  515. func (a *Agent) StopJobWithJobSidecar(namespace, name string) error {
  516. jobPods, err := a.GetJobPods(namespace, name)
  517. if err != nil {
  518. return err
  519. }
  520. podName := jobPods[0].ObjectMeta.Name
  521. restConf, err := a.RESTClientGetter.ToRESTConfig()
  522. restConf.GroupVersion = &schema.GroupVersion{
  523. Group: "api",
  524. Version: "v1",
  525. }
  526. restConf.NegotiatedSerializer = runtime.NewSimpleNegotiatedSerializer(runtime.SerializerInfo{})
  527. restClient, err := rest.RESTClientFor(restConf)
  528. if err != nil {
  529. return err
  530. }
  531. req := restClient.Post().
  532. Resource("pods").
  533. Name(podName).
  534. Namespace(namespace).
  535. SubResource("exec")
  536. req.Param("command", "./signal.sh")
  537. req.Param("container", "sidecar")
  538. req.Param("stdin", "true")
  539. req.Param("stdout", "false")
  540. req.Param("tty", "false")
  541. exec, err := remotecommand.NewSPDYExecutor(restConf, "POST", req.URL())
  542. if err != nil {
  543. return err
  544. }
  545. return exec.Stream(remotecommand.StreamOptions{
  546. Tty: false,
  547. Stdin: strings.NewReader("./signal.sh"),
  548. })
  549. }
  550. // RunWebsocketTask will run a websocket task. If the websocket returns an anauthorized error, it will restart
  551. // the task some number of times until failing
  552. func (a *Agent) RunWebsocketTask(task func() error) error {
  553. lastTime := int64(0)
  554. for {
  555. if err := a.UpdateClientset(); err != nil {
  556. return err
  557. }
  558. err := task()
  559. if err == nil {
  560. return nil
  561. }
  562. if !errors2.Is(err, &AuthError{}) {
  563. return err
  564. }
  565. if time.Now().Unix()-lastTime < 60 { // don't regenerate connection if too many unauthorized errors
  566. return err
  567. }
  568. lastTime = time.Now().Unix()
  569. }
  570. }
  571. // StreamControllerStatus streams controller status. Supports Deployment, StatefulSet, ReplicaSet, and DaemonSet
  572. // TODO: Support Jobs
  573. func (a *Agent) StreamControllerStatus(kind string, selectors string, rw *websocket.WebsocketSafeReadWriter) error {
  574. run := func() error {
  575. // selectors is an array of max length 1. StreamControllerStatus accepts calls without the selectors argument.
  576. // selectors argument is a single string with comma separated key=value pairs. (e.g. "app=porter,porter=true")
  577. tweakListOptionsFunc := func(options *metav1.ListOptions) {
  578. options.LabelSelector = selectors
  579. }
  580. factory := informers.NewSharedInformerFactoryWithOptions(
  581. a.Clientset,
  582. 0,
  583. informers.WithTweakListOptions(tweakListOptionsFunc),
  584. )
  585. var informer cache.SharedInformer
  586. // Spins up an informer depending on kind. Convert to lowercase for robustness
  587. switch strings.ToLower(kind) {
  588. case "deployment":
  589. informer = factory.Apps().V1().Deployments().Informer()
  590. case "statefulset":
  591. informer = factory.Apps().V1().StatefulSets().Informer()
  592. case "replicaset":
  593. informer = factory.Apps().V1().ReplicaSets().Informer()
  594. case "daemonset":
  595. informer = factory.Apps().V1().DaemonSets().Informer()
  596. case "job":
  597. informer = factory.Batch().V1().Jobs().Informer()
  598. case "cronjob":
  599. informer = factory.Batch().V1beta1().CronJobs().Informer()
  600. case "namespace":
  601. informer = factory.Core().V1().Namespaces().Informer()
  602. case "pod":
  603. informer = factory.Core().V1().Pods().Informer()
  604. }
  605. stopper := make(chan struct{})
  606. errorchan := make(chan error)
  607. defer close(stopper)
  608. informer.SetWatchErrorHandler(func(r *cache.Reflector, err error) {
  609. if strings.HasSuffix(err.Error(), ": Unauthorized") {
  610. errorchan <- &AuthError{}
  611. }
  612. })
  613. informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
  614. UpdateFunc: func(oldObj, newObj interface{}) {
  615. msg := Message{
  616. EventType: "UPDATE",
  617. Object: newObj,
  618. Kind: strings.ToLower(kind),
  619. }
  620. rw.WriteJSONWithChannel(msg, errorchan)
  621. },
  622. AddFunc: func(obj interface{}) {
  623. msg := Message{
  624. EventType: "ADD",
  625. Object: obj,
  626. Kind: strings.ToLower(kind),
  627. }
  628. rw.WriteJSONWithChannel(msg, errorchan)
  629. },
  630. DeleteFunc: func(obj interface{}) {
  631. msg := Message{
  632. EventType: "DELETE",
  633. Object: obj,
  634. Kind: strings.ToLower(kind),
  635. }
  636. rw.WriteJSONWithChannel(msg, errorchan)
  637. },
  638. })
  639. go func() {
  640. // listens for websocket closing handshake
  641. for {
  642. if _, _, err := rw.ReadMessage(); err != nil {
  643. errorchan <- nil
  644. return
  645. }
  646. }
  647. }()
  648. go informer.Run(stopper)
  649. for {
  650. select {
  651. case err := <-errorchan:
  652. return err
  653. }
  654. }
  655. }
  656. return a.RunWebsocketTask(run)
  657. }
  658. var b64 = base64.StdEncoding
  659. var magicGzip = []byte{0x1f, 0x8b, 0x08}
  660. func decodeRelease(data string) (*rspb.Release, error) {
  661. // base64 decode string
  662. b, err := b64.DecodeString(data)
  663. if err != nil {
  664. return nil, err
  665. }
  666. // For backwards compatibility with releases that were stored before
  667. // compression was introduced we skip decompression if the
  668. // gzip magic header is not found
  669. if bytes.Equal(b[0:3], magicGzip) {
  670. r, err := gzip.NewReader(bytes.NewReader(b))
  671. if err != nil {
  672. return nil, err
  673. }
  674. defer r.Close()
  675. b2, err := ioutil.ReadAll(r)
  676. if err != nil {
  677. return nil, err
  678. }
  679. b = b2
  680. }
  681. var rls rspb.Release
  682. // unmarshal release object bytes
  683. if err := json.Unmarshal(b, &rls); err != nil {
  684. return nil, err
  685. }
  686. return &rls, nil
  687. }
  688. func contains(s []string, str string) bool {
  689. for _, v := range s {
  690. if v == str {
  691. return true
  692. }
  693. }
  694. return false
  695. }
  696. func parseSecretToHelmRelease(secret v1.Secret, chartList []string) (*rspb.Release, bool, error) {
  697. if secret.Type != "helm.sh/release.v1" {
  698. return nil, true, nil
  699. }
  700. releaseData, ok := secret.Data["release"]
  701. if !ok {
  702. return nil, true, fmt.Errorf("release field not found")
  703. }
  704. helm_object, err := decodeRelease(string(releaseData))
  705. if err != nil {
  706. return nil, true, err
  707. }
  708. if len(chartList) > 0 && !contains(chartList, helm_object.Name) {
  709. return nil, true, nil
  710. }
  711. return helm_object, false, nil
  712. }
  713. func (a *Agent) StreamHelmReleases(namespace string, chartList []string, selectors string, rw *websocket.WebsocketSafeReadWriter) error {
  714. run := func() error {
  715. tweakListOptionsFunc := func(options *metav1.ListOptions) {
  716. options.LabelSelector = selectors
  717. }
  718. factory := informers.NewSharedInformerFactoryWithOptions(
  719. a.Clientset,
  720. 0,
  721. informers.WithTweakListOptions(tweakListOptionsFunc),
  722. informers.WithNamespace(namespace),
  723. )
  724. informer := factory.Core().V1().Secrets().Informer()
  725. stopper := make(chan struct{})
  726. errorchan := make(chan error)
  727. defer close(stopper)
  728. informer.SetWatchErrorHandler(func(r *cache.Reflector, err error) {
  729. if strings.HasSuffix(err.Error(), ": Unauthorized") {
  730. errorchan <- &AuthError{}
  731. }
  732. })
  733. informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
  734. UpdateFunc: func(oldObj, newObj interface{}) {
  735. secretObj, ok := newObj.(*v1.Secret)
  736. if !ok {
  737. errorchan <- fmt.Errorf("could not cast to secret")
  738. return
  739. }
  740. helm_object, isNotHelmRelease, err := parseSecretToHelmRelease(*secretObj, chartList)
  741. if isNotHelmRelease && err == nil {
  742. return
  743. }
  744. if err != nil {
  745. errorchan <- err
  746. return
  747. }
  748. msg := Message{
  749. EventType: "UPDATE",
  750. Object: helm_object,
  751. }
  752. rw.WriteJSONWithChannel(msg, errorchan)
  753. },
  754. AddFunc: func(obj interface{}) {
  755. secretObj, ok := obj.(*v1.Secret)
  756. if !ok {
  757. errorchan <- fmt.Errorf("could not cast to secret")
  758. return
  759. }
  760. helm_object, isNotHelmRelease, err := parseSecretToHelmRelease(*secretObj, chartList)
  761. if isNotHelmRelease && err == nil {
  762. return
  763. }
  764. if err != nil {
  765. errorchan <- err
  766. return
  767. }
  768. msg := Message{
  769. EventType: "ADD",
  770. Object: helm_object,
  771. }
  772. rw.WriteJSONWithChannel(msg, errorchan)
  773. },
  774. DeleteFunc: func(obj interface{}) {
  775. secretObj, ok := obj.(*v1.Secret)
  776. if !ok {
  777. errorchan <- fmt.Errorf("could not cast to secret")
  778. return
  779. }
  780. helm_object, isNotHelmRelease, err := parseSecretToHelmRelease(*secretObj, chartList)
  781. if isNotHelmRelease && err == nil {
  782. return
  783. }
  784. if err != nil {
  785. errorchan <- err
  786. return
  787. }
  788. msg := Message{
  789. EventType: "DELETE",
  790. Object: helm_object,
  791. }
  792. rw.WriteJSONWithChannel(msg, errorchan)
  793. },
  794. })
  795. go func() {
  796. // listens for websocket closing handshake
  797. for {
  798. if _, _, err := rw.ReadMessage(); err != nil {
  799. errorchan <- nil
  800. return
  801. }
  802. }
  803. }()
  804. go informer.Run(stopper)
  805. for {
  806. select {
  807. case err := <-errorchan:
  808. return err
  809. }
  810. }
  811. }
  812. return a.RunWebsocketTask(run)
  813. }
  814. type SharedProvisionOpts struct {
  815. ProjectID uint
  816. Repo repository.Repository
  817. Infra *models.Infra
  818. Operation provisioner.ProvisionerOperation
  819. PGConf *env.DBConf
  820. RedisConf *env.RedisConf
  821. ProvImageTag string
  822. ProvImagePullSecret string
  823. }
  824. // ProvisionECR spawns a new provisioning pod that creates an ECR instance
  825. func (a *Agent) ProvisionECR(
  826. opts *SharedProvisionOpts,
  827. awsConf *integrations.AWSIntegration,
  828. ecrName string,
  829. ) (*batchv1.Job, error) {
  830. id := opts.Infra.GetUniqueName()
  831. prov := &provisioner.Conf{
  832. ID: id,
  833. Name: fmt.Sprintf("prov-%s-%s", id, string(opts.Operation)),
  834. Kind: provisioner.ECR,
  835. Operation: opts.Operation,
  836. Redis: opts.RedisConf,
  837. Postgres: opts.PGConf,
  838. ProvisionerImageTag: opts.ProvImageTag,
  839. ImagePullSecret: opts.ProvImagePullSecret,
  840. LastApplied: opts.Infra.LastApplied,
  841. AWS: &aws.Conf{
  842. AWSRegion: awsConf.AWSRegion,
  843. AWSAccessKeyID: string(awsConf.AWSAccessKeyID),
  844. AWSSecretAccessKey: string(awsConf.AWSSecretAccessKey),
  845. },
  846. ECR: &ecr.Conf{
  847. ECRName: ecrName,
  848. },
  849. }
  850. return a.provision(prov, opts.Infra, opts.Repo)
  851. }
  852. // ProvisionEKS spawns a new provisioning pod that creates an EKS instance
  853. func (a *Agent) ProvisionEKS(
  854. opts *SharedProvisionOpts,
  855. awsConf *integrations.AWSIntegration,
  856. eksName, machineType string,
  857. ) (*batchv1.Job, error) {
  858. id := opts.Infra.GetUniqueName()
  859. prov := &provisioner.Conf{
  860. ID: id,
  861. Name: fmt.Sprintf("prov-%s-%s", id, string(opts.Operation)),
  862. Kind: provisioner.EKS,
  863. Operation: opts.Operation,
  864. Redis: opts.RedisConf,
  865. Postgres: opts.PGConf,
  866. ProvisionerImageTag: opts.ProvImageTag,
  867. ImagePullSecret: opts.ProvImagePullSecret,
  868. LastApplied: opts.Infra.LastApplied,
  869. AWS: &aws.Conf{
  870. AWSRegion: awsConf.AWSRegion,
  871. AWSAccessKeyID: string(awsConf.AWSAccessKeyID),
  872. AWSSecretAccessKey: string(awsConf.AWSSecretAccessKey),
  873. },
  874. EKS: &eks.Conf{
  875. ClusterName: eksName,
  876. MachineType: machineType,
  877. },
  878. }
  879. return a.provision(prov, opts.Infra, opts.Repo)
  880. }
  881. // ProvisionGCR spawns a new provisioning pod that creates a GCR instance
  882. func (a *Agent) ProvisionGCR(
  883. opts *SharedProvisionOpts,
  884. gcpConf *integrations.GCPIntegration,
  885. ) (*batchv1.Job, error) {
  886. id := opts.Infra.GetUniqueName()
  887. prov := &provisioner.Conf{
  888. ID: id,
  889. Name: fmt.Sprintf("prov-%s-%s", id, string(opts.Operation)),
  890. Kind: provisioner.GCR,
  891. Operation: opts.Operation,
  892. Redis: opts.RedisConf,
  893. Postgres: opts.PGConf,
  894. ProvisionerImageTag: opts.ProvImageTag,
  895. ImagePullSecret: opts.ProvImagePullSecret,
  896. LastApplied: opts.Infra.LastApplied,
  897. GCP: &gcp.Conf{
  898. GCPRegion: gcpConf.GCPRegion,
  899. GCPProjectID: gcpConf.GCPProjectID,
  900. GCPKeyData: string(gcpConf.GCPKeyData),
  901. },
  902. }
  903. return a.provision(prov, opts.Infra, opts.Repo)
  904. }
  905. // ProvisionGKE spawns a new provisioning pod that creates a GKE instance
  906. func (a *Agent) ProvisionGKE(
  907. opts *SharedProvisionOpts,
  908. gcpConf *integrations.GCPIntegration,
  909. gkeName string,
  910. ) (*batchv1.Job, error) {
  911. id := opts.Infra.GetUniqueName()
  912. prov := &provisioner.Conf{
  913. ID: id,
  914. Name: fmt.Sprintf("prov-%s-%s", id, string(opts.Operation)),
  915. Kind: provisioner.GKE,
  916. Operation: opts.Operation,
  917. Redis: opts.RedisConf,
  918. Postgres: opts.PGConf,
  919. ProvisionerImageTag: opts.ProvImageTag,
  920. ImagePullSecret: opts.ProvImagePullSecret,
  921. LastApplied: opts.Infra.LastApplied,
  922. GCP: &gcp.Conf{
  923. GCPRegion: gcpConf.GCPRegion,
  924. GCPProjectID: gcpConf.GCPProjectID,
  925. GCPKeyData: string(gcpConf.GCPKeyData),
  926. },
  927. GKE: &gke.Conf{
  928. ClusterName: gkeName,
  929. },
  930. }
  931. return a.provision(prov, opts.Infra, opts.Repo)
  932. }
  933. // ProvisionDOCR spawns a new provisioning pod that creates a DOCR instance
  934. func (a *Agent) ProvisionDOCR(
  935. opts *SharedProvisionOpts,
  936. doConf *integrations.OAuthIntegration,
  937. doAuth *oauth2.Config,
  938. docrName, docrSubscriptionTier string,
  939. ) (*batchv1.Job, error) {
  940. // get the token
  941. oauthInt, err := opts.Repo.OAuthIntegration().ReadOAuthIntegration(
  942. opts.ProjectID,
  943. opts.Infra.DOIntegrationID,
  944. )
  945. if err != nil {
  946. return nil, err
  947. }
  948. tok, _, err := oauth.GetAccessToken(oauthInt.SharedOAuthModel, doAuth, oauth.MakeUpdateOAuthIntegrationTokenFunction(oauthInt, opts.Repo))
  949. if err != nil {
  950. return nil, err
  951. }
  952. id := opts.Infra.GetUniqueName()
  953. prov := &provisioner.Conf{
  954. ID: id,
  955. Name: fmt.Sprintf("prov-%s-%s", id, string(opts.Operation)),
  956. Kind: provisioner.DOCR,
  957. Operation: opts.Operation,
  958. Redis: opts.RedisConf,
  959. Postgres: opts.PGConf,
  960. ProvisionerImageTag: opts.ProvImageTag,
  961. ImagePullSecret: opts.ProvImagePullSecret,
  962. LastApplied: opts.Infra.LastApplied,
  963. DO: &do.Conf{
  964. DOToken: tok,
  965. },
  966. DOCR: &docr.Conf{
  967. DOCRName: docrName,
  968. DOCRSubscriptionTier: docrSubscriptionTier,
  969. },
  970. }
  971. return a.provision(prov, opts.Infra, opts.Repo)
  972. }
  973. // ProvisionDOKS spawns a new provisioning pod that creates a DOKS instance
  974. func (a *Agent) ProvisionDOKS(
  975. opts *SharedProvisionOpts,
  976. doConf *integrations.OAuthIntegration,
  977. doAuth *oauth2.Config,
  978. doRegion, doksClusterName string,
  979. ) (*batchv1.Job, error) {
  980. // get the token
  981. oauthInt, err := opts.Repo.OAuthIntegration().ReadOAuthIntegration(
  982. opts.ProjectID,
  983. opts.Infra.DOIntegrationID,
  984. )
  985. if err != nil {
  986. return nil, err
  987. }
  988. tok, _, err := oauth.GetAccessToken(oauthInt.SharedOAuthModel, doAuth, oauth.MakeUpdateOAuthIntegrationTokenFunction(oauthInt, opts.Repo))
  989. if err != nil {
  990. return nil, err
  991. }
  992. id := opts.Infra.GetUniqueName()
  993. prov := &provisioner.Conf{
  994. ID: id,
  995. Name: fmt.Sprintf("prov-%s-%s", id, string(opts.Operation)),
  996. Kind: provisioner.DOKS,
  997. Operation: opts.Operation,
  998. Redis: opts.RedisConf,
  999. Postgres: opts.PGConf,
  1000. LastApplied: opts.Infra.LastApplied,
  1001. ProvisionerImageTag: opts.ProvImageTag,
  1002. ImagePullSecret: opts.ProvImagePullSecret,
  1003. DO: &do.Conf{
  1004. DOToken: tok,
  1005. },
  1006. DOKS: &doks.Conf{
  1007. DORegion: doRegion,
  1008. DOKSClusterName: doksClusterName,
  1009. },
  1010. }
  1011. return a.provision(prov, opts.Infra, opts.Repo)
  1012. }
  1013. // ProvisionTest spawns a new provisioning pod that tests provisioning
  1014. func (a *Agent) ProvisionTest(
  1015. opts *SharedProvisionOpts,
  1016. ) (*batchv1.Job, error) {
  1017. id := opts.Infra.GetUniqueName()
  1018. prov := &provisioner.Conf{
  1019. ID: id,
  1020. Name: fmt.Sprintf("prov-%s-%s", id, string(opts.Operation)),
  1021. Operation: opts.Operation,
  1022. Kind: provisioner.Test,
  1023. Redis: opts.RedisConf,
  1024. Postgres: opts.PGConf,
  1025. ProvisionerImageTag: opts.ProvImageTag,
  1026. ImagePullSecret: opts.ProvImagePullSecret,
  1027. }
  1028. return a.provision(prov, opts.Infra, opts.Repo)
  1029. }
  1030. func (a *Agent) provision(
  1031. prov *provisioner.Conf,
  1032. infra *models.Infra,
  1033. repo repository.Repository,
  1034. ) (*batchv1.Job, error) {
  1035. prov.Namespace = "default"
  1036. job, err := prov.GetProvisionerJobTemplate()
  1037. if err != nil {
  1038. return nil, err
  1039. }
  1040. job, err = a.Clientset.BatchV1().Jobs(prov.Namespace).Create(
  1041. context.TODO(),
  1042. job,
  1043. metav1.CreateOptions{},
  1044. )
  1045. if err != nil {
  1046. return nil, err
  1047. }
  1048. infra.LastApplied = prov.LastApplied
  1049. infra, err = repo.Infra().UpdateInfra(infra)
  1050. if err != nil {
  1051. return nil, err
  1052. }
  1053. return job, nil
  1054. }
  1055. // CreateImagePullSecrets will create the required image pull secrets and
  1056. // return a map from the registry name to the name of the secret.
  1057. func (a *Agent) CreateImagePullSecrets(
  1058. repo repository.Repository,
  1059. namespace string,
  1060. linkedRegs map[string]*models.Registry,
  1061. doAuth *oauth2.Config,
  1062. ) (map[string]string, error) {
  1063. res := make(map[string]string)
  1064. for key, val := range linkedRegs {
  1065. _reg := registry.Registry(*val)
  1066. data, err := _reg.GetDockerConfigJSON(repo, doAuth)
  1067. if err != nil {
  1068. return nil, err
  1069. }
  1070. secretName := fmt.Sprintf("porter-%s-%d", val.ToRegistryType().Service, val.ID)
  1071. secret, err := a.Clientset.CoreV1().Secrets(namespace).Get(
  1072. context.TODO(),
  1073. secretName,
  1074. metav1.GetOptions{},
  1075. )
  1076. // if not found, create the secret
  1077. if err != nil && errors.IsNotFound(err) {
  1078. _, err = a.Clientset.CoreV1().Secrets(namespace).Create(
  1079. context.TODO(),
  1080. &v1.Secret{
  1081. ObjectMeta: metav1.ObjectMeta{
  1082. Name: secretName,
  1083. },
  1084. Data: map[string][]byte{
  1085. string(v1.DockerConfigJsonKey): data,
  1086. },
  1087. Type: v1.SecretTypeDockerConfigJson,
  1088. },
  1089. metav1.CreateOptions{},
  1090. )
  1091. if err != nil {
  1092. return nil, err
  1093. }
  1094. // add secret name to the map
  1095. res[key] = secretName
  1096. continue
  1097. } else if err != nil {
  1098. return nil, err
  1099. }
  1100. // otherwise, check that the secret contains the correct data: if
  1101. // if doesn't, update it
  1102. if !bytes.Equal(secret.Data[v1.DockerConfigJsonKey], data) {
  1103. _, err := a.Clientset.CoreV1().Secrets(namespace).Update(
  1104. context.TODO(),
  1105. &v1.Secret{
  1106. ObjectMeta: metav1.ObjectMeta{
  1107. Name: secretName,
  1108. },
  1109. Data: map[string][]byte{
  1110. string(v1.DockerConfigJsonKey): data,
  1111. },
  1112. Type: v1.SecretTypeDockerConfigJson,
  1113. },
  1114. metav1.UpdateOptions{},
  1115. )
  1116. if err != nil {
  1117. return nil, err
  1118. }
  1119. }
  1120. // add secret name to the map
  1121. res[key] = secretName
  1122. }
  1123. return res, nil
  1124. }
  1125. // helper that waits for pod to be ready
  1126. func (a *Agent) waitForPod(pod *v1.Pod) (error, bool) {
  1127. var (
  1128. w watch.Interface
  1129. err error
  1130. ok bool
  1131. )
  1132. // immediately after creating a pod, the API may return a 404. heuristically 1
  1133. // second seems to be plenty.
  1134. watchRetries := 3
  1135. for i := 0; i < watchRetries; i++ {
  1136. selector := fields.OneTermEqualSelector("metadata.name", pod.Name).String()
  1137. w, err = a.Clientset.CoreV1().
  1138. Pods(pod.Namespace).
  1139. Watch(context.Background(), metav1.ListOptions{FieldSelector: selector})
  1140. if err == nil {
  1141. break
  1142. }
  1143. time.Sleep(time.Second)
  1144. }
  1145. if err != nil {
  1146. return err, false
  1147. }
  1148. defer w.Stop()
  1149. for {
  1150. select {
  1151. case <-time.After(time.Second * 30):
  1152. return goerrors.New("timed out waiting for pod"), false
  1153. case <-time.Tick(time.Second):
  1154. // poll every second in case we already missed the ready event while
  1155. // creating the listener.
  1156. pod, err = a.Clientset.CoreV1().
  1157. Pods(pod.Namespace).
  1158. Get(context.Background(), pod.Name, metav1.GetOptions{})
  1159. if err != nil && errors.IsNotFound(err) {
  1160. return IsNotFoundError, false
  1161. } else if err != nil {
  1162. return err, false
  1163. }
  1164. if isExited := isPodExited(pod); isExited || isPodReady(pod) {
  1165. return nil, isExited
  1166. }
  1167. case evt := <-w.ResultChan():
  1168. pod, ok = evt.Object.(*v1.Pod)
  1169. if !ok {
  1170. return fmt.Errorf("unexpected object type: %T", evt.Object), false
  1171. }
  1172. if isExited := isPodExited(pod); isExited || isPodReady(pod) {
  1173. return nil, isExited
  1174. }
  1175. }
  1176. }
  1177. }
  1178. func isPodReady(pod *v1.Pod) bool {
  1179. ready := false
  1180. conditions := pod.Status.Conditions
  1181. for i := range conditions {
  1182. if conditions[i].Type == v1.PodReady {
  1183. ready = pod.Status.Conditions[i].Status == v1.ConditionTrue
  1184. }
  1185. }
  1186. return ready
  1187. }
  1188. func isPodExited(pod *v1.Pod) bool {
  1189. return pod.Status.Phase == v1.PodSucceeded || pod.Status.Phase == v1.PodFailed
  1190. }