2
0

agent.go 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653
  1. package kubernetes
  2. import (
  3. "bufio"
  4. "bytes"
  5. "context"
  6. "fmt"
  7. "io"
  8. "strings"
  9. "github.com/porter-dev/porter/internal/kubernetes/provisioner"
  10. "github.com/porter-dev/porter/internal/kubernetes/provisioner/aws"
  11. "github.com/porter-dev/porter/internal/kubernetes/provisioner/aws/ecr"
  12. "github.com/porter-dev/porter/internal/kubernetes/provisioner/aws/eks"
  13. "github.com/porter-dev/porter/internal/kubernetes/provisioner/do"
  14. "github.com/porter-dev/porter/internal/kubernetes/provisioner/do/docr"
  15. "github.com/porter-dev/porter/internal/kubernetes/provisioner/do/doks"
  16. "github.com/porter-dev/porter/internal/kubernetes/provisioner/gcp"
  17. "github.com/porter-dev/porter/internal/kubernetes/provisioner/gcp/gke"
  18. "github.com/porter-dev/porter/internal/models"
  19. "github.com/porter-dev/porter/internal/models/integrations"
  20. "github.com/porter-dev/porter/internal/oauth"
  21. "github.com/porter-dev/porter/internal/registry"
  22. "github.com/porter-dev/porter/internal/repository"
  23. "golang.org/x/oauth2"
  24. "github.com/gorilla/websocket"
  25. "github.com/porter-dev/porter/internal/helm/grapher"
  26. appsv1 "k8s.io/api/apps/v1"
  27. batchv1 "k8s.io/api/batch/v1"
  28. batchv1beta1 "k8s.io/api/batch/v1beta1"
  29. v1 "k8s.io/api/core/v1"
  30. v1beta1 "k8s.io/api/extensions/v1beta1"
  31. "k8s.io/apimachinery/pkg/api/errors"
  32. metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
  33. "k8s.io/cli-runtime/pkg/genericclioptions"
  34. "k8s.io/client-go/informers"
  35. "k8s.io/client-go/kubernetes"
  36. "k8s.io/client-go/tools/cache"
  37. "github.com/porter-dev/porter/internal/config"
  38. )
  39. // Agent is a Kubernetes agent for performing operations that interact with the
  40. // api server
  41. type Agent struct {
  42. RESTClientGetter genericclioptions.RESTClientGetter
  43. Clientset kubernetes.Interface
  44. }
  45. type Message struct {
  46. EventType string
  47. Object interface{}
  48. Kind string
  49. }
  50. type ListOptions struct {
  51. FieldSelector string
  52. }
  53. // ListNamespaces simply lists namespaces
  54. func (a *Agent) ListNamespaces() (*v1.NamespaceList, error) {
  55. return a.Clientset.CoreV1().Namespaces().List(
  56. context.TODO(),
  57. metav1.ListOptions{},
  58. )
  59. }
  60. // GetIngress gets ingress given the name and namespace
  61. func (a *Agent) GetIngress(namespace string, name string) (*v1beta1.Ingress, error) {
  62. return a.Clientset.ExtensionsV1beta1().Ingresses(namespace).Get(
  63. context.TODO(),
  64. name,
  65. metav1.GetOptions{},
  66. )
  67. }
  68. // GetDeployment gets the deployment given the name and namespace
  69. func (a *Agent) GetDeployment(c grapher.Object) (*appsv1.Deployment, error) {
  70. return a.Clientset.AppsV1().Deployments(c.Namespace).Get(
  71. context.TODO(),
  72. c.Name,
  73. metav1.GetOptions{},
  74. )
  75. }
  76. // GetStatefulSet gets the statefulset given the name and namespace
  77. func (a *Agent) GetStatefulSet(c grapher.Object) (*appsv1.StatefulSet, error) {
  78. return a.Clientset.AppsV1().StatefulSets(c.Namespace).Get(
  79. context.TODO(),
  80. c.Name,
  81. metav1.GetOptions{},
  82. )
  83. }
  84. // GetReplicaSet gets the replicaset given the name and namespace
  85. func (a *Agent) GetReplicaSet(c grapher.Object) (*appsv1.ReplicaSet, error) {
  86. return a.Clientset.AppsV1().ReplicaSets(c.Namespace).Get(
  87. context.TODO(),
  88. c.Name,
  89. metav1.GetOptions{},
  90. )
  91. }
  92. // GetDaemonSet gets the daemonset by name and namespace
  93. func (a *Agent) GetDaemonSet(c grapher.Object) (*appsv1.DaemonSet, error) {
  94. return a.Clientset.AppsV1().DaemonSets(c.Namespace).Get(
  95. context.TODO(),
  96. c.Name,
  97. metav1.GetOptions{},
  98. )
  99. }
  100. // GetJob gets the job by name and namespace
  101. func (a *Agent) GetJob(c grapher.Object) (*batchv1.Job, error) {
  102. return a.Clientset.BatchV1().Jobs(c.Namespace).Get(
  103. context.TODO(),
  104. c.Name,
  105. metav1.GetOptions{},
  106. )
  107. }
  108. // GetCronJob gets the CronJob by name and namespace
  109. func (a *Agent) GetCronJob(c grapher.Object) (*batchv1beta1.CronJob, error) {
  110. return a.Clientset.BatchV1beta1().CronJobs(c.Namespace).Get(
  111. context.TODO(),
  112. c.Name,
  113. metav1.GetOptions{},
  114. )
  115. }
  116. // GetPodsByLabel retrieves pods with matching labels
  117. func (a *Agent) GetPodsByLabel(selector string) (*v1.PodList, error) {
  118. // Search in all namespaces for matching pods
  119. return a.Clientset.CoreV1().Pods("").List(
  120. context.TODO(),
  121. metav1.ListOptions{
  122. LabelSelector: selector,
  123. },
  124. )
  125. }
  126. // DeletePod deletes a pod by name and namespace
  127. func (a *Agent) DeletePod(namespace string, name string) error {
  128. return a.Clientset.CoreV1().Pods(namespace).Delete(
  129. context.TODO(),
  130. name,
  131. metav1.DeleteOptions{},
  132. )
  133. }
  134. // GetPodLogs streams real-time logs from a given pod.
  135. func (a *Agent) GetPodLogs(namespace string, name string, conn *websocket.Conn) error {
  136. tails := int64(400)
  137. // follow logs
  138. podLogOpts := v1.PodLogOptions{
  139. Follow: true,
  140. TailLines: &tails,
  141. }
  142. req := a.Clientset.CoreV1().Pods(namespace).GetLogs(name, &podLogOpts)
  143. podLogs, err := req.Stream(context.TODO())
  144. if err != nil {
  145. return fmt.Errorf("Cannot open log stream for pod %s", name)
  146. }
  147. defer podLogs.Close()
  148. r := bufio.NewReader(podLogs)
  149. errorchan := make(chan error)
  150. go func() {
  151. // listens for websocket closing handshake
  152. for {
  153. if _, _, err := conn.ReadMessage(); err != nil {
  154. defer conn.Close()
  155. errorchan <- nil
  156. return
  157. }
  158. }
  159. }()
  160. go func() {
  161. for {
  162. select {
  163. case <-errorchan:
  164. defer close(errorchan)
  165. return
  166. default:
  167. }
  168. bytes, err := r.ReadBytes('\n')
  169. if writeErr := conn.WriteMessage(websocket.TextMessage, bytes); writeErr != nil {
  170. errorchan <- writeErr
  171. return
  172. }
  173. if err != nil {
  174. if err != io.EOF {
  175. errorchan <- err
  176. return
  177. }
  178. errorchan <- nil
  179. return
  180. }
  181. }
  182. }()
  183. for {
  184. select {
  185. case err = <-errorchan:
  186. return err
  187. }
  188. }
  189. }
  190. // StreamControllerStatus streams controller status. Supports Deployment, StatefulSet, ReplicaSet, and DaemonSet
  191. // TODO: Support Jobs
  192. func (a *Agent) StreamControllerStatus(conn *websocket.Conn, kind string) error {
  193. factory := informers.NewSharedInformerFactory(
  194. a.Clientset,
  195. 0,
  196. )
  197. var informer cache.SharedInformer
  198. // Spins up an informer depending on kind. Convert to lowercase for robustness
  199. switch strings.ToLower(kind) {
  200. case "deployment":
  201. informer = factory.Apps().V1().Deployments().Informer()
  202. case "statefulset":
  203. informer = factory.Apps().V1().StatefulSets().Informer()
  204. case "replicaset":
  205. informer = factory.Apps().V1().ReplicaSets().Informer()
  206. case "daemonset":
  207. informer = factory.Apps().V1().DaemonSets().Informer()
  208. }
  209. stopper := make(chan struct{})
  210. errorchan := make(chan error)
  211. defer close(errorchan)
  212. informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
  213. UpdateFunc: func(oldObj, newObj interface{}) {
  214. msg := Message{
  215. EventType: "UPDATE",
  216. Object: newObj,
  217. Kind: strings.ToLower(kind),
  218. }
  219. if writeErr := conn.WriteJSON(msg); writeErr != nil {
  220. errorchan <- writeErr
  221. return
  222. }
  223. },
  224. })
  225. go func() {
  226. // listens for websocket closing handshake
  227. for {
  228. if _, _, err := conn.ReadMessage(); err != nil {
  229. defer conn.Close()
  230. defer close(stopper)
  231. errorchan <- nil
  232. return
  233. }
  234. }
  235. }()
  236. go informer.Run(stopper)
  237. for {
  238. select {
  239. case err := <-errorchan:
  240. return err
  241. }
  242. }
  243. }
  244. // ProvisionECR spawns a new provisioning pod that creates an ECR instance
  245. func (a *Agent) ProvisionECR(
  246. projectID uint,
  247. awsConf *integrations.AWSIntegration,
  248. ecrName string,
  249. repo repository.Repository,
  250. infra *models.Infra,
  251. operation provisioner.ProvisionerOperation,
  252. pgConf *config.DBConf,
  253. redisConf *config.RedisConf,
  254. provImageTag string,
  255. ) (*batchv1.Job, error) {
  256. id := infra.GetUniqueName()
  257. prov := &provisioner.Conf{
  258. ID: id,
  259. Name: fmt.Sprintf("prov-%s-%s", id, string(operation)),
  260. Kind: provisioner.ECR,
  261. Operation: operation,
  262. Redis: redisConf,
  263. Postgres: pgConf,
  264. ProvisionerImageTag: provImageTag,
  265. LastApplied: infra.LastApplied,
  266. AWS: &aws.Conf{
  267. AWSRegion: awsConf.AWSRegion,
  268. AWSAccessKeyID: string(awsConf.AWSAccessKeyID),
  269. AWSSecretAccessKey: string(awsConf.AWSSecretAccessKey),
  270. },
  271. ECR: &ecr.Conf{
  272. ECRName: ecrName,
  273. },
  274. }
  275. return a.provision(prov, infra, repo)
  276. }
  277. // ProvisionEKS spawns a new provisioning pod that creates an EKS instance
  278. func (a *Agent) ProvisionEKS(
  279. projectID uint,
  280. awsConf *integrations.AWSIntegration,
  281. eksName, machineType string,
  282. repo repository.Repository,
  283. infra *models.Infra,
  284. operation provisioner.ProvisionerOperation,
  285. pgConf *config.DBConf,
  286. redisConf *config.RedisConf,
  287. provImageTag string,
  288. ) (*batchv1.Job, error) {
  289. id := infra.GetUniqueName()
  290. prov := &provisioner.Conf{
  291. ID: id,
  292. Name: fmt.Sprintf("prov-%s-%s", id, string(operation)),
  293. Kind: provisioner.EKS,
  294. Operation: operation,
  295. Redis: redisConf,
  296. Postgres: pgConf,
  297. ProvisionerImageTag: provImageTag,
  298. LastApplied: infra.LastApplied,
  299. AWS: &aws.Conf{
  300. AWSRegion: awsConf.AWSRegion,
  301. AWSAccessKeyID: string(awsConf.AWSAccessKeyID),
  302. AWSSecretAccessKey: string(awsConf.AWSSecretAccessKey),
  303. },
  304. EKS: &eks.Conf{
  305. ClusterName: eksName,
  306. MachineType: machineType,
  307. },
  308. }
  309. return a.provision(prov, infra, repo)
  310. }
  311. // ProvisionGCR spawns a new provisioning pod that creates a GCR instance
  312. func (a *Agent) ProvisionGCR(
  313. projectID uint,
  314. gcpConf *integrations.GCPIntegration,
  315. repo repository.Repository,
  316. infra *models.Infra,
  317. operation provisioner.ProvisionerOperation,
  318. pgConf *config.DBConf,
  319. redisConf *config.RedisConf,
  320. provImageTag string,
  321. ) (*batchv1.Job, error) {
  322. id := infra.GetUniqueName()
  323. prov := &provisioner.Conf{
  324. ID: id,
  325. Name: fmt.Sprintf("prov-%s-%s", id, string(operation)),
  326. Kind: provisioner.GCR,
  327. Operation: operation,
  328. Redis: redisConf,
  329. Postgres: pgConf,
  330. ProvisionerImageTag: provImageTag,
  331. LastApplied: infra.LastApplied,
  332. GCP: &gcp.Conf{
  333. GCPRegion: gcpConf.GCPRegion,
  334. GCPProjectID: gcpConf.GCPProjectID,
  335. GCPKeyData: string(gcpConf.GCPKeyData),
  336. },
  337. }
  338. return a.provision(prov, infra, repo)
  339. }
  340. // ProvisionGKE spawns a new provisioning pod that creates a GKE instance
  341. func (a *Agent) ProvisionGKE(
  342. projectID uint,
  343. gcpConf *integrations.GCPIntegration,
  344. gkeName string,
  345. repo repository.Repository,
  346. infra *models.Infra,
  347. operation provisioner.ProvisionerOperation,
  348. pgConf *config.DBConf,
  349. redisConf *config.RedisConf,
  350. provImageTag string,
  351. ) (*batchv1.Job, error) {
  352. id := infra.GetUniqueName()
  353. prov := &provisioner.Conf{
  354. ID: id,
  355. Name: fmt.Sprintf("prov-%s-%s", id, string(operation)),
  356. Kind: provisioner.GKE,
  357. Operation: operation,
  358. Redis: redisConf,
  359. Postgres: pgConf,
  360. ProvisionerImageTag: provImageTag,
  361. LastApplied: infra.LastApplied,
  362. GCP: &gcp.Conf{
  363. GCPRegion: gcpConf.GCPRegion,
  364. GCPProjectID: gcpConf.GCPProjectID,
  365. GCPKeyData: string(gcpConf.GCPKeyData),
  366. },
  367. GKE: &gke.Conf{
  368. ClusterName: gkeName,
  369. },
  370. }
  371. return a.provision(prov, infra, repo)
  372. }
  373. // ProvisionDOCR spawns a new provisioning pod that creates a DOCR instance
  374. func (a *Agent) ProvisionDOCR(
  375. projectID uint,
  376. doConf *integrations.OAuthIntegration,
  377. doAuth *oauth2.Config,
  378. repo repository.Repository,
  379. docrName, docrSubscriptionTier string,
  380. infra *models.Infra,
  381. operation provisioner.ProvisionerOperation,
  382. pgConf *config.DBConf,
  383. redisConf *config.RedisConf,
  384. provImageTag string,
  385. ) (*batchv1.Job, error) {
  386. // get the token
  387. oauthInt, err := repo.OAuthIntegration.ReadOAuthIntegration(
  388. infra.DOIntegrationID,
  389. )
  390. if err != nil {
  391. return nil, err
  392. }
  393. tok, _, err := oauth.GetAccessToken(oauthInt, doAuth, repo)
  394. if err != nil {
  395. return nil, err
  396. }
  397. id := infra.GetUniqueName()
  398. prov := &provisioner.Conf{
  399. ID: id,
  400. Name: fmt.Sprintf("prov-%s-%s", id, string(operation)),
  401. Kind: provisioner.DOCR,
  402. Operation: operation,
  403. Redis: redisConf,
  404. Postgres: pgConf,
  405. ProvisionerImageTag: provImageTag,
  406. LastApplied: infra.LastApplied,
  407. DO: &do.Conf{
  408. DOToken: tok,
  409. },
  410. DOCR: &docr.Conf{
  411. DOCRName: docrName,
  412. DOCRSubscriptionTier: docrSubscriptionTier,
  413. },
  414. }
  415. return a.provision(prov, infra, repo)
  416. }
  417. // ProvisionDOKS spawns a new provisioning pod that creates a DOKS instance
  418. func (a *Agent) ProvisionDOKS(
  419. projectID uint,
  420. doConf *integrations.OAuthIntegration,
  421. doAuth *oauth2.Config,
  422. repo repository.Repository,
  423. doRegion, doksClusterName string,
  424. infra *models.Infra,
  425. operation provisioner.ProvisionerOperation,
  426. pgConf *config.DBConf,
  427. redisConf *config.RedisConf,
  428. provImageTag string,
  429. ) (*batchv1.Job, error) {
  430. // get the token
  431. oauthInt, err := repo.OAuthIntegration.ReadOAuthIntegration(
  432. infra.DOIntegrationID,
  433. )
  434. if err != nil {
  435. return nil, err
  436. }
  437. tok, _, err := oauth.GetAccessToken(oauthInt, doAuth, repo)
  438. if err != nil {
  439. return nil, err
  440. }
  441. id := infra.GetUniqueName()
  442. prov := &provisioner.Conf{
  443. ID: id,
  444. Name: fmt.Sprintf("prov-%s-%s", id, string(operation)),
  445. Kind: provisioner.DOKS,
  446. Operation: operation,
  447. Redis: redisConf,
  448. Postgres: pgConf,
  449. LastApplied: infra.LastApplied,
  450. ProvisionerImageTag: provImageTag,
  451. DO: &do.Conf{
  452. DOToken: tok,
  453. },
  454. DOKS: &doks.Conf{
  455. DORegion: doRegion,
  456. DOKSClusterName: doksClusterName,
  457. },
  458. }
  459. return a.provision(prov, infra, repo)
  460. }
  461. // ProvisionTest spawns a new provisioning pod that tests provisioning
  462. func (a *Agent) ProvisionTest(
  463. projectID uint,
  464. infra *models.Infra,
  465. repo repository.Repository,
  466. operation provisioner.ProvisionerOperation,
  467. pgConf *config.DBConf,
  468. redisConf *config.RedisConf,
  469. provImageTag string,
  470. ) (*batchv1.Job, error) {
  471. id := infra.GetUniqueName()
  472. prov := &provisioner.Conf{
  473. ID: id,
  474. Name: fmt.Sprintf("prov-%s-%s", id, string(operation)),
  475. Operation: operation,
  476. Kind: provisioner.Test,
  477. Redis: redisConf,
  478. Postgres: pgConf,
  479. ProvisionerImageTag: provImageTag,
  480. }
  481. return a.provision(prov, infra, repo)
  482. }
  483. func (a *Agent) provision(
  484. prov *provisioner.Conf,
  485. infra *models.Infra,
  486. repo repository.Repository,
  487. ) (*batchv1.Job, error) {
  488. prov.Namespace = "default"
  489. job, err := prov.GetProvisionerJobTemplate()
  490. if err != nil {
  491. return nil, err
  492. }
  493. job, err = a.Clientset.BatchV1().Jobs(prov.Namespace).Create(
  494. context.TODO(),
  495. job,
  496. metav1.CreateOptions{},
  497. )
  498. if err != nil {
  499. return nil, err
  500. }
  501. infra.LastApplied = prov.LastApplied
  502. infra, err = repo.Infra.UpdateInfra(infra)
  503. if err != nil {
  504. return nil, err
  505. }
  506. return job, nil
  507. }
  508. // CreateImagePullSecrets will create the required image pull secrets and
  509. // return a map from the registry name to the name of the secret.
  510. func (a *Agent) CreateImagePullSecrets(
  511. repo repository.Repository,
  512. namespace string,
  513. linkedRegs map[string]*models.Registry,
  514. doAuth *oauth2.Config,
  515. ) (map[string]string, error) {
  516. res := make(map[string]string)
  517. for key, val := range linkedRegs {
  518. _reg := registry.Registry(*val)
  519. data, err := _reg.GetDockerConfigJSON(repo, doAuth)
  520. if err != nil {
  521. return nil, err
  522. }
  523. secretName := fmt.Sprintf("porter-%s-%d", val.Externalize().Service, val.ID)
  524. secret, err := a.Clientset.CoreV1().Secrets(namespace).Get(
  525. context.TODO(),
  526. secretName,
  527. metav1.GetOptions{},
  528. )
  529. // if not found, create the secret
  530. if err != nil && errors.IsNotFound(err) {
  531. _, err = a.Clientset.CoreV1().Secrets(namespace).Create(
  532. context.TODO(),
  533. &v1.Secret{
  534. ObjectMeta: metav1.ObjectMeta{
  535. Name: secretName,
  536. },
  537. Data: map[string][]byte{
  538. string(v1.DockerConfigJsonKey): data,
  539. },
  540. Type: v1.SecretTypeDockerConfigJson,
  541. },
  542. metav1.CreateOptions{},
  543. )
  544. if err != nil {
  545. return nil, err
  546. }
  547. // add secret name to the map
  548. res[key] = secretName
  549. continue
  550. } else if err != nil {
  551. return nil, err
  552. }
  553. // otherwise, check that the secret contains the correct data: if
  554. // if doesn't, update it
  555. if !bytes.Equal(secret.Data[v1.DockerConfigJsonKey], data) {
  556. _, err := a.Clientset.CoreV1().Secrets(namespace).Update(
  557. context.TODO(),
  558. &v1.Secret{
  559. ObjectMeta: metav1.ObjectMeta{
  560. Name: secretName,
  561. },
  562. Data: map[string][]byte{
  563. string(v1.DockerConfigJsonKey): data,
  564. },
  565. Type: v1.SecretTypeDockerConfigJson,
  566. },
  567. metav1.UpdateOptions{},
  568. )
  569. if err != nil {
  570. return nil, err
  571. }
  572. }
  573. // add secret name to the map
  574. res[key] = secretName
  575. }
  576. return res, nil
  577. }