agent.go 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633
  1. package kubernetes
  2. import (
  3. "bufio"
  4. "bytes"
  5. "context"
  6. "fmt"
  7. "io"
  8. "strings"
  9. "github.com/porter-dev/porter/internal/kubernetes/provisioner"
  10. "github.com/porter-dev/porter/internal/kubernetes/provisioner/aws"
  11. "github.com/porter-dev/porter/internal/kubernetes/provisioner/aws/ecr"
  12. "github.com/porter-dev/porter/internal/kubernetes/provisioner/aws/eks"
  13. "github.com/porter-dev/porter/internal/kubernetes/provisioner/do"
  14. "github.com/porter-dev/porter/internal/kubernetes/provisioner/do/docr"
  15. "github.com/porter-dev/porter/internal/kubernetes/provisioner/do/doks"
  16. "github.com/porter-dev/porter/internal/kubernetes/provisioner/gcp"
  17. "github.com/porter-dev/porter/internal/kubernetes/provisioner/gcp/gke"
  18. "github.com/porter-dev/porter/internal/models"
  19. "github.com/porter-dev/porter/internal/models/integrations"
  20. "github.com/porter-dev/porter/internal/oauth"
  21. "github.com/porter-dev/porter/internal/registry"
  22. "github.com/porter-dev/porter/internal/repository"
  23. "golang.org/x/oauth2"
  24. "github.com/gorilla/websocket"
  25. "github.com/porter-dev/porter/internal/helm/grapher"
  26. appsv1 "k8s.io/api/apps/v1"
  27. batchv1 "k8s.io/api/batch/v1"
  28. v1 "k8s.io/api/core/v1"
  29. v1beta1 "k8s.io/api/extensions/v1beta1"
  30. "k8s.io/apimachinery/pkg/api/errors"
  31. metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
  32. "k8s.io/cli-runtime/pkg/genericclioptions"
  33. "k8s.io/client-go/informers"
  34. "k8s.io/client-go/kubernetes"
  35. "k8s.io/client-go/tools/cache"
  36. "github.com/porter-dev/porter/internal/config"
  37. )
  38. // Agent is a Kubernetes agent for performing operations that interact with the
  39. // api server
  40. type Agent struct {
  41. RESTClientGetter genericclioptions.RESTClientGetter
  42. Clientset kubernetes.Interface
  43. }
  44. type Message struct {
  45. EventType string
  46. Object interface{}
  47. Kind string
  48. }
  49. type ListOptions struct {
  50. FieldSelector string
  51. }
  52. // ListNamespaces simply lists namespaces
  53. func (a *Agent) ListNamespaces() (*v1.NamespaceList, error) {
  54. return a.Clientset.CoreV1().Namespaces().List(
  55. context.TODO(),
  56. metav1.ListOptions{},
  57. )
  58. }
  59. // GetIngress gets ingress given the name and namespace
  60. func (a *Agent) GetIngress(namespace string, name string) (*v1beta1.Ingress, error) {
  61. return a.Clientset.ExtensionsV1beta1().Ingresses(namespace).Get(
  62. context.TODO(),
  63. name,
  64. metav1.GetOptions{},
  65. )
  66. }
  67. // GetDeployment gets the deployment given the name and namespace
  68. func (a *Agent) GetDeployment(c grapher.Object) (*appsv1.Deployment, error) {
  69. return a.Clientset.AppsV1().Deployments(c.Namespace).Get(
  70. context.TODO(),
  71. c.Name,
  72. metav1.GetOptions{},
  73. )
  74. }
  75. // GetStatefulSet gets the statefulset given the name and namespace
  76. func (a *Agent) GetStatefulSet(c grapher.Object) (*appsv1.StatefulSet, error) {
  77. return a.Clientset.AppsV1().StatefulSets(c.Namespace).Get(
  78. context.TODO(),
  79. c.Name,
  80. metav1.GetOptions{},
  81. )
  82. }
  83. // GetReplicaSet gets the replicaset given the name and namespace
  84. func (a *Agent) GetReplicaSet(c grapher.Object) (*appsv1.ReplicaSet, error) {
  85. return a.Clientset.AppsV1().ReplicaSets(c.Namespace).Get(
  86. context.TODO(),
  87. c.Name,
  88. metav1.GetOptions{},
  89. )
  90. }
  91. // GetDaemonSet gets the daemonset by name and namespace
  92. func (a *Agent) GetDaemonSet(c grapher.Object) (*appsv1.DaemonSet, error) {
  93. return a.Clientset.AppsV1().DaemonSets(c.Namespace).Get(
  94. context.TODO(),
  95. c.Name,
  96. metav1.GetOptions{},
  97. )
  98. }
  99. // GetJob gets the job by name and namespace
  100. func (a *Agent) GetJob(c grapher.Object) (*batchv1.Job, error) {
  101. return a.Clientset.BatchV1().Jobs(c.Namespace).Get(
  102. context.TODO(),
  103. c.Name,
  104. metav1.GetOptions{},
  105. )
  106. }
  107. // GetPodsByLabel retrieves pods with matching labels
  108. func (a *Agent) GetPodsByLabel(selector string) (*v1.PodList, error) {
  109. // Search in all namespaces for matching pods
  110. return a.Clientset.CoreV1().Pods("").List(
  111. context.TODO(),
  112. metav1.ListOptions{
  113. LabelSelector: selector,
  114. },
  115. )
  116. }
  117. // GetPodLogs streams real-time logs from a given pod.
  118. func (a *Agent) GetPodLogs(namespace string, name string, conn *websocket.Conn) error {
  119. tails := int64(400)
  120. // follow logs
  121. podLogOpts := v1.PodLogOptions{
  122. Follow: true,
  123. TailLines: &tails,
  124. }
  125. req := a.Clientset.CoreV1().Pods(namespace).GetLogs(name, &podLogOpts)
  126. podLogs, err := req.Stream(context.TODO())
  127. if err != nil {
  128. return fmt.Errorf("Cannot open log stream for pod %s", name)
  129. }
  130. defer podLogs.Close()
  131. r := bufio.NewReader(podLogs)
  132. errorchan := make(chan error)
  133. go func() {
  134. // listens for websocket closing handshake
  135. for {
  136. if _, _, err := conn.ReadMessage(); err != nil {
  137. defer conn.Close()
  138. errorchan <- nil
  139. return
  140. }
  141. }
  142. }()
  143. go func() {
  144. for {
  145. select {
  146. case <-errorchan:
  147. defer close(errorchan)
  148. return
  149. default:
  150. }
  151. bytes, err := r.ReadBytes('\n')
  152. if writeErr := conn.WriteMessage(websocket.TextMessage, bytes); writeErr != nil {
  153. errorchan <- writeErr
  154. return
  155. }
  156. if err != nil {
  157. if err != io.EOF {
  158. errorchan <- err
  159. return
  160. }
  161. errorchan <- nil
  162. return
  163. }
  164. }
  165. }()
  166. for {
  167. select {
  168. case err = <-errorchan:
  169. return err
  170. }
  171. }
  172. }
  173. // StreamControllerStatus streams controller status. Supports Deployment, StatefulSet, ReplicaSet, and DaemonSet
  174. // TODO: Support Jobs
  175. func (a *Agent) StreamControllerStatus(conn *websocket.Conn, kind string) error {
  176. factory := informers.NewSharedInformerFactory(
  177. a.Clientset,
  178. 0,
  179. )
  180. var informer cache.SharedInformer
  181. // Spins up an informer depending on kind. Convert to lowercase for robustness
  182. switch strings.ToLower(kind) {
  183. case "deployment":
  184. informer = factory.Apps().V1().Deployments().Informer()
  185. case "statefulset":
  186. informer = factory.Apps().V1().StatefulSets().Informer()
  187. case "replicaset":
  188. informer = factory.Apps().V1().ReplicaSets().Informer()
  189. case "daemonset":
  190. informer = factory.Apps().V1().DaemonSets().Informer()
  191. }
  192. stopper := make(chan struct{})
  193. errorchan := make(chan error)
  194. defer close(errorchan)
  195. informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
  196. UpdateFunc: func(oldObj, newObj interface{}) {
  197. msg := Message{
  198. EventType: "UPDATE",
  199. Object: newObj,
  200. Kind: strings.ToLower(kind),
  201. }
  202. if writeErr := conn.WriteJSON(msg); writeErr != nil {
  203. errorchan <- writeErr
  204. return
  205. }
  206. },
  207. })
  208. go func() {
  209. // listens for websocket closing handshake
  210. for {
  211. if _, _, err := conn.ReadMessage(); err != nil {
  212. defer conn.Close()
  213. defer close(stopper)
  214. errorchan <- nil
  215. return
  216. }
  217. }
  218. }()
  219. go informer.Run(stopper)
  220. for {
  221. select {
  222. case err := <-errorchan:
  223. return err
  224. }
  225. }
  226. }
  227. // ProvisionECR spawns a new provisioning pod that creates an ECR instance
  228. func (a *Agent) ProvisionECR(
  229. projectID uint,
  230. awsConf *integrations.AWSIntegration,
  231. ecrName string,
  232. repo repository.Repository,
  233. infra *models.Infra,
  234. operation provisioner.ProvisionerOperation,
  235. pgConf *config.DBConf,
  236. redisConf *config.RedisConf,
  237. provImageTag string,
  238. ) (*batchv1.Job, error) {
  239. id := infra.GetUniqueName()
  240. prov := &provisioner.Conf{
  241. ID: id,
  242. Name: fmt.Sprintf("prov-%s-%s", id, string(operation)),
  243. Kind: provisioner.ECR,
  244. Operation: operation,
  245. Redis: redisConf,
  246. Postgres: pgConf,
  247. ProvisionerImageTag: provImageTag,
  248. LastApplied: infra.LastApplied,
  249. AWS: &aws.Conf{
  250. AWSRegion: awsConf.AWSRegion,
  251. AWSAccessKeyID: string(awsConf.AWSAccessKeyID),
  252. AWSSecretAccessKey: string(awsConf.AWSSecretAccessKey),
  253. },
  254. ECR: &ecr.Conf{
  255. ECRName: ecrName,
  256. },
  257. }
  258. return a.provision(prov, infra, repo)
  259. }
  260. // ProvisionEKS spawns a new provisioning pod that creates an EKS instance
  261. func (a *Agent) ProvisionEKS(
  262. projectID uint,
  263. awsConf *integrations.AWSIntegration,
  264. eksName string,
  265. repo repository.Repository,
  266. infra *models.Infra,
  267. operation provisioner.ProvisionerOperation,
  268. pgConf *config.DBConf,
  269. redisConf *config.RedisConf,
  270. provImageTag string,
  271. ) (*batchv1.Job, error) {
  272. id := infra.GetUniqueName()
  273. prov := &provisioner.Conf{
  274. ID: id,
  275. Name: fmt.Sprintf("prov-%s-%s", id, string(operation)),
  276. Kind: provisioner.EKS,
  277. Operation: operation,
  278. Redis: redisConf,
  279. Postgres: pgConf,
  280. ProvisionerImageTag: provImageTag,
  281. LastApplied: infra.LastApplied,
  282. AWS: &aws.Conf{
  283. AWSRegion: awsConf.AWSRegion,
  284. AWSAccessKeyID: string(awsConf.AWSAccessKeyID),
  285. AWSSecretAccessKey: string(awsConf.AWSSecretAccessKey),
  286. },
  287. EKS: &eks.Conf{
  288. ClusterName: eksName,
  289. },
  290. }
  291. return a.provision(prov, infra, repo)
  292. }
  293. // ProvisionGCR spawns a new provisioning pod that creates a GCR instance
  294. func (a *Agent) ProvisionGCR(
  295. projectID uint,
  296. gcpConf *integrations.GCPIntegration,
  297. repo repository.Repository,
  298. infra *models.Infra,
  299. operation provisioner.ProvisionerOperation,
  300. pgConf *config.DBConf,
  301. redisConf *config.RedisConf,
  302. provImageTag string,
  303. ) (*batchv1.Job, error) {
  304. id := infra.GetUniqueName()
  305. prov := &provisioner.Conf{
  306. ID: id,
  307. Name: fmt.Sprintf("prov-%s-%s", id, string(operation)),
  308. Kind: provisioner.GCR,
  309. Operation: operation,
  310. Redis: redisConf,
  311. Postgres: pgConf,
  312. ProvisionerImageTag: provImageTag,
  313. LastApplied: infra.LastApplied,
  314. GCP: &gcp.Conf{
  315. GCPRegion: gcpConf.GCPRegion,
  316. GCPProjectID: gcpConf.GCPProjectID,
  317. GCPKeyData: string(gcpConf.GCPKeyData),
  318. },
  319. }
  320. return a.provision(prov, infra, repo)
  321. }
  322. // ProvisionGKE spawns a new provisioning pod that creates a GKE instance
  323. func (a *Agent) ProvisionGKE(
  324. projectID uint,
  325. gcpConf *integrations.GCPIntegration,
  326. gkeName string,
  327. repo repository.Repository,
  328. infra *models.Infra,
  329. operation provisioner.ProvisionerOperation,
  330. pgConf *config.DBConf,
  331. redisConf *config.RedisConf,
  332. provImageTag string,
  333. ) (*batchv1.Job, error) {
  334. id := infra.GetUniqueName()
  335. prov := &provisioner.Conf{
  336. ID: id,
  337. Name: fmt.Sprintf("prov-%s-%s", id, string(operation)),
  338. Kind: provisioner.GKE,
  339. Operation: operation,
  340. Redis: redisConf,
  341. Postgres: pgConf,
  342. ProvisionerImageTag: provImageTag,
  343. LastApplied: infra.LastApplied,
  344. GCP: &gcp.Conf{
  345. GCPRegion: gcpConf.GCPRegion,
  346. GCPProjectID: gcpConf.GCPProjectID,
  347. GCPKeyData: string(gcpConf.GCPKeyData),
  348. },
  349. GKE: &gke.Conf{
  350. ClusterName: gkeName,
  351. },
  352. }
  353. return a.provision(prov, infra, repo)
  354. }
  355. // ProvisionDOCR spawns a new provisioning pod that creates a DOCR instance
  356. func (a *Agent) ProvisionDOCR(
  357. projectID uint,
  358. doConf *integrations.OAuthIntegration,
  359. doAuth *oauth2.Config,
  360. repo repository.Repository,
  361. docrName, docrSubscriptionTier string,
  362. infra *models.Infra,
  363. operation provisioner.ProvisionerOperation,
  364. pgConf *config.DBConf,
  365. redisConf *config.RedisConf,
  366. provImageTag string,
  367. ) (*batchv1.Job, error) {
  368. // get the token
  369. oauthInt, err := repo.OAuthIntegration.ReadOAuthIntegration(
  370. infra.DOIntegrationID,
  371. )
  372. if err != nil {
  373. return nil, err
  374. }
  375. tok, _, err := oauth.GetAccessToken(oauthInt, doAuth, repo)
  376. if err != nil {
  377. return nil, err
  378. }
  379. id := infra.GetUniqueName()
  380. prov := &provisioner.Conf{
  381. ID: id,
  382. Name: fmt.Sprintf("prov-%s-%s", id, string(operation)),
  383. Kind: provisioner.DOCR,
  384. Operation: operation,
  385. Redis: redisConf,
  386. Postgres: pgConf,
  387. ProvisionerImageTag: provImageTag,
  388. LastApplied: infra.LastApplied,
  389. DO: &do.Conf{
  390. DOToken: tok,
  391. },
  392. DOCR: &docr.Conf{
  393. DOCRName: docrName,
  394. DOCRSubscriptionTier: docrSubscriptionTier,
  395. },
  396. }
  397. return a.provision(prov, infra, repo)
  398. }
  399. // ProvisionDOKS spawns a new provisioning pod that creates a DOKS instance
  400. func (a *Agent) ProvisionDOKS(
  401. projectID uint,
  402. doConf *integrations.OAuthIntegration,
  403. doAuth *oauth2.Config,
  404. repo repository.Repository,
  405. doRegion, doksClusterName string,
  406. infra *models.Infra,
  407. operation provisioner.ProvisionerOperation,
  408. pgConf *config.DBConf,
  409. redisConf *config.RedisConf,
  410. provImageTag string,
  411. ) (*batchv1.Job, error) {
  412. // get the token
  413. oauthInt, err := repo.OAuthIntegration.ReadOAuthIntegration(
  414. infra.DOIntegrationID,
  415. )
  416. if err != nil {
  417. return nil, err
  418. }
  419. tok, _, err := oauth.GetAccessToken(oauthInt, doAuth, repo)
  420. if err != nil {
  421. return nil, err
  422. }
  423. id := infra.GetUniqueName()
  424. prov := &provisioner.Conf{
  425. ID: id,
  426. Name: fmt.Sprintf("prov-%s-%s", id, string(operation)),
  427. Kind: provisioner.DOKS,
  428. Operation: operation,
  429. Redis: redisConf,
  430. Postgres: pgConf,
  431. LastApplied: infra.LastApplied,
  432. ProvisionerImageTag: provImageTag,
  433. DO: &do.Conf{
  434. DOToken: tok,
  435. },
  436. DOKS: &doks.Conf{
  437. DORegion: doRegion,
  438. DOKSClusterName: doksClusterName,
  439. },
  440. }
  441. return a.provision(prov, infra, repo)
  442. }
  443. // ProvisionTest spawns a new provisioning pod that tests provisioning
  444. func (a *Agent) ProvisionTest(
  445. projectID uint,
  446. infra *models.Infra,
  447. repo repository.Repository,
  448. operation provisioner.ProvisionerOperation,
  449. pgConf *config.DBConf,
  450. redisConf *config.RedisConf,
  451. provImageTag string,
  452. ) (*batchv1.Job, error) {
  453. id := infra.GetUniqueName()
  454. prov := &provisioner.Conf{
  455. ID: id,
  456. Name: fmt.Sprintf("prov-%s-%s", id, string(operation)),
  457. Operation: operation,
  458. Kind: provisioner.Test,
  459. Redis: redisConf,
  460. Postgres: pgConf,
  461. ProvisionerImageTag: provImageTag,
  462. }
  463. return a.provision(prov, infra, repo)
  464. }
  465. func (a *Agent) provision(
  466. prov *provisioner.Conf,
  467. infra *models.Infra,
  468. repo repository.Repository,
  469. ) (*batchv1.Job, error) {
  470. prov.Namespace = "default"
  471. job, err := prov.GetProvisionerJobTemplate()
  472. if err != nil {
  473. return nil, err
  474. }
  475. job, err = a.Clientset.BatchV1().Jobs(prov.Namespace).Create(
  476. context.TODO(),
  477. job,
  478. metav1.CreateOptions{},
  479. )
  480. if err != nil {
  481. return nil, err
  482. }
  483. infra.LastApplied = prov.LastApplied
  484. infra, err = repo.Infra.UpdateInfra(infra)
  485. if err != nil {
  486. return nil, err
  487. }
  488. return job, nil
  489. }
  490. // CreateImagePullSecrets will create the required image pull secrets and
  491. // return a map from the registry name to the name of the secret.
  492. func (a *Agent) CreateImagePullSecrets(
  493. repo repository.Repository,
  494. namespace string,
  495. linkedRegs map[string]*models.Registry,
  496. doAuth *oauth2.Config,
  497. ) (map[string]string, error) {
  498. res := make(map[string]string)
  499. for key, val := range linkedRegs {
  500. _reg := registry.Registry(*val)
  501. data, err := _reg.GetDockerConfigJSON(repo, doAuth)
  502. if err != nil {
  503. return nil, err
  504. }
  505. secretName := fmt.Sprintf("porter-%s-%d", val.Externalize().Service, val.ID)
  506. secret, err := a.Clientset.CoreV1().Secrets(namespace).Get(
  507. context.TODO(),
  508. secretName,
  509. metav1.GetOptions{},
  510. )
  511. // if not found, create the secret
  512. if err != nil && errors.IsNotFound(err) {
  513. _, err = a.Clientset.CoreV1().Secrets(namespace).Create(
  514. context.TODO(),
  515. &v1.Secret{
  516. ObjectMeta: metav1.ObjectMeta{
  517. Name: secretName,
  518. },
  519. Data: map[string][]byte{
  520. string(v1.DockerConfigJsonKey): data,
  521. },
  522. Type: v1.SecretTypeDockerConfigJson,
  523. },
  524. metav1.CreateOptions{},
  525. )
  526. if err != nil {
  527. return nil, err
  528. }
  529. // add secret name to the map
  530. res[key] = secretName
  531. continue
  532. } else if err != nil {
  533. return nil, err
  534. }
  535. // otherwise, check that the secret contains the correct data: if
  536. // if doesn't, update it
  537. if !bytes.Equal(secret.Data[v1.DockerConfigJsonKey], data) {
  538. _, err := a.Clientset.CoreV1().Secrets(namespace).Update(
  539. context.TODO(),
  540. &v1.Secret{
  541. ObjectMeta: metav1.ObjectMeta{
  542. Name: secretName,
  543. },
  544. Data: map[string][]byte{
  545. string(v1.DockerConfigJsonKey): data,
  546. },
  547. Type: v1.SecretTypeDockerConfigJson,
  548. },
  549. metav1.UpdateOptions{},
  550. )
  551. if err != nil {
  552. return nil, err
  553. }
  554. }
  555. // add secret name to the map
  556. res[key] = secretName
  557. }
  558. return res, nil
  559. }