agent.go 31 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315
  1. package kubernetes
  2. import (
  3. "bufio"
  4. "bytes"
  5. "compress/gzip"
  6. "context"
  7. "encoding/base64"
  8. "encoding/json"
  9. "fmt"
  10. "io"
  11. "io/ioutil"
  12. "strings"
  13. "time"
  14. "github.com/porter-dev/porter/api/server/shared/config/env"
  15. "github.com/porter-dev/porter/internal/kubernetes/provisioner"
  16. "github.com/porter-dev/porter/internal/kubernetes/provisioner/aws"
  17. "github.com/porter-dev/porter/internal/kubernetes/provisioner/aws/ecr"
  18. "github.com/porter-dev/porter/internal/kubernetes/provisioner/aws/eks"
  19. "github.com/porter-dev/porter/internal/kubernetes/provisioner/do"
  20. "github.com/porter-dev/porter/internal/kubernetes/provisioner/do/docr"
  21. "github.com/porter-dev/porter/internal/kubernetes/provisioner/do/doks"
  22. "github.com/porter-dev/porter/internal/kubernetes/provisioner/gcp"
  23. "github.com/porter-dev/porter/internal/kubernetes/provisioner/gcp/gke"
  24. "github.com/porter-dev/porter/internal/models"
  25. "github.com/porter-dev/porter/internal/models/integrations"
  26. "github.com/porter-dev/porter/internal/oauth"
  27. "github.com/porter-dev/porter/internal/registry"
  28. "github.com/porter-dev/porter/internal/repository"
  29. "golang.org/x/oauth2"
  30. errors2 "errors"
  31. "github.com/gorilla/websocket"
  32. "github.com/porter-dev/porter/internal/helm/grapher"
  33. appsv1 "k8s.io/api/apps/v1"
  34. batchv1 "k8s.io/api/batch/v1"
  35. batchv1beta1 "k8s.io/api/batch/v1beta1"
  36. v1 "k8s.io/api/core/v1"
  37. v1beta1 "k8s.io/api/extensions/v1beta1"
  38. "k8s.io/apimachinery/pkg/api/errors"
  39. metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
  40. "k8s.io/apimachinery/pkg/runtime"
  41. "k8s.io/apimachinery/pkg/runtime/schema"
  42. "k8s.io/apimachinery/pkg/types"
  43. "k8s.io/cli-runtime/pkg/genericclioptions"
  44. "k8s.io/client-go/informers"
  45. "k8s.io/client-go/kubernetes"
  46. "k8s.io/client-go/rest"
  47. "k8s.io/client-go/tools/cache"
  48. "k8s.io/client-go/tools/remotecommand"
  49. rspb "helm.sh/helm/v3/pkg/release"
  50. )
  51. // Agent is a Kubernetes agent for performing operations that interact with the
  52. // api server
  53. type Agent struct {
  54. RESTClientGetter genericclioptions.RESTClientGetter
  55. Clientset kubernetes.Interface
  56. }
  // Message is the JSON envelope written to a websocket connection for every
  // streamed event.
  type Message struct {
  	// EventType names the event; the streaming methods in this file emit
  	// "ADD", "UPDATE" and "DELETE". Serialized under the key "event_type".
  	EventType string `json:"event_type"`

  	// Object is the payload of the event. It has no JSON tag, so it is
  	// serialized under the key "Object".
  	Object interface{}

  	// Kind is the resource kind the event refers to. It has no JSON tag, so
  	// it is serialized under the key "Kind".
  	Kind string
  }
  // ListOptions carries optional filters for list-style operations.
  type ListOptions struct {
  	// FieldSelector is a selector expression.
  	// NOTE(review): presumably a Kubernetes field selector; no consumer is
  	// visible in this part of the file, so confirm against the callers.
  	FieldSelector string
  }
  // AuthError signals that a request was rejected by the API server as
  // unauthorized.
  type AuthError struct{}

  // Error implements the error interface.
  func (e *AuthError) Error() string {
  	return "Unauthorized error"
  }

  // Is reports whether target is an *AuthError. AuthError is a zero-size type,
  // and the Go specification leaves the equality of pointers to distinct
  // zero-size variables unspecified, so the default pointer comparison done by
  // errors.Is(err, &AuthError{}) cannot be relied upon. Matching on the dynamic
  // type makes that check deterministic.
  func (e *AuthError) Is(target error) bool {
  	_, ok := target.(*AuthError)
  	return ok
  }
  69. // UpdateClientset updates the Agent's Clientset (this refreshes auth tokens)
  70. func (a *Agent) UpdateClientset() error {
  71. restConf, err := a.RESTClientGetter.ToRESTConfig()
  72. if err != nil {
  73. return err
  74. }
  75. clientset, err := kubernetes.NewForConfig(restConf)
  76. if err != nil {
  77. return err
  78. }
  79. a.Clientset = clientset
  80. return nil
  81. }
  82. // CreateConfigMap creates the configmap given the key-value pairs and namespace
  83. func (a *Agent) CreateConfigMap(name string, namespace string, configMap map[string]string) (*v1.ConfigMap, error) {
  84. return a.Clientset.CoreV1().ConfigMaps(namespace).Create(
  85. context.TODO(),
  86. &v1.ConfigMap{
  87. ObjectMeta: metav1.ObjectMeta{
  88. Name: name,
  89. Namespace: namespace,
  90. Labels: map[string]string{
  91. "porter": "true",
  92. },
  93. },
  94. Data: configMap,
  95. },
  96. metav1.CreateOptions{},
  97. )
  98. }
  99. // CreateLinkedSecret creates a secret given the key-value pairs and namespace. Values are
  100. // base64 encoded
  101. func (a *Agent) CreateLinkedSecret(name, namespace, cmName string, data map[string][]byte) (*v1.Secret, error) {
  102. return a.Clientset.CoreV1().Secrets(namespace).Create(
  103. context.TODO(),
  104. &v1.Secret{
  105. ObjectMeta: metav1.ObjectMeta{
  106. Name: name,
  107. Namespace: namespace,
  108. Labels: map[string]string{
  109. "porter": "true",
  110. "configmap": cmName,
  111. },
  112. },
  113. Data: data,
  114. },
  115. metav1.CreateOptions{},
  116. )
  117. }
  // mergeConfigMapData is the body of the merge patch sent by UpdateConfigMap.
  type mergeConfigMapData struct {
  	// Data maps each key to its new value; a nil pointer is marshalled as
  	// JSON null.
  	Data map[string]*string `json:"data"`
  }
  121. // UpdateConfigMap updates the configmap given its name and namespace
  122. func (a *Agent) UpdateConfigMap(name string, namespace string, configMap map[string]string) (*v1.ConfigMap, error) {
  123. cmData := make(map[string]*string)
  124. for key, val := range configMap {
  125. valCopy := val
  126. cmData[key] = &valCopy
  127. if len(val) == 0 {
  128. cmData[key] = nil
  129. }
  130. }
  131. mergeCM := &mergeConfigMapData{
  132. Data: cmData,
  133. }
  134. patchBytes, err := json.Marshal(mergeCM)
  135. if err != nil {
  136. return nil, err
  137. }
  138. return a.Clientset.CoreV1().ConfigMaps(namespace).Patch(
  139. context.Background(),
  140. name,
  141. types.MergePatchType,
  142. patchBytes,
  143. metav1.PatchOptions{},
  144. )
  145. }
  // mergeLinkedSecretData is the body of the merge patch sent by
  // UpdateLinkedSecret.
  type mergeLinkedSecretData struct {
  	// Data maps each key to its new value; a nil pointer is marshalled as
  	// JSON null, a non-nil byte slice as a base64 string.
  	Data map[string]*[]byte `json:"data"`
  }
  149. // UpdateLinkedSecret updates the secret given its name and namespace
  150. func (a *Agent) UpdateLinkedSecret(name, namespace, cmName string, data map[string][]byte) error {
  151. secretData := make(map[string]*[]byte)
  152. for key, val := range data {
  153. valCopy := val
  154. secretData[key] = &valCopy
  155. if len(val) == 0 {
  156. secretData[key] = nil
  157. }
  158. }
  159. mergeSecret := &mergeLinkedSecretData{
  160. Data: secretData,
  161. }
  162. patchBytes, err := json.Marshal(mergeSecret)
  163. if err != nil {
  164. return err
  165. }
  166. _, err = a.Clientset.CoreV1().Secrets(namespace).Patch(
  167. context.TODO(),
  168. name,
  169. types.MergePatchType,
  170. patchBytes,
  171. metav1.PatchOptions{},
  172. )
  173. return err
  174. }
  175. // DeleteConfigMap deletes the configmap given its name and namespace
  176. func (a *Agent) DeleteConfigMap(name string, namespace string) error {
  177. return a.Clientset.CoreV1().ConfigMaps(namespace).Delete(
  178. context.TODO(),
  179. name,
  180. metav1.DeleteOptions{},
  181. )
  182. }
  183. // DeleteLinkedSecret deletes the secret given its name and namespace
  184. func (a *Agent) DeleteLinkedSecret(name, namespace string) error {
  185. return a.Clientset.CoreV1().Secrets(namespace).Delete(
  186. context.TODO(),
  187. name,
  188. metav1.DeleteOptions{},
  189. )
  190. }
  191. // GetConfigMap retrieves the configmap given its name and namespace
  192. func (a *Agent) GetConfigMap(name string, namespace string) (*v1.ConfigMap, error) {
  193. return a.Clientset.CoreV1().ConfigMaps(namespace).Get(
  194. context.TODO(),
  195. name,
  196. metav1.GetOptions{},
  197. )
  198. }
  199. // GetSecret retrieves the secret given its name and namespace
  200. func (a *Agent) GetSecret(name string, namespace string) (*v1.Secret, error) {
  201. return a.Clientset.CoreV1().Secrets(namespace).Get(
  202. context.TODO(),
  203. name,
  204. metav1.GetOptions{},
  205. )
  206. }
  207. // ListConfigMaps simply lists namespaces
  208. func (a *Agent) ListConfigMaps(namespace string) (*v1.ConfigMapList, error) {
  209. return a.Clientset.CoreV1().ConfigMaps(namespace).List(
  210. context.TODO(),
  211. metav1.ListOptions{
  212. LabelSelector: "porter=true",
  213. },
  214. )
  215. }
  216. // ListEvents lists the events of a given object.
  217. func (a *Agent) ListEvents(name string, namespace string) (*v1.EventList, error) {
  218. return a.Clientset.CoreV1().Events(namespace).List(
  219. context.TODO(),
  220. metav1.ListOptions{
  221. FieldSelector: fmt.Sprintf("involvedObject.name=%s,involvedObject.namespace=%s", name, namespace),
  222. },
  223. )
  224. }
  225. // ListNamespaces simply lists namespaces
  226. func (a *Agent) ListNamespaces() (*v1.NamespaceList, error) {
  227. return a.Clientset.CoreV1().Namespaces().List(
  228. context.TODO(),
  229. metav1.ListOptions{},
  230. )
  231. }
  232. // CreateNamespace creates a namespace with the given name.
  233. func (a *Agent) CreateNamespace(name string) (*v1.Namespace, error) {
  234. namespace := v1.Namespace{
  235. ObjectMeta: metav1.ObjectMeta{
  236. Name: name,
  237. },
  238. }
  239. return a.Clientset.CoreV1().Namespaces().Create(
  240. context.TODO(),
  241. &namespace,
  242. metav1.CreateOptions{},
  243. )
  244. }
  245. // DeleteNamespace deletes the namespace given the name.
  246. func (a *Agent) DeleteNamespace(name string) error {
  247. return a.Clientset.CoreV1().Namespaces().Delete(
  248. context.TODO(),
  249. name,
  250. metav1.DeleteOptions{},
  251. )
  252. }
  // Label is a single key/value pair; ListJobsByLabel turns each one into a
  // "key=value" term of its label selector.
  type Label struct {
  	// Key is the label name.
  	Key string
  	// Val is the value the label must have.
  	Val string
  }
  258. func (a *Agent) ListJobsByLabel(namespace string, labels ...Label) ([]batchv1.Job, error) {
  259. selectors := make([]string, 0)
  260. for _, label := range labels {
  261. selectors = append(selectors, fmt.Sprintf("%s=%s", label.Key, label.Val))
  262. }
  263. resp, err := a.Clientset.BatchV1().Jobs(namespace).List(
  264. context.TODO(),
  265. metav1.ListOptions{
  266. LabelSelector: strings.Join(selectors, ","),
  267. },
  268. )
  269. if err != nil {
  270. return nil, err
  271. }
  272. return resp.Items, nil
  273. }
  274. // DeleteJob deletes the job in the given name and namespace.
  275. func (a *Agent) DeleteJob(name, namespace string) error {
  276. return a.Clientset.BatchV1().Jobs(namespace).Delete(
  277. context.TODO(),
  278. name,
  279. metav1.DeleteOptions{},
  280. )
  281. }
  282. // GetJobPods lists all pods belonging to a job in a namespace
  283. func (a *Agent) GetJobPods(namespace, jobName string) ([]v1.Pod, error) {
  284. resp, err := a.Clientset.CoreV1().Pods(namespace).List(
  285. context.TODO(),
  286. metav1.ListOptions{
  287. LabelSelector: fmt.Sprintf("%s=%s", "job-name", jobName),
  288. },
  289. )
  290. if err != nil {
  291. return nil, err
  292. }
  293. return resp.Items, nil
  294. }
  295. // GetIngress gets ingress given the name and namespace
  296. func (a *Agent) GetIngress(namespace string, name string) (*v1beta1.Ingress, error) {
  297. return a.Clientset.ExtensionsV1beta1().Ingresses(namespace).Get(
  298. context.TODO(),
  299. name,
  300. metav1.GetOptions{},
  301. )
  302. }
  303. // GetDeployment gets the deployment given the name and namespace
  304. func (a *Agent) GetDeployment(c grapher.Object) (*appsv1.Deployment, error) {
  305. res, err := a.Clientset.AppsV1().Deployments(c.Namespace).Get(
  306. context.TODO(),
  307. c.Name,
  308. metav1.GetOptions{},
  309. )
  310. if err != nil {
  311. return nil, err
  312. }
  313. res.Kind = c.Kind
  314. return res, nil
  315. }
  316. // GetStatefulSet gets the statefulset given the name and namespace
  317. func (a *Agent) GetStatefulSet(c grapher.Object) (*appsv1.StatefulSet, error) {
  318. res, err := a.Clientset.AppsV1().StatefulSets(c.Namespace).Get(
  319. context.TODO(),
  320. c.Name,
  321. metav1.GetOptions{},
  322. )
  323. if err != nil {
  324. return nil, err
  325. }
  326. res.Kind = c.Kind
  327. return res, nil
  328. }
  329. // GetReplicaSet gets the replicaset given the name and namespace
  330. func (a *Agent) GetReplicaSet(c grapher.Object) (*appsv1.ReplicaSet, error) {
  331. res, err := a.Clientset.AppsV1().ReplicaSets(c.Namespace).Get(
  332. context.TODO(),
  333. c.Name,
  334. metav1.GetOptions{},
  335. )
  336. if err != nil {
  337. return nil, err
  338. }
  339. res.Kind = c.Kind
  340. return res, nil
  341. }
  342. // GetDaemonSet gets the daemonset by name and namespace
  343. func (a *Agent) GetDaemonSet(c grapher.Object) (*appsv1.DaemonSet, error) {
  344. res, err := a.Clientset.AppsV1().DaemonSets(c.Namespace).Get(
  345. context.TODO(),
  346. c.Name,
  347. metav1.GetOptions{},
  348. )
  349. if err != nil {
  350. return nil, err
  351. }
  352. res.Kind = c.Kind
  353. return res, nil
  354. }
  355. // GetJob gets the job by name and namespace
  356. func (a *Agent) GetJob(c grapher.Object) (*batchv1.Job, error) {
  357. res, err := a.Clientset.BatchV1().Jobs(c.Namespace).Get(
  358. context.TODO(),
  359. c.Name,
  360. metav1.GetOptions{},
  361. )
  362. if err != nil {
  363. return nil, err
  364. }
  365. res.Kind = c.Kind
  366. return res, nil
  367. }
  368. // GetCronJob gets the CronJob by name and namespace
  369. func (a *Agent) GetCronJob(c grapher.Object) (*batchv1beta1.CronJob, error) {
  370. res, err := a.Clientset.BatchV1beta1().CronJobs(c.Namespace).Get(
  371. context.TODO(),
  372. c.Name,
  373. metav1.GetOptions{},
  374. )
  375. if err != nil {
  376. return nil, err
  377. }
  378. res.Kind = c.Kind
  379. return res, nil
  380. }
  381. // GetPodsByLabel retrieves pods with matching labels
  382. func (a *Agent) GetPodsByLabel(selector string, namespace string) (*v1.PodList, error) {
  383. // Search in all namespaces for matching pods
  384. return a.Clientset.CoreV1().Pods(namespace).List(
  385. context.TODO(),
  386. metav1.ListOptions{
  387. LabelSelector: selector,
  388. },
  389. )
  390. }
  391. // DeletePod deletes a pod by name and namespace
  392. func (a *Agent) DeletePod(namespace string, name string) error {
  393. return a.Clientset.CoreV1().Pods(namespace).Delete(
  394. context.TODO(),
  395. name,
  396. metav1.DeleteOptions{},
  397. )
  398. }
  399. // GetPodLogs streams real-time logs from a given pod.
  400. func (a *Agent) GetPodLogs(namespace string, name string, conn *websocket.Conn) error {
  401. // get the pod to read in the list of contains
  402. pod, err := a.Clientset.CoreV1().Pods(namespace).Get(
  403. context.Background(),
  404. name,
  405. metav1.GetOptions{},
  406. )
  407. if err != nil {
  408. return fmt.Errorf("Cannot get pod %s: %s", name, err.Error())
  409. }
  410. container := pod.Spec.Containers[0].Name
  411. tails := int64(400)
  412. // follow logs
  413. podLogOpts := v1.PodLogOptions{
  414. Follow: true,
  415. TailLines: &tails,
  416. Container: container,
  417. }
  418. req := a.Clientset.CoreV1().Pods(namespace).GetLogs(name, &podLogOpts)
  419. podLogs, err := req.Stream(context.TODO())
  420. if err != nil {
  421. return fmt.Errorf("Cannot open log stream for pod %s: %s", name, err.Error())
  422. }
  423. defer podLogs.Close()
  424. r := bufio.NewReader(podLogs)
  425. errorchan := make(chan error)
  426. go func() {
  427. // listens for websocket closing handshake
  428. for {
  429. if _, _, err := conn.ReadMessage(); err != nil {
  430. defer conn.Close()
  431. errorchan <- nil
  432. return
  433. }
  434. }
  435. }()
  436. go func() {
  437. for {
  438. select {
  439. case <-errorchan:
  440. defer close(errorchan)
  441. return
  442. default:
  443. }
  444. bytes, err := r.ReadBytes('\n')
  445. if writeErr := conn.WriteMessage(websocket.TextMessage, bytes); writeErr != nil {
  446. errorchan <- writeErr
  447. return
  448. }
  449. if err != nil {
  450. if err != io.EOF {
  451. errorchan <- err
  452. return
  453. }
  454. errorchan <- nil
  455. return
  456. }
  457. }
  458. }()
  459. for {
  460. select {
  461. case err = <-errorchan:
  462. return err
  463. }
  464. }
  465. }
  466. // StopJobWithJobSidecar sends a termination signal to a job running with a sidecar
  467. func (a *Agent) StopJobWithJobSidecar(namespace, name string) error {
  468. jobPods, err := a.GetJobPods(namespace, name)
  469. if err != nil {
  470. return err
  471. }
  472. podName := jobPods[0].ObjectMeta.Name
  473. restConf, err := a.RESTClientGetter.ToRESTConfig()
  474. restConf.GroupVersion = &schema.GroupVersion{
  475. Group: "api",
  476. Version: "v1",
  477. }
  478. restConf.NegotiatedSerializer = runtime.NewSimpleNegotiatedSerializer(runtime.SerializerInfo{})
  479. restClient, err := rest.RESTClientFor(restConf)
  480. if err != nil {
  481. return err
  482. }
  483. req := restClient.Post().
  484. Resource("pods").
  485. Name(podName).
  486. Namespace(namespace).
  487. SubResource("exec")
  488. req.Param("command", "./signal.sh")
  489. req.Param("container", "sidecar")
  490. req.Param("stdin", "true")
  491. req.Param("stdout", "false")
  492. req.Param("tty", "false")
  493. exec, err := remotecommand.NewSPDYExecutor(restConf, "POST", req.URL())
  494. if err != nil {
  495. return err
  496. }
  497. return exec.Stream(remotecommand.StreamOptions{
  498. Tty: false,
  499. Stdin: strings.NewReader("./signal.sh"),
  500. })
  501. }
  502. // RunWebsocketTask will run a websocket task. If the websocket returns an anauthorized error, it will restart
  503. // the task some number of times until failing
  504. func (a *Agent) RunWebsocketTask(task func() error) error {
  505. lastTime := int64(0)
  506. for {
  507. if err := a.UpdateClientset(); err != nil {
  508. return err
  509. }
  510. err := task()
  511. if err == nil {
  512. return nil
  513. }
  514. if !errors2.Is(err, &AuthError{}) {
  515. return err
  516. }
  517. if time.Now().Unix()-lastTime < 60 { // don't regenerate connection if too many unauthorized errors
  518. return err
  519. }
  520. lastTime = time.Now().Unix()
  521. }
  522. }
  523. // StreamControllerStatus streams controller status. Supports Deployment, StatefulSet, ReplicaSet, and DaemonSet
  524. // TODO: Support Jobs
  525. func (a *Agent) StreamControllerStatus(conn *websocket.Conn, kind string, selectors string) error {
  526. run := func() error {
  527. // selectors is an array of max length 1. StreamControllerStatus accepts calls without the selectors argument.
  528. // selectors argument is a single string with comma separated key=value pairs. (e.g. "app=porter,porter=true")
  529. tweakListOptionsFunc := func(options *metav1.ListOptions) {
  530. options.LabelSelector = selectors
  531. }
  532. factory := informers.NewSharedInformerFactoryWithOptions(
  533. a.Clientset,
  534. 0,
  535. informers.WithTweakListOptions(tweakListOptionsFunc),
  536. )
  537. var informer cache.SharedInformer
  538. // Spins up an informer depending on kind. Convert to lowercase for robustness
  539. switch strings.ToLower(kind) {
  540. case "deployment":
  541. informer = factory.Apps().V1().Deployments().Informer()
  542. case "statefulset":
  543. informer = factory.Apps().V1().StatefulSets().Informer()
  544. case "replicaset":
  545. informer = factory.Apps().V1().ReplicaSets().Informer()
  546. case "daemonset":
  547. informer = factory.Apps().V1().DaemonSets().Informer()
  548. case "job":
  549. informer = factory.Batch().V1().Jobs().Informer()
  550. case "cronjob":
  551. informer = factory.Batch().V1beta1().CronJobs().Informer()
  552. case "namespace":
  553. informer = factory.Core().V1().Namespaces().Informer()
  554. case "pod":
  555. informer = factory.Core().V1().Pods().Informer()
  556. }
  557. stopper := make(chan struct{})
  558. errorchan := make(chan error)
  559. defer close(stopper)
  560. informer.SetWatchErrorHandler(func(r *cache.Reflector, err error) {
  561. if strings.HasSuffix(err.Error(), ": Unauthorized") {
  562. errorchan <- &AuthError{}
  563. }
  564. })
  565. informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
  566. UpdateFunc: func(oldObj, newObj interface{}) {
  567. msg := Message{
  568. EventType: "UPDATE",
  569. Object: newObj,
  570. Kind: strings.ToLower(kind),
  571. }
  572. if writeErr := conn.WriteJSON(msg); writeErr != nil {
  573. errorchan <- writeErr
  574. return
  575. }
  576. },
  577. AddFunc: func(obj interface{}) {
  578. msg := Message{
  579. EventType: "ADD",
  580. Object: obj,
  581. Kind: strings.ToLower(kind),
  582. }
  583. if writeErr := conn.WriteJSON(msg); writeErr != nil {
  584. errorchan <- writeErr
  585. return
  586. }
  587. },
  588. DeleteFunc: func(obj interface{}) {
  589. msg := Message{
  590. EventType: "DELETE",
  591. Object: obj,
  592. Kind: strings.ToLower(kind),
  593. }
  594. if writeErr := conn.WriteJSON(msg); writeErr != nil {
  595. errorchan <- writeErr
  596. return
  597. }
  598. },
  599. })
  600. go func() {
  601. // listens for websocket closing handshake
  602. for {
  603. if _, _, err := conn.ReadMessage(); err != nil {
  604. conn.Close()
  605. errorchan <- nil
  606. return
  607. }
  608. }
  609. }()
  610. go informer.Run(stopper)
  611. for {
  612. select {
  613. case err := <-errorchan:
  614. return err
  615. }
  616. }
  617. }
  618. return a.RunWebsocketTask(run)
  619. }
  var (
  	// b64 is the encoding used to decode stored release payloads.
  	b64 = base64.StdEncoding

  	// magicGzip is the three-byte prefix that marks a payload as gzip data.
  	magicGzip = []byte{0x1f, 0x8b, 0x08}
  )
  622. func decodeRelease(data string) (*rspb.Release, error) {
  623. // base64 decode string
  624. b, err := b64.DecodeString(data)
  625. if err != nil {
  626. return nil, err
  627. }
  628. // For backwards compatibility with releases that were stored before
  629. // compression was introduced we skip decompression if the
  630. // gzip magic header is not found
  631. if bytes.Equal(b[0:3], magicGzip) {
  632. r, err := gzip.NewReader(bytes.NewReader(b))
  633. if err != nil {
  634. return nil, err
  635. }
  636. defer r.Close()
  637. b2, err := ioutil.ReadAll(r)
  638. if err != nil {
  639. return nil, err
  640. }
  641. b = b2
  642. }
  643. var rls rspb.Release
  644. // unmarshal release object bytes
  645. if err := json.Unmarshal(b, &rls); err != nil {
  646. return nil, err
  647. }
  648. return &rls, nil
  649. }
  // contains reports whether str is an element of s. A nil or empty slice never
  // contains anything.
  func contains(s []string, str string) bool {
  	for i := 0; i < len(s); i++ {
  		if s[i] == str {
  			return true
  		}
  	}

  	return false
  }
  658. func parseSecretToHelmRelease(secret v1.Secret, chartList []string) (*rspb.Release, bool, error) {
  659. if secret.Type != "helm.sh/release.v1" {
  660. return nil, true, nil
  661. }
  662. releaseData, ok := secret.Data["release"]
  663. if !ok {
  664. return nil, true, fmt.Errorf("release field not found")
  665. }
  666. helm_object, err := decodeRelease(string(releaseData))
  667. if err != nil {
  668. return nil, true, err
  669. }
  670. if len(chartList) > 0 && !contains(chartList, helm_object.Name) {
  671. return nil, true, nil
  672. }
  673. return helm_object, false, nil
  674. }
  675. func (a *Agent) StreamHelmReleases(conn *websocket.Conn, namespace string, chartList []string, selectors string) error {
  676. run := func() error {
  677. tweakListOptionsFunc := func(options *metav1.ListOptions) {
  678. options.LabelSelector = selectors
  679. }
  680. factory := informers.NewSharedInformerFactoryWithOptions(
  681. a.Clientset,
  682. 0,
  683. informers.WithTweakListOptions(tweakListOptionsFunc),
  684. informers.WithNamespace(namespace),
  685. )
  686. informer := factory.Core().V1().Secrets().Informer()
  687. stopper := make(chan struct{})
  688. errorchan := make(chan error)
  689. defer close(stopper)
  690. informer.SetWatchErrorHandler(func(r *cache.Reflector, err error) {
  691. if strings.HasSuffix(err.Error(), ": Unauthorized") {
  692. errorchan <- &AuthError{}
  693. }
  694. })
  695. informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
  696. UpdateFunc: func(oldObj, newObj interface{}) {
  697. secretObj, ok := newObj.(*v1.Secret)
  698. if !ok {
  699. errorchan <- fmt.Errorf("could not cast to secret")
  700. return
  701. }
  702. helm_object, isNotHelmRelease, err := parseSecretToHelmRelease(*secretObj, chartList)
  703. if isNotHelmRelease && err == nil {
  704. return
  705. }
  706. if err != nil {
  707. errorchan <- err
  708. return
  709. }
  710. msg := Message{
  711. EventType: "UPDATE",
  712. Object: helm_object,
  713. }
  714. if writeErr := conn.WriteJSON(msg); writeErr != nil {
  715. errorchan <- writeErr
  716. return
  717. }
  718. },
  719. AddFunc: func(obj interface{}) {
  720. secretObj, ok := obj.(*v1.Secret)
  721. if !ok {
  722. errorchan <- fmt.Errorf("could not cast to secret")
  723. return
  724. }
  725. helm_object, isNotHelmRelease, err := parseSecretToHelmRelease(*secretObj, chartList)
  726. if isNotHelmRelease && err == nil {
  727. return
  728. }
  729. if err != nil {
  730. errorchan <- err
  731. return
  732. }
  733. msg := Message{
  734. EventType: "ADD",
  735. Object: helm_object,
  736. }
  737. if writeErr := conn.WriteJSON(msg); writeErr != nil {
  738. errorchan <- writeErr
  739. return
  740. }
  741. },
  742. DeleteFunc: func(obj interface{}) {
  743. secretObj, ok := obj.(*v1.Secret)
  744. if !ok {
  745. errorchan <- fmt.Errorf("could not cast to secret")
  746. return
  747. }
  748. helm_object, isNotHelmRelease, err := parseSecretToHelmRelease(*secretObj, chartList)
  749. if isNotHelmRelease && err == nil {
  750. return
  751. }
  752. if err != nil {
  753. errorchan <- err
  754. return
  755. }
  756. msg := Message{
  757. EventType: "DELETE",
  758. Object: helm_object,
  759. }
  760. if writeErr := conn.WriteJSON(msg); writeErr != nil {
  761. errorchan <- writeErr
  762. return
  763. }
  764. },
  765. })
  766. go func() {
  767. // listens for websocket closing handshake
  768. for {
  769. if _, _, err := conn.ReadMessage(); err != nil {
  770. conn.Close()
  771. errorchan <- nil
  772. return
  773. }
  774. }
  775. }()
  776. go informer.Run(stopper)
  777. for {
  778. select {
  779. case err := <-errorchan:
  780. return err
  781. }
  782. }
  783. }
  784. return a.RunWebsocketTask(run)
  785. }
  786. type SharedProvisionOpts struct {
  787. ProjectID uint
  788. Repo repository.Repository
  789. Infra *models.Infra
  790. Operation provisioner.ProvisionerOperation
  791. PGConf *env.DBConf
  792. RedisConf *env.RedisConf
  793. ProvImageTag string
  794. ProvImagePullSecret string
  795. }
  796. // ProvisionECR spawns a new provisioning pod that creates an ECR instance
  797. func (a *Agent) ProvisionECR(
  798. opts *SharedProvisionOpts,
  799. awsConf *integrations.AWSIntegration,
  800. ecrName string,
  801. ) (*batchv1.Job, error) {
  802. id := opts.Infra.GetUniqueName()
  803. prov := &provisioner.Conf{
  804. ID: id,
  805. Name: fmt.Sprintf("prov-%s-%s", id, string(opts.Operation)),
  806. Kind: provisioner.ECR,
  807. Operation: opts.Operation,
  808. Redis: opts.RedisConf,
  809. Postgres: opts.PGConf,
  810. ProvisionerImageTag: opts.ProvImageTag,
  811. ImagePullSecret: opts.ProvImagePullSecret,
  812. LastApplied: opts.Infra.LastApplied,
  813. AWS: &aws.Conf{
  814. AWSRegion: awsConf.AWSRegion,
  815. AWSAccessKeyID: string(awsConf.AWSAccessKeyID),
  816. AWSSecretAccessKey: string(awsConf.AWSSecretAccessKey),
  817. },
  818. ECR: &ecr.Conf{
  819. ECRName: ecrName,
  820. },
  821. }
  822. return a.provision(prov, opts.Infra, opts.Repo)
  823. }
  824. // ProvisionEKS spawns a new provisioning pod that creates an EKS instance
  825. func (a *Agent) ProvisionEKS(
  826. opts *SharedProvisionOpts,
  827. awsConf *integrations.AWSIntegration,
  828. eksName, machineType string,
  829. ) (*batchv1.Job, error) {
  830. id := opts.Infra.GetUniqueName()
  831. prov := &provisioner.Conf{
  832. ID: id,
  833. Name: fmt.Sprintf("prov-%s-%s", id, string(opts.Operation)),
  834. Kind: provisioner.EKS,
  835. Operation: opts.Operation,
  836. Redis: opts.RedisConf,
  837. Postgres: opts.PGConf,
  838. ProvisionerImageTag: opts.ProvImageTag,
  839. ImagePullSecret: opts.ProvImagePullSecret,
  840. LastApplied: opts.Infra.LastApplied,
  841. AWS: &aws.Conf{
  842. AWSRegion: awsConf.AWSRegion,
  843. AWSAccessKeyID: string(awsConf.AWSAccessKeyID),
  844. AWSSecretAccessKey: string(awsConf.AWSSecretAccessKey),
  845. },
  846. EKS: &eks.Conf{
  847. ClusterName: eksName,
  848. MachineType: machineType,
  849. },
  850. }
  851. return a.provision(prov, opts.Infra, opts.Repo)
  852. }
  853. // ProvisionGCR spawns a new provisioning pod that creates a GCR instance
  854. func (a *Agent) ProvisionGCR(
  855. opts *SharedProvisionOpts,
  856. gcpConf *integrations.GCPIntegration,
  857. ) (*batchv1.Job, error) {
  858. id := opts.Infra.GetUniqueName()
  859. prov := &provisioner.Conf{
  860. ID: id,
  861. Name: fmt.Sprintf("prov-%s-%s", id, string(opts.Operation)),
  862. Kind: provisioner.GCR,
  863. Operation: opts.Operation,
  864. Redis: opts.RedisConf,
  865. Postgres: opts.PGConf,
  866. ProvisionerImageTag: opts.ProvImageTag,
  867. ImagePullSecret: opts.ProvImagePullSecret,
  868. LastApplied: opts.Infra.LastApplied,
  869. GCP: &gcp.Conf{
  870. GCPRegion: gcpConf.GCPRegion,
  871. GCPProjectID: gcpConf.GCPProjectID,
  872. GCPKeyData: string(gcpConf.GCPKeyData),
  873. },
  874. }
  875. return a.provision(prov, opts.Infra, opts.Repo)
  876. }
  877. // ProvisionGKE spawns a new provisioning pod that creates a GKE instance
  878. func (a *Agent) ProvisionGKE(
  879. opts *SharedProvisionOpts,
  880. gcpConf *integrations.GCPIntegration,
  881. gkeName string,
  882. ) (*batchv1.Job, error) {
  883. id := opts.Infra.GetUniqueName()
  884. prov := &provisioner.Conf{
  885. ID: id,
  886. Name: fmt.Sprintf("prov-%s-%s", id, string(opts.Operation)),
  887. Kind: provisioner.GKE,
  888. Operation: opts.Operation,
  889. Redis: opts.RedisConf,
  890. Postgres: opts.PGConf,
  891. ProvisionerImageTag: opts.ProvImageTag,
  892. ImagePullSecret: opts.ProvImagePullSecret,
  893. LastApplied: opts.Infra.LastApplied,
  894. GCP: &gcp.Conf{
  895. GCPRegion: gcpConf.GCPRegion,
  896. GCPProjectID: gcpConf.GCPProjectID,
  897. GCPKeyData: string(gcpConf.GCPKeyData),
  898. },
  899. GKE: &gke.Conf{
  900. ClusterName: gkeName,
  901. },
  902. }
  903. return a.provision(prov, opts.Infra, opts.Repo)
  904. }
  905. // ProvisionDOCR spawns a new provisioning pod that creates a DOCR instance
  906. func (a *Agent) ProvisionDOCR(
  907. opts *SharedProvisionOpts,
  908. doConf *integrations.OAuthIntegration,
  909. doAuth *oauth2.Config,
  910. docrName, docrSubscriptionTier string,
  911. ) (*batchv1.Job, error) {
  912. // get the token
  913. oauthInt, err := opts.Repo.OAuthIntegration().ReadOAuthIntegration(
  914. opts.ProjectID,
  915. opts.Infra.DOIntegrationID,
  916. )
  917. if err != nil {
  918. return nil, err
  919. }
  920. tok, _, err := oauth.GetAccessToken(oauthInt.SharedOAuthModel, doAuth, oauth.MakeUpdateOAuthIntegrationTokenFunction(oauthInt, opts.Repo))
  921. if err != nil {
  922. return nil, err
  923. }
  924. id := opts.Infra.GetUniqueName()
  925. prov := &provisioner.Conf{
  926. ID: id,
  927. Name: fmt.Sprintf("prov-%s-%s", id, string(opts.Operation)),
  928. Kind: provisioner.DOCR,
  929. Operation: opts.Operation,
  930. Redis: opts.RedisConf,
  931. Postgres: opts.PGConf,
  932. ProvisionerImageTag: opts.ProvImageTag,
  933. ImagePullSecret: opts.ProvImagePullSecret,
  934. LastApplied: opts.Infra.LastApplied,
  935. DO: &do.Conf{
  936. DOToken: tok,
  937. },
  938. DOCR: &docr.Conf{
  939. DOCRName: docrName,
  940. DOCRSubscriptionTier: docrSubscriptionTier,
  941. },
  942. }
  943. return a.provision(prov, opts.Infra, opts.Repo)
  944. }
  945. // ProvisionDOKS spawns a new provisioning pod that creates a DOKS instance
  946. func (a *Agent) ProvisionDOKS(
  947. opts *SharedProvisionOpts,
  948. doConf *integrations.OAuthIntegration,
  949. doAuth *oauth2.Config,
  950. doRegion, doksClusterName string,
  951. ) (*batchv1.Job, error) {
  952. // get the token
  953. oauthInt, err := opts.Repo.OAuthIntegration().ReadOAuthIntegration(
  954. opts.ProjectID,
  955. opts.Infra.DOIntegrationID,
  956. )
  957. if err != nil {
  958. return nil, err
  959. }
  960. tok, _, err := oauth.GetAccessToken(oauthInt.SharedOAuthModel, doAuth, oauth.MakeUpdateOAuthIntegrationTokenFunction(oauthInt, opts.Repo))
  961. if err != nil {
  962. return nil, err
  963. }
  964. id := opts.Infra.GetUniqueName()
  965. prov := &provisioner.Conf{
  966. ID: id,
  967. Name: fmt.Sprintf("prov-%s-%s", id, string(opts.Operation)),
  968. Kind: provisioner.DOKS,
  969. Operation: opts.Operation,
  970. Redis: opts.RedisConf,
  971. Postgres: opts.PGConf,
  972. LastApplied: opts.Infra.LastApplied,
  973. ProvisionerImageTag: opts.ProvImageTag,
  974. ImagePullSecret: opts.ProvImagePullSecret,
  975. DO: &do.Conf{
  976. DOToken: tok,
  977. },
  978. DOKS: &doks.Conf{
  979. DORegion: doRegion,
  980. DOKSClusterName: doksClusterName,
  981. },
  982. }
  983. return a.provision(prov, opts.Infra, opts.Repo)
  984. }
  985. // ProvisionTest spawns a new provisioning pod that tests provisioning
  986. func (a *Agent) ProvisionTest(
  987. opts *SharedProvisionOpts,
  988. ) (*batchv1.Job, error) {
  989. id := opts.Infra.GetUniqueName()
  990. prov := &provisioner.Conf{
  991. ID: id,
  992. Name: fmt.Sprintf("prov-%s-%s", id, string(opts.Operation)),
  993. Operation: opts.Operation,
  994. Kind: provisioner.Test,
  995. Redis: opts.RedisConf,
  996. Postgres: opts.PGConf,
  997. ProvisionerImageTag: opts.ProvImageTag,
  998. ImagePullSecret: opts.ProvImagePullSecret,
  999. }
  1000. return a.provision(prov, opts.Infra, opts.Repo)
  1001. }
  1002. func (a *Agent) provision(
  1003. prov *provisioner.Conf,
  1004. infra *models.Infra,
  1005. repo repository.Repository,
  1006. ) (*batchv1.Job, error) {
  1007. prov.Namespace = "default"
  1008. job, err := prov.GetProvisionerJobTemplate()
  1009. if err != nil {
  1010. return nil, err
  1011. }
  1012. job, err = a.Clientset.BatchV1().Jobs(prov.Namespace).Create(
  1013. context.TODO(),
  1014. job,
  1015. metav1.CreateOptions{},
  1016. )
  1017. if err != nil {
  1018. return nil, err
  1019. }
  1020. infra.LastApplied = prov.LastApplied
  1021. infra, err = repo.Infra().UpdateInfra(infra)
  1022. if err != nil {
  1023. return nil, err
  1024. }
  1025. return job, nil
  1026. }
  1027. // CreateImagePullSecrets will create the required image pull secrets and
  1028. // return a map from the registry name to the name of the secret.
  1029. func (a *Agent) CreateImagePullSecrets(
  1030. repo repository.Repository,
  1031. namespace string,
  1032. linkedRegs map[string]*models.Registry,
  1033. doAuth *oauth2.Config,
  1034. ) (map[string]string, error) {
  1035. res := make(map[string]string)
  1036. for key, val := range linkedRegs {
  1037. _reg := registry.Registry(*val)
  1038. data, err := _reg.GetDockerConfigJSON(repo, doAuth)
  1039. if err != nil {
  1040. return nil, err
  1041. }
  1042. secretName := fmt.Sprintf("porter-%s-%d", val.ToRegistryType().Service, val.ID)
  1043. secret, err := a.Clientset.CoreV1().Secrets(namespace).Get(
  1044. context.TODO(),
  1045. secretName,
  1046. metav1.GetOptions{},
  1047. )
  1048. // if not found, create the secret
  1049. if err != nil && errors.IsNotFound(err) {
  1050. _, err = a.Clientset.CoreV1().Secrets(namespace).Create(
  1051. context.TODO(),
  1052. &v1.Secret{
  1053. ObjectMeta: metav1.ObjectMeta{
  1054. Name: secretName,
  1055. },
  1056. Data: map[string][]byte{
  1057. string(v1.DockerConfigJsonKey): data,
  1058. },
  1059. Type: v1.SecretTypeDockerConfigJson,
  1060. },
  1061. metav1.CreateOptions{},
  1062. )
  1063. if err != nil {
  1064. return nil, err
  1065. }
  1066. // add secret name to the map
  1067. res[key] = secretName
  1068. continue
  1069. } else if err != nil {
  1070. return nil, err
  1071. }
  1072. // otherwise, check that the secret contains the correct data: if
  1073. // if doesn't, update it
  1074. if !bytes.Equal(secret.Data[v1.DockerConfigJsonKey], data) {
  1075. _, err := a.Clientset.CoreV1().Secrets(namespace).Update(
  1076. context.TODO(),
  1077. &v1.Secret{
  1078. ObjectMeta: metav1.ObjectMeta{
  1079. Name: secretName,
  1080. },
  1081. Data: map[string][]byte{
  1082. string(v1.DockerConfigJsonKey): data,
  1083. },
  1084. Type: v1.SecretTypeDockerConfigJson,
  1085. },
  1086. metav1.UpdateOptions{},
  1087. )
  1088. if err != nil {
  1089. return nil, err
  1090. }
  1091. }
  1092. // add secret name to the map
  1093. res[key] = secretName
  1094. }
  1095. return res, nil
  1096. }