agent.go 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626
  1. package kubernetes
  2. import (
  3. "bufio"
  4. "bytes"
  5. "context"
  6. "fmt"
  7. "io"
  8. "strings"
  9. "github.com/porter-dev/porter/internal/kubernetes/provisioner"
  10. "github.com/porter-dev/porter/internal/kubernetes/provisioner/aws"
  11. "github.com/porter-dev/porter/internal/kubernetes/provisioner/aws/ecr"
  12. "github.com/porter-dev/porter/internal/kubernetes/provisioner/aws/eks"
  13. "github.com/porter-dev/porter/internal/kubernetes/provisioner/do"
  14. "github.com/porter-dev/porter/internal/kubernetes/provisioner/do/docr"
  15. "github.com/porter-dev/porter/internal/kubernetes/provisioner/do/doks"
  16. "github.com/porter-dev/porter/internal/kubernetes/provisioner/gcp"
  17. "github.com/porter-dev/porter/internal/kubernetes/provisioner/gcp/gke"
  18. "github.com/porter-dev/porter/internal/models"
  19. "github.com/porter-dev/porter/internal/models/integrations"
  20. "github.com/porter-dev/porter/internal/oauth"
  21. "github.com/porter-dev/porter/internal/registry"
  22. "github.com/porter-dev/porter/internal/repository"
  23. "golang.org/x/oauth2"
  24. "github.com/gorilla/websocket"
  25. "github.com/porter-dev/porter/internal/helm/grapher"
  26. appsv1 "k8s.io/api/apps/v1"
  27. batchv1 "k8s.io/api/batch/v1"
  28. v1 "k8s.io/api/core/v1"
  29. v1beta1 "k8s.io/api/extensions/v1beta1"
  30. "k8s.io/apimachinery/pkg/api/errors"
  31. metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
  32. "k8s.io/cli-runtime/pkg/genericclioptions"
  33. "k8s.io/client-go/informers"
  34. "k8s.io/client-go/kubernetes"
  35. "k8s.io/client-go/tools/cache"
  36. "github.com/porter-dev/porter/internal/config"
  37. )
  38. // Agent is a Kubernetes agent for performing operations that interact with the
  39. // api server
  40. type Agent struct {
  41. RESTClientGetter genericclioptions.RESTClientGetter
  42. Clientset kubernetes.Interface
  43. }
  44. type Message struct {
  45. EventType string
  46. Object interface{}
  47. Kind string
  48. }
  49. type ListOptions struct {
  50. FieldSelector string
  51. }
  52. // ListNamespaces simply lists namespaces
  53. func (a *Agent) ListNamespaces() (*v1.NamespaceList, error) {
  54. return a.Clientset.CoreV1().Namespaces().List(
  55. context.TODO(),
  56. metav1.ListOptions{},
  57. )
  58. }
  59. // GetIngress gets ingress given the name and namespace
  60. func (a *Agent) GetIngress(namespace string, name string) (*v1beta1.Ingress, error) {
  61. return a.Clientset.ExtensionsV1beta1().Ingresses(namespace).Get(
  62. context.TODO(),
  63. name,
  64. metav1.GetOptions{},
  65. )
  66. }
  67. // GetDeployment gets the deployment given the name and namespace
  68. func (a *Agent) GetDeployment(c grapher.Object) (*appsv1.Deployment, error) {
  69. return a.Clientset.AppsV1().Deployments(c.Namespace).Get(
  70. context.TODO(),
  71. c.Name,
  72. metav1.GetOptions{},
  73. )
  74. }
  75. // GetStatefulSet gets the statefulset given the name and namespace
  76. func (a *Agent) GetStatefulSet(c grapher.Object) (*appsv1.StatefulSet, error) {
  77. return a.Clientset.AppsV1().StatefulSets(c.Namespace).Get(
  78. context.TODO(),
  79. c.Name,
  80. metav1.GetOptions{},
  81. )
  82. }
  83. // GetReplicaSet gets the replicaset given the name and namespace
  84. func (a *Agent) GetReplicaSet(c grapher.Object) (*appsv1.ReplicaSet, error) {
  85. return a.Clientset.AppsV1().ReplicaSets(c.Namespace).Get(
  86. context.TODO(),
  87. c.Name,
  88. metav1.GetOptions{},
  89. )
  90. }
  91. // GetDaemonSet gets the daemonset by name and namespace
  92. func (a *Agent) GetDaemonSet(c grapher.Object) (*appsv1.DaemonSet, error) {
  93. return a.Clientset.AppsV1().DaemonSets(c.Namespace).Get(
  94. context.TODO(),
  95. c.Name,
  96. metav1.GetOptions{},
  97. )
  98. }
  99. // GetPodsByLabel retrieves pods with matching labels
  100. func (a *Agent) GetPodsByLabel(selector string) (*v1.PodList, error) {
  101. // Search in all namespaces for matching pods
  102. return a.Clientset.CoreV1().Pods("").List(
  103. context.TODO(),
  104. metav1.ListOptions{
  105. LabelSelector: selector,
  106. },
  107. )
  108. }
  109. // GetPodLogs streams real-time logs from a given pod.
  110. func (a *Agent) GetPodLogs(namespace string, name string, conn *websocket.Conn) error {
  111. tails := int64(400)
  112. // follow logs
  113. podLogOpts := v1.PodLogOptions{
  114. Follow: true,
  115. TailLines: &tails,
  116. }
  117. req := a.Clientset.CoreV1().Pods(namespace).GetLogs(name, &podLogOpts)
  118. podLogs, err := req.Stream(context.TODO())
  119. if err != nil {
  120. return fmt.Errorf("Cannot open log stream for pod %s", name)
  121. }
  122. defer podLogs.Close()
  123. r := bufio.NewReader(podLogs)
  124. errorchan := make(chan error)
  125. go func() {
  126. // listens for websocket closing handshake
  127. for {
  128. if _, _, err := conn.ReadMessage(); err != nil {
  129. defer conn.Close()
  130. errorchan <- nil
  131. fmt.Println("Successfully closed log stream")
  132. return
  133. }
  134. }
  135. }()
  136. go func() {
  137. for {
  138. select {
  139. case <-errorchan:
  140. defer close(errorchan)
  141. return
  142. default:
  143. }
  144. bytes, err := r.ReadBytes('\n')
  145. if writeErr := conn.WriteMessage(websocket.TextMessage, bytes); writeErr != nil {
  146. errorchan <- writeErr
  147. return
  148. }
  149. if err != nil {
  150. if err != io.EOF {
  151. errorchan <- err
  152. return
  153. }
  154. errorchan <- nil
  155. return
  156. }
  157. }
  158. }()
  159. for {
  160. select {
  161. case err = <-errorchan:
  162. return err
  163. }
  164. }
  165. }
  166. // StreamControllerStatus streams controller status. Supports Deployment, StatefulSet, ReplicaSet, and DaemonSet
  167. // TODO: Support Jobs
  168. func (a *Agent) StreamControllerStatus(conn *websocket.Conn, kind string) error {
  169. factory := informers.NewSharedInformerFactory(
  170. a.Clientset,
  171. 0,
  172. )
  173. var informer cache.SharedInformer
  174. // Spins up an informer depending on kind. Convert to lowercase for robustness
  175. switch strings.ToLower(kind) {
  176. case "deployment":
  177. informer = factory.Apps().V1().Deployments().Informer()
  178. case "statefulset":
  179. informer = factory.Apps().V1().StatefulSets().Informer()
  180. case "replicaset":
  181. informer = factory.Apps().V1().ReplicaSets().Informer()
  182. case "daemonset":
  183. informer = factory.Apps().V1().DaemonSets().Informer()
  184. }
  185. stopper := make(chan struct{})
  186. errorchan := make(chan error)
  187. defer close(errorchan)
  188. informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
  189. UpdateFunc: func(oldObj, newObj interface{}) {
  190. msg := Message{
  191. EventType: "UPDATE",
  192. Object: newObj,
  193. Kind: strings.ToLower(kind),
  194. }
  195. if writeErr := conn.WriteJSON(msg); writeErr != nil {
  196. errorchan <- writeErr
  197. return
  198. }
  199. },
  200. })
  201. go func() {
  202. // listens for websocket closing handshake
  203. for {
  204. if _, _, err := conn.ReadMessage(); err != nil {
  205. defer conn.Close()
  206. defer close(stopper)
  207. defer fmt.Println("Successfully closed controller status stream")
  208. errorchan <- nil
  209. return
  210. }
  211. }
  212. }()
  213. go informer.Run(stopper)
  214. for {
  215. select {
  216. case err := <-errorchan:
  217. return err
  218. }
  219. }
  220. }
  221. // ProvisionECR spawns a new provisioning pod that creates an ECR instance
  222. func (a *Agent) ProvisionECR(
  223. projectID uint,
  224. awsConf *integrations.AWSIntegration,
  225. ecrName string,
  226. repo repository.Repository,
  227. infra *models.Infra,
  228. operation provisioner.ProvisionerOperation,
  229. pgConf *config.DBConf,
  230. redisConf *config.RedisConf,
  231. provImageTag string,
  232. ) (*batchv1.Job, error) {
  233. id := infra.GetUniqueName()
  234. prov := &provisioner.Conf{
  235. ID: id,
  236. Name: fmt.Sprintf("prov-%s-%s", id, string(operation)),
  237. Kind: provisioner.ECR,
  238. Operation: operation,
  239. Redis: redisConf,
  240. Postgres: pgConf,
  241. ProvisionerImageTag: provImageTag,
  242. LastApplied: infra.LastApplied,
  243. AWS: &aws.Conf{
  244. AWSRegion: awsConf.AWSRegion,
  245. AWSAccessKeyID: string(awsConf.AWSAccessKeyID),
  246. AWSSecretAccessKey: string(awsConf.AWSSecretAccessKey),
  247. },
  248. ECR: &ecr.Conf{
  249. ECRName: ecrName,
  250. },
  251. }
  252. return a.provision(prov, infra, repo)
  253. }
  254. // ProvisionEKS spawns a new provisioning pod that creates an EKS instance
  255. func (a *Agent) ProvisionEKS(
  256. projectID uint,
  257. awsConf *integrations.AWSIntegration,
  258. eksName string,
  259. repo repository.Repository,
  260. infra *models.Infra,
  261. operation provisioner.ProvisionerOperation,
  262. pgConf *config.DBConf,
  263. redisConf *config.RedisConf,
  264. provImageTag string,
  265. ) (*batchv1.Job, error) {
  266. id := infra.GetUniqueName()
  267. prov := &provisioner.Conf{
  268. ID: id,
  269. Name: fmt.Sprintf("prov-%s-%s", id, string(operation)),
  270. Kind: provisioner.EKS,
  271. Operation: operation,
  272. Redis: redisConf,
  273. Postgres: pgConf,
  274. ProvisionerImageTag: provImageTag,
  275. LastApplied: infra.LastApplied,
  276. AWS: &aws.Conf{
  277. AWSRegion: awsConf.AWSRegion,
  278. AWSAccessKeyID: string(awsConf.AWSAccessKeyID),
  279. AWSSecretAccessKey: string(awsConf.AWSSecretAccessKey),
  280. },
  281. EKS: &eks.Conf{
  282. ClusterName: eksName,
  283. },
  284. }
  285. return a.provision(prov, infra, repo)
  286. }
  287. // ProvisionGCR spawns a new provisioning pod that creates a GCR instance
  288. func (a *Agent) ProvisionGCR(
  289. projectID uint,
  290. gcpConf *integrations.GCPIntegration,
  291. repo repository.Repository,
  292. infra *models.Infra,
  293. operation provisioner.ProvisionerOperation,
  294. pgConf *config.DBConf,
  295. redisConf *config.RedisConf,
  296. provImageTag string,
  297. ) (*batchv1.Job, error) {
  298. id := infra.GetUniqueName()
  299. prov := &provisioner.Conf{
  300. ID: id,
  301. Name: fmt.Sprintf("prov-%s-%s", id, string(operation)),
  302. Kind: provisioner.GCR,
  303. Operation: operation,
  304. Redis: redisConf,
  305. Postgres: pgConf,
  306. ProvisionerImageTag: provImageTag,
  307. LastApplied: infra.LastApplied,
  308. GCP: &gcp.Conf{
  309. GCPRegion: gcpConf.GCPRegion,
  310. GCPProjectID: gcpConf.GCPProjectID,
  311. GCPKeyData: string(gcpConf.GCPKeyData),
  312. },
  313. }
  314. return a.provision(prov, infra, repo)
  315. }
  316. // ProvisionGKE spawns a new provisioning pod that creates a GKE instance
  317. func (a *Agent) ProvisionGKE(
  318. projectID uint,
  319. gcpConf *integrations.GCPIntegration,
  320. gkeName string,
  321. repo repository.Repository,
  322. infra *models.Infra,
  323. operation provisioner.ProvisionerOperation,
  324. pgConf *config.DBConf,
  325. redisConf *config.RedisConf,
  326. provImageTag string,
  327. ) (*batchv1.Job, error) {
  328. id := infra.GetUniqueName()
  329. prov := &provisioner.Conf{
  330. ID: id,
  331. Name: fmt.Sprintf("prov-%s-%s", id, string(operation)),
  332. Kind: provisioner.GKE,
  333. Operation: operation,
  334. Redis: redisConf,
  335. Postgres: pgConf,
  336. ProvisionerImageTag: provImageTag,
  337. LastApplied: infra.LastApplied,
  338. GCP: &gcp.Conf{
  339. GCPRegion: gcpConf.GCPRegion,
  340. GCPProjectID: gcpConf.GCPProjectID,
  341. GCPKeyData: string(gcpConf.GCPKeyData),
  342. },
  343. GKE: &gke.Conf{
  344. ClusterName: gkeName,
  345. },
  346. }
  347. return a.provision(prov, infra, repo)
  348. }
  349. // ProvisionDOCR spawns a new provisioning pod that creates a DOCR instance
  350. func (a *Agent) ProvisionDOCR(
  351. projectID uint,
  352. doConf *integrations.OAuthIntegration,
  353. doAuth *oauth2.Config,
  354. repo repository.Repository,
  355. docrName, docrSubscriptionTier string,
  356. infra *models.Infra,
  357. operation provisioner.ProvisionerOperation,
  358. pgConf *config.DBConf,
  359. redisConf *config.RedisConf,
  360. provImageTag string,
  361. ) (*batchv1.Job, error) {
  362. // get the token
  363. oauthInt, err := repo.OAuthIntegration.ReadOAuthIntegration(
  364. infra.DOIntegrationID,
  365. )
  366. if err != nil {
  367. return nil, err
  368. }
  369. tok, _, err := oauth.GetAccessToken(oauthInt, doAuth, repo)
  370. if err != nil {
  371. return nil, err
  372. }
  373. id := infra.GetUniqueName()
  374. prov := &provisioner.Conf{
  375. ID: id,
  376. Name: fmt.Sprintf("prov-%s-%s", id, string(operation)),
  377. Kind: provisioner.DOCR,
  378. Operation: operation,
  379. Redis: redisConf,
  380. Postgres: pgConf,
  381. ProvisionerImageTag: provImageTag,
  382. LastApplied: infra.LastApplied,
  383. DO: &do.Conf{
  384. DOToken: tok,
  385. },
  386. DOCR: &docr.Conf{
  387. DOCRName: docrName,
  388. DOCRSubscriptionTier: docrSubscriptionTier,
  389. },
  390. }
  391. return a.provision(prov, infra, repo)
  392. }
  393. // ProvisionDOKS spawns a new provisioning pod that creates a DOKS instance
  394. func (a *Agent) ProvisionDOKS(
  395. projectID uint,
  396. doConf *integrations.OAuthIntegration,
  397. doAuth *oauth2.Config,
  398. repo repository.Repository,
  399. doRegion, doksClusterName string,
  400. infra *models.Infra,
  401. operation provisioner.ProvisionerOperation,
  402. pgConf *config.DBConf,
  403. redisConf *config.RedisConf,
  404. provImageTag string,
  405. ) (*batchv1.Job, error) {
  406. // get the token
  407. oauthInt, err := repo.OAuthIntegration.ReadOAuthIntegration(
  408. infra.DOIntegrationID,
  409. )
  410. if err != nil {
  411. return nil, err
  412. }
  413. tok, _, err := oauth.GetAccessToken(oauthInt, doAuth, repo)
  414. if err != nil {
  415. return nil, err
  416. }
  417. id := infra.GetUniqueName()
  418. prov := &provisioner.Conf{
  419. ID: id,
  420. Name: fmt.Sprintf("prov-%s-%s", id, string(operation)),
  421. Kind: provisioner.DOKS,
  422. Operation: operation,
  423. Redis: redisConf,
  424. Postgres: pgConf,
  425. LastApplied: infra.LastApplied,
  426. ProvisionerImageTag: provImageTag,
  427. DO: &do.Conf{
  428. DOToken: tok,
  429. },
  430. DOKS: &doks.Conf{
  431. DORegion: doRegion,
  432. DOKSClusterName: doksClusterName,
  433. },
  434. }
  435. return a.provision(prov, infra, repo)
  436. }
  437. // ProvisionTest spawns a new provisioning pod that tests provisioning
  438. func (a *Agent) ProvisionTest(
  439. projectID uint,
  440. infra *models.Infra,
  441. repo repository.Repository,
  442. operation provisioner.ProvisionerOperation,
  443. pgConf *config.DBConf,
  444. redisConf *config.RedisConf,
  445. provImageTag string,
  446. ) (*batchv1.Job, error) {
  447. id := infra.GetUniqueName()
  448. prov := &provisioner.Conf{
  449. ID: id,
  450. Name: fmt.Sprintf("prov-%s-%s", id, string(operation)),
  451. Operation: operation,
  452. Kind: provisioner.Test,
  453. Redis: redisConf,
  454. Postgres: pgConf,
  455. ProvisionerImageTag: provImageTag,
  456. }
  457. return a.provision(prov, infra, repo)
  458. }
  459. func (a *Agent) provision(
  460. prov *provisioner.Conf,
  461. infra *models.Infra,
  462. repo repository.Repository,
  463. ) (*batchv1.Job, error) {
  464. prov.Namespace = "default"
  465. job, err := prov.GetProvisionerJobTemplate()
  466. if err != nil {
  467. return nil, err
  468. }
  469. job, err = a.Clientset.BatchV1().Jobs(prov.Namespace).Create(
  470. context.TODO(),
  471. job,
  472. metav1.CreateOptions{},
  473. )
  474. if err != nil {
  475. return nil, err
  476. }
  477. infra.LastApplied = prov.LastApplied
  478. infra, err = repo.Infra.UpdateInfra(infra)
  479. if err != nil {
  480. return nil, err
  481. }
  482. return job, nil
  483. }
  484. // CreateImagePullSecrets will create the required image pull secrets and
  485. // return a map from the registry name to the name of the secret.
  486. func (a *Agent) CreateImagePullSecrets(
  487. repo repository.Repository,
  488. namespace string,
  489. linkedRegs map[string]*models.Registry,
  490. doAuth *oauth2.Config,
  491. ) (map[string]string, error) {
  492. res := make(map[string]string)
  493. for key, val := range linkedRegs {
  494. _reg := registry.Registry(*val)
  495. data, err := _reg.GetDockerConfigJSON(repo, doAuth)
  496. if err != nil {
  497. return nil, err
  498. }
  499. secretName := fmt.Sprintf("porter-%s-%d", val.Externalize().Service, val.ID)
  500. secret, err := a.Clientset.CoreV1().Secrets(namespace).Get(
  501. context.TODO(),
  502. secretName,
  503. metav1.GetOptions{},
  504. )
  505. // if not found, create the secret
  506. if err != nil && errors.IsNotFound(err) {
  507. _, err = a.Clientset.CoreV1().Secrets(namespace).Create(
  508. context.TODO(),
  509. &v1.Secret{
  510. ObjectMeta: metav1.ObjectMeta{
  511. Name: secretName,
  512. },
  513. Data: map[string][]byte{
  514. string(v1.DockerConfigJsonKey): data,
  515. },
  516. Type: v1.SecretTypeDockerConfigJson,
  517. },
  518. metav1.CreateOptions{},
  519. )
  520. if err != nil {
  521. return nil, err
  522. }
  523. // add secret name to the map
  524. res[key] = secretName
  525. continue
  526. } else if err != nil {
  527. return nil, err
  528. }
  529. // otherwise, check that the secret contains the correct data: if
  530. // if doesn't, update it
  531. if !bytes.Equal(secret.Data[v1.DockerConfigJsonKey], data) {
  532. _, err := a.Clientset.CoreV1().Secrets(namespace).Update(
  533. context.TODO(),
  534. &v1.Secret{
  535. ObjectMeta: metav1.ObjectMeta{
  536. Name: secretName,
  537. },
  538. Data: map[string][]byte{
  539. string(v1.DockerConfigJsonKey): data,
  540. },
  541. Type: v1.SecretTypeDockerConfigJson,
  542. },
  543. metav1.UpdateOptions{},
  544. )
  545. if err != nil {
  546. return nil, err
  547. }
  548. }
  549. // add secret name to the map
  550. res[key] = secretName
  551. }
  552. return res, nil
  553. }