agent.go 32 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321132213231324132513261327132813291330133113321333133413351336133713381339134013411342134313441345
  1. package kubernetes
  2. import (
  3. "bufio"
  4. "bytes"
  5. "compress/gzip"
  6. "context"
  7. "encoding/base64"
  8. "encoding/json"
  9. "fmt"
  10. "io"
  11. "io/ioutil"
  12. "strings"
  13. "time"
  14. "github.com/porter-dev/porter/api/server/shared/config/env"
  15. "github.com/porter-dev/porter/internal/kubernetes/provisioner"
  16. "github.com/porter-dev/porter/internal/kubernetes/provisioner/aws"
  17. "github.com/porter-dev/porter/internal/kubernetes/provisioner/aws/ecr"
  18. "github.com/porter-dev/porter/internal/kubernetes/provisioner/aws/eks"
  19. "github.com/porter-dev/porter/internal/kubernetes/provisioner/do"
  20. "github.com/porter-dev/porter/internal/kubernetes/provisioner/do/docr"
  21. "github.com/porter-dev/porter/internal/kubernetes/provisioner/do/doks"
  22. "github.com/porter-dev/porter/internal/kubernetes/provisioner/gcp"
  23. "github.com/porter-dev/porter/internal/kubernetes/provisioner/gcp/gke"
  24. "github.com/porter-dev/porter/internal/models"
  25. "github.com/porter-dev/porter/internal/models/integrations"
  26. "github.com/porter-dev/porter/internal/oauth"
  27. "github.com/porter-dev/porter/internal/registry"
  28. "github.com/porter-dev/porter/internal/repository"
  29. "golang.org/x/oauth2"
  30. errors2 "errors"
  31. "github.com/gorilla/websocket"
  32. "github.com/porter-dev/porter/internal/helm/grapher"
  33. appsv1 "k8s.io/api/apps/v1"
  34. batchv1 "k8s.io/api/batch/v1"
  35. batchv1beta1 "k8s.io/api/batch/v1beta1"
  36. v1 "k8s.io/api/core/v1"
  37. v1beta1 "k8s.io/api/extensions/v1beta1"
  38. "k8s.io/apimachinery/pkg/api/errors"
  39. metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
  40. "k8s.io/apimachinery/pkg/runtime"
  41. "k8s.io/apimachinery/pkg/runtime/schema"
  42. "k8s.io/apimachinery/pkg/types"
  43. "k8s.io/cli-runtime/pkg/genericclioptions"
  44. "k8s.io/client-go/informers"
  45. "k8s.io/client-go/kubernetes"
  46. "k8s.io/client-go/rest"
  47. "k8s.io/client-go/tools/cache"
  48. "k8s.io/client-go/tools/remotecommand"
  49. rspb "helm.sh/helm/v3/pkg/release"
  50. )
// Agent is a Kubernetes agent for performing operations that interact with the
// api server
type Agent struct {
	// RESTClientGetter supplies the REST config; UpdateClientset uses it to
	// rebuild Clientset.
	RESTClientGetter genericclioptions.RESTClientGetter
	// Clientset is the client every Agent method issues its API calls through.
	Clientset kubernetes.Interface
}
// Message is the payload written as JSON to a websocket for each streamed
// event. Only EventType carries a json tag, so Object and Kind are encoded
// under their Go field names.
type Message struct {
	// EventType is set to "ADD", "UPDATE" or "DELETE" by the streaming methods.
	EventType string `json:"event_type"`
	// Object is the resource (or decoded Helm release) the event refers to.
	Object interface{}
	// Kind is the lowercased resource kind; StreamHelmReleases leaves it empty.
	Kind string
}
// ListOptions carries options for list operations.
// NOTE(review): not referenced in the visible part of this file — confirm its
// callers before changing it.
type ListOptions struct {
	// FieldSelector is presumably a Kubernetes field selector expression — TODO confirm.
	FieldSelector string
}
  65. type AuthError struct{}
  66. func (e *AuthError) Error() string {
  67. return "Unauthorized error"
  68. }
  69. // UpdateClientset updates the Agent's Clientset (this refreshes auth tokens)
  70. func (a *Agent) UpdateClientset() error {
  71. restConf, err := a.RESTClientGetter.ToRESTConfig()
  72. if err != nil {
  73. return err
  74. }
  75. clientset, err := kubernetes.NewForConfig(restConf)
  76. if err != nil {
  77. return err
  78. }
  79. a.Clientset = clientset
  80. return nil
  81. }
  82. // CreateConfigMap creates the configmap given the key-value pairs and namespace
  83. func (a *Agent) CreateConfigMap(name string, namespace string, configMap map[string]string) (*v1.ConfigMap, error) {
  84. return a.Clientset.CoreV1().ConfigMaps(namespace).Create(
  85. context.TODO(),
  86. &v1.ConfigMap{
  87. ObjectMeta: metav1.ObjectMeta{
  88. Name: name,
  89. Namespace: namespace,
  90. Labels: map[string]string{
  91. "porter": "true",
  92. },
  93. },
  94. Data: configMap,
  95. },
  96. metav1.CreateOptions{},
  97. )
  98. }
  99. // CreateLinkedSecret creates a secret given the key-value pairs and namespace. Values are
  100. // base64 encoded
  101. func (a *Agent) CreateLinkedSecret(name, namespace, cmName string, data map[string][]byte) (*v1.Secret, error) {
  102. return a.Clientset.CoreV1().Secrets(namespace).Create(
  103. context.TODO(),
  104. &v1.Secret{
  105. ObjectMeta: metav1.ObjectMeta{
  106. Name: name,
  107. Namespace: namespace,
  108. Labels: map[string]string{
  109. "porter": "true",
  110. "configmap": cmName,
  111. },
  112. },
  113. Data: data,
  114. },
  115. metav1.CreateOptions{},
  116. )
  117. }
// mergeConfigMapData is the body of the merge patch sent by UpdateConfigMap.
// A nil entry marshals to JSON null.
type mergeConfigMapData struct {
	Data map[string]*string `json:"data"`
}
  121. // UpdateConfigMap updates the configmap given its name and namespace
  122. func (a *Agent) UpdateConfigMap(name string, namespace string, configMap map[string]string) (*v1.ConfigMap, error) {
  123. cmData := make(map[string]*string)
  124. for key, val := range configMap {
  125. valCopy := val
  126. cmData[key] = &valCopy
  127. if len(val) == 0 {
  128. cmData[key] = nil
  129. }
  130. }
  131. mergeCM := &mergeConfigMapData{
  132. Data: cmData,
  133. }
  134. patchBytes, err := json.Marshal(mergeCM)
  135. if err != nil {
  136. return nil, err
  137. }
  138. return a.Clientset.CoreV1().ConfigMaps(namespace).Patch(
  139. context.Background(),
  140. name,
  141. types.MergePatchType,
  142. patchBytes,
  143. metav1.PatchOptions{},
  144. )
  145. }
// mergeLinkedSecretData is the body of the merge patch sent by
// UpdateLinkedSecret. A nil entry marshals to JSON null.
type mergeLinkedSecretData struct {
	Data map[string]*[]byte `json:"data"`
}
  149. // UpdateLinkedSecret updates the secret given its name and namespace
  150. func (a *Agent) UpdateLinkedSecret(name, namespace, cmName string, data map[string][]byte) error {
  151. secretData := make(map[string]*[]byte)
  152. for key, val := range data {
  153. valCopy := val
  154. secretData[key] = &valCopy
  155. if len(val) == 0 {
  156. secretData[key] = nil
  157. }
  158. }
  159. mergeSecret := &mergeLinkedSecretData{
  160. Data: secretData,
  161. }
  162. patchBytes, err := json.Marshal(mergeSecret)
  163. if err != nil {
  164. return err
  165. }
  166. _, err = a.Clientset.CoreV1().Secrets(namespace).Patch(
  167. context.TODO(),
  168. name,
  169. types.MergePatchType,
  170. patchBytes,
  171. metav1.PatchOptions{},
  172. )
  173. return err
  174. }
  175. // DeleteConfigMap deletes the configmap given its name and namespace
  176. func (a *Agent) DeleteConfigMap(name string, namespace string) error {
  177. return a.Clientset.CoreV1().ConfigMaps(namespace).Delete(
  178. context.TODO(),
  179. name,
  180. metav1.DeleteOptions{},
  181. )
  182. }
  183. // DeleteLinkedSecret deletes the secret given its name and namespace
  184. func (a *Agent) DeleteLinkedSecret(name, namespace string) error {
  185. return a.Clientset.CoreV1().Secrets(namespace).Delete(
  186. context.TODO(),
  187. name,
  188. metav1.DeleteOptions{},
  189. )
  190. }
  191. // GetConfigMap retrieves the configmap given its name and namespace
  192. func (a *Agent) GetConfigMap(name string, namespace string) (*v1.ConfigMap, error) {
  193. return a.Clientset.CoreV1().ConfigMaps(namespace).Get(
  194. context.TODO(),
  195. name,
  196. metav1.GetOptions{},
  197. )
  198. }
  199. // GetSecret retrieves the secret given its name and namespace
  200. func (a *Agent) GetSecret(name string, namespace string) (*v1.Secret, error) {
  201. return a.Clientset.CoreV1().Secrets(namespace).Get(
  202. context.TODO(),
  203. name,
  204. metav1.GetOptions{},
  205. )
  206. }
  207. // ListConfigMaps simply lists namespaces
  208. func (a *Agent) ListConfigMaps(namespace string) (*v1.ConfigMapList, error) {
  209. return a.Clientset.CoreV1().ConfigMaps(namespace).List(
  210. context.TODO(),
  211. metav1.ListOptions{
  212. LabelSelector: "porter=true",
  213. },
  214. )
  215. }
  216. // ListEvents lists the events of a given object.
  217. func (a *Agent) ListEvents(name string, namespace string) (*v1.EventList, error) {
  218. return a.Clientset.CoreV1().Events(namespace).List(
  219. context.TODO(),
  220. metav1.ListOptions{
  221. FieldSelector: fmt.Sprintf("involvedObject.name=%s,involvedObject.namespace=%s", name, namespace),
  222. },
  223. )
  224. }
  225. // ListNamespaces simply lists namespaces
  226. func (a *Agent) ListNamespaces() (*v1.NamespaceList, error) {
  227. return a.Clientset.CoreV1().Namespaces().List(
  228. context.TODO(),
  229. metav1.ListOptions{},
  230. )
  231. }
  232. // CreateNamespace creates a namespace with the given name.
  233. func (a *Agent) CreateNamespace(name string) (*v1.Namespace, error) {
  234. namespace := v1.Namespace{
  235. ObjectMeta: metav1.ObjectMeta{
  236. Name: name,
  237. },
  238. }
  239. return a.Clientset.CoreV1().Namespaces().Create(
  240. context.TODO(),
  241. &namespace,
  242. metav1.CreateOptions{},
  243. )
  244. }
  245. // DeleteNamespace deletes the namespace given the name.
  246. func (a *Agent) DeleteNamespace(name string) error {
  247. return a.Clientset.CoreV1().Namespaces().Delete(
  248. context.TODO(),
  249. name,
  250. metav1.DeleteOptions{},
  251. )
  252. }
// Label is a key/value pair. ListJobsByLabel renders each one as "Key=Val"
// when building its label selector.
type Label struct {
	Key string
	Val string
}
  258. func (a *Agent) ListJobsByLabel(namespace string, labels ...Label) ([]batchv1.Job, error) {
  259. selectors := make([]string, 0)
  260. for _, label := range labels {
  261. selectors = append(selectors, fmt.Sprintf("%s=%s", label.Key, label.Val))
  262. }
  263. resp, err := a.Clientset.BatchV1().Jobs(namespace).List(
  264. context.TODO(),
  265. metav1.ListOptions{
  266. LabelSelector: strings.Join(selectors, ","),
  267. },
  268. )
  269. if err != nil {
  270. return nil, err
  271. }
  272. return resp.Items, nil
  273. }
  274. // DeleteJob deletes the job in the given name and namespace.
  275. func (a *Agent) DeleteJob(name, namespace string) error {
  276. return a.Clientset.BatchV1().Jobs(namespace).Delete(
  277. context.TODO(),
  278. name,
  279. metav1.DeleteOptions{},
  280. )
  281. }
  282. // GetJobPods lists all pods belonging to a job in a namespace
  283. func (a *Agent) GetJobPods(namespace, jobName string) ([]v1.Pod, error) {
  284. resp, err := a.Clientset.CoreV1().Pods(namespace).List(
  285. context.TODO(),
  286. metav1.ListOptions{
  287. LabelSelector: fmt.Sprintf("%s=%s", "job-name", jobName),
  288. },
  289. )
  290. if err != nil {
  291. return nil, err
  292. }
  293. return resp.Items, nil
  294. }
  295. // GetIngress gets ingress given the name and namespace
  296. func (a *Agent) GetIngress(namespace string, name string) (*v1beta1.Ingress, error) {
  297. resp, err := a.Clientset.ExtensionsV1beta1().Ingresses(namespace).Get(
  298. context.TODO(),
  299. name,
  300. metav1.GetOptions{},
  301. )
  302. if err != nil && errors.IsNotFound(err) {
  303. return nil, IsNotFoundError
  304. } else if err != nil {
  305. return nil, err
  306. }
  307. return resp, nil
  308. }
// IsNotFoundError is the sentinel the Agent's getters (and DeletePod,
// GetPodLogs) return in place of the API server's not-found error.
var IsNotFoundError = fmt.Errorf("not found")
  310. // GetDeployment gets the deployment given the name and namespace
  311. func (a *Agent) GetDeployment(c grapher.Object) (*appsv1.Deployment, error) {
  312. res, err := a.Clientset.AppsV1().Deployments(c.Namespace).Get(
  313. context.TODO(),
  314. c.Name,
  315. metav1.GetOptions{},
  316. )
  317. if err != nil && errors.IsNotFound(err) {
  318. return nil, IsNotFoundError
  319. } else if err != nil {
  320. return nil, err
  321. }
  322. res.Kind = c.Kind
  323. return res, nil
  324. }
  325. // GetStatefulSet gets the statefulset given the name and namespace
  326. func (a *Agent) GetStatefulSet(c grapher.Object) (*appsv1.StatefulSet, error) {
  327. res, err := a.Clientset.AppsV1().StatefulSets(c.Namespace).Get(
  328. context.TODO(),
  329. c.Name,
  330. metav1.GetOptions{},
  331. )
  332. if err != nil && errors.IsNotFound(err) {
  333. return nil, IsNotFoundError
  334. } else if err != nil {
  335. return nil, err
  336. }
  337. res.Kind = c.Kind
  338. return res, nil
  339. }
  340. // GetReplicaSet gets the replicaset given the name and namespace
  341. func (a *Agent) GetReplicaSet(c grapher.Object) (*appsv1.ReplicaSet, error) {
  342. res, err := a.Clientset.AppsV1().ReplicaSets(c.Namespace).Get(
  343. context.TODO(),
  344. c.Name,
  345. metav1.GetOptions{},
  346. )
  347. if err != nil && errors.IsNotFound(err) {
  348. return nil, IsNotFoundError
  349. } else if err != nil {
  350. return nil, err
  351. }
  352. res.Kind = c.Kind
  353. return res, nil
  354. }
  355. // GetDaemonSet gets the daemonset by name and namespace
  356. func (a *Agent) GetDaemonSet(c grapher.Object) (*appsv1.DaemonSet, error) {
  357. res, err := a.Clientset.AppsV1().DaemonSets(c.Namespace).Get(
  358. context.TODO(),
  359. c.Name,
  360. metav1.GetOptions{},
  361. )
  362. if err != nil && errors.IsNotFound(err) {
  363. return nil, IsNotFoundError
  364. } else if err != nil {
  365. return nil, err
  366. }
  367. res.Kind = c.Kind
  368. return res, nil
  369. }
  370. // GetJob gets the job by name and namespace
  371. func (a *Agent) GetJob(c grapher.Object) (*batchv1.Job, error) {
  372. res, err := a.Clientset.BatchV1().Jobs(c.Namespace).Get(
  373. context.TODO(),
  374. c.Name,
  375. metav1.GetOptions{},
  376. )
  377. if err != nil && errors.IsNotFound(err) {
  378. return nil, IsNotFoundError
  379. } else if err != nil {
  380. return nil, err
  381. }
  382. res.Kind = c.Kind
  383. return res, nil
  384. }
  385. // GetCronJob gets the CronJob by name and namespace
  386. func (a *Agent) GetCronJob(c grapher.Object) (*batchv1beta1.CronJob, error) {
  387. res, err := a.Clientset.BatchV1beta1().CronJobs(c.Namespace).Get(
  388. context.TODO(),
  389. c.Name,
  390. metav1.GetOptions{},
  391. )
  392. if err != nil && errors.IsNotFound(err) {
  393. return nil, IsNotFoundError
  394. } else if err != nil {
  395. return nil, err
  396. }
  397. res.Kind = c.Kind
  398. return res, nil
  399. }
  400. // GetPodsByLabel retrieves pods with matching labels
  401. func (a *Agent) GetPodsByLabel(selector string, namespace string) (*v1.PodList, error) {
  402. // Search in all namespaces for matching pods
  403. return a.Clientset.CoreV1().Pods(namespace).List(
  404. context.TODO(),
  405. metav1.ListOptions{
  406. LabelSelector: selector,
  407. },
  408. )
  409. }
  410. // DeletePod deletes a pod by name and namespace
  411. func (a *Agent) DeletePod(namespace string, name string) error {
  412. err := a.Clientset.CoreV1().Pods(namespace).Delete(
  413. context.TODO(),
  414. name,
  415. metav1.DeleteOptions{},
  416. )
  417. if err != nil && errors.IsNotFound(err) {
  418. return IsNotFoundError
  419. }
  420. return err
  421. }
  422. // GetPodLogs streams real-time logs from a given pod.
  423. func (a *Agent) GetPodLogs(namespace string, name string, conn *websocket.Conn) error {
  424. // get the pod to read in the list of contains
  425. pod, err := a.Clientset.CoreV1().Pods(namespace).Get(
  426. context.Background(),
  427. name,
  428. metav1.GetOptions{},
  429. )
  430. if err != nil && errors.IsNotFound(err) {
  431. return IsNotFoundError
  432. } else if err != nil {
  433. return fmt.Errorf("Cannot get logs from pod %s: %s", name, err.Error())
  434. }
  435. container := pod.Spec.Containers[0].Name
  436. tails := int64(400)
  437. // follow logs
  438. podLogOpts := v1.PodLogOptions{
  439. Follow: true,
  440. TailLines: &tails,
  441. Container: container,
  442. }
  443. req := a.Clientset.CoreV1().Pods(namespace).GetLogs(name, &podLogOpts)
  444. podLogs, err := req.Stream(context.TODO())
  445. if err != nil {
  446. return fmt.Errorf("Cannot open log stream for pod %s: %s", name, err.Error())
  447. }
  448. defer podLogs.Close()
  449. r := bufio.NewReader(podLogs)
  450. errorchan := make(chan error)
  451. go func() {
  452. // listens for websocket closing handshake
  453. for {
  454. if _, _, err := conn.ReadMessage(); err != nil {
  455. defer conn.Close()
  456. errorchan <- nil
  457. return
  458. }
  459. }
  460. }()
  461. go func() {
  462. for {
  463. select {
  464. case <-errorchan:
  465. defer close(errorchan)
  466. return
  467. default:
  468. }
  469. bytes, err := r.ReadBytes('\n')
  470. if writeErr := conn.WriteMessage(websocket.TextMessage, bytes); writeErr != nil {
  471. errorchan <- writeErr
  472. return
  473. }
  474. if err != nil {
  475. if err != io.EOF {
  476. errorchan <- err
  477. return
  478. }
  479. errorchan <- nil
  480. return
  481. }
  482. }
  483. }()
  484. for {
  485. select {
  486. case err = <-errorchan:
  487. return err
  488. }
  489. }
  490. }
  491. // StopJobWithJobSidecar sends a termination signal to a job running with a sidecar
  492. func (a *Agent) StopJobWithJobSidecar(namespace, name string) error {
  493. jobPods, err := a.GetJobPods(namespace, name)
  494. if err != nil {
  495. return err
  496. }
  497. podName := jobPods[0].ObjectMeta.Name
  498. restConf, err := a.RESTClientGetter.ToRESTConfig()
  499. restConf.GroupVersion = &schema.GroupVersion{
  500. Group: "api",
  501. Version: "v1",
  502. }
  503. restConf.NegotiatedSerializer = runtime.NewSimpleNegotiatedSerializer(runtime.SerializerInfo{})
  504. restClient, err := rest.RESTClientFor(restConf)
  505. if err != nil {
  506. return err
  507. }
  508. req := restClient.Post().
  509. Resource("pods").
  510. Name(podName).
  511. Namespace(namespace).
  512. SubResource("exec")
  513. req.Param("command", "./signal.sh")
  514. req.Param("container", "sidecar")
  515. req.Param("stdin", "true")
  516. req.Param("stdout", "false")
  517. req.Param("tty", "false")
  518. exec, err := remotecommand.NewSPDYExecutor(restConf, "POST", req.URL())
  519. if err != nil {
  520. return err
  521. }
  522. return exec.Stream(remotecommand.StreamOptions{
  523. Tty: false,
  524. Stdin: strings.NewReader("./signal.sh"),
  525. })
  526. }
  527. // RunWebsocketTask will run a websocket task. If the websocket returns an anauthorized error, it will restart
  528. // the task some number of times until failing
  529. func (a *Agent) RunWebsocketTask(task func() error) error {
  530. lastTime := int64(0)
  531. for {
  532. if err := a.UpdateClientset(); err != nil {
  533. return err
  534. }
  535. err := task()
  536. if err == nil {
  537. return nil
  538. }
  539. if !errors2.Is(err, &AuthError{}) {
  540. return err
  541. }
  542. if time.Now().Unix()-lastTime < 60 { // don't regenerate connection if too many unauthorized errors
  543. return err
  544. }
  545. lastTime = time.Now().Unix()
  546. }
  547. }
  548. // StreamControllerStatus streams controller status. Supports Deployment, StatefulSet, ReplicaSet, and DaemonSet
  549. // TODO: Support Jobs
  550. func (a *Agent) StreamControllerStatus(conn *websocket.Conn, kind string, selectors string) error {
  551. run := func() error {
  552. // selectors is an array of max length 1. StreamControllerStatus accepts calls without the selectors argument.
  553. // selectors argument is a single string with comma separated key=value pairs. (e.g. "app=porter,porter=true")
  554. tweakListOptionsFunc := func(options *metav1.ListOptions) {
  555. options.LabelSelector = selectors
  556. }
  557. factory := informers.NewSharedInformerFactoryWithOptions(
  558. a.Clientset,
  559. 0,
  560. informers.WithTweakListOptions(tweakListOptionsFunc),
  561. )
  562. var informer cache.SharedInformer
  563. // Spins up an informer depending on kind. Convert to lowercase for robustness
  564. switch strings.ToLower(kind) {
  565. case "deployment":
  566. informer = factory.Apps().V1().Deployments().Informer()
  567. case "statefulset":
  568. informer = factory.Apps().V1().StatefulSets().Informer()
  569. case "replicaset":
  570. informer = factory.Apps().V1().ReplicaSets().Informer()
  571. case "daemonset":
  572. informer = factory.Apps().V1().DaemonSets().Informer()
  573. case "job":
  574. informer = factory.Batch().V1().Jobs().Informer()
  575. case "cronjob":
  576. informer = factory.Batch().V1beta1().CronJobs().Informer()
  577. case "namespace":
  578. informer = factory.Core().V1().Namespaces().Informer()
  579. case "pod":
  580. informer = factory.Core().V1().Pods().Informer()
  581. }
  582. stopper := make(chan struct{})
  583. errorchan := make(chan error)
  584. defer close(stopper)
  585. informer.SetWatchErrorHandler(func(r *cache.Reflector, err error) {
  586. if strings.HasSuffix(err.Error(), ": Unauthorized") {
  587. errorchan <- &AuthError{}
  588. }
  589. })
  590. informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
  591. UpdateFunc: func(oldObj, newObj interface{}) {
  592. msg := Message{
  593. EventType: "UPDATE",
  594. Object: newObj,
  595. Kind: strings.ToLower(kind),
  596. }
  597. if writeErr := conn.WriteJSON(msg); writeErr != nil {
  598. errorchan <- writeErr
  599. return
  600. }
  601. },
  602. AddFunc: func(obj interface{}) {
  603. msg := Message{
  604. EventType: "ADD",
  605. Object: obj,
  606. Kind: strings.ToLower(kind),
  607. }
  608. if writeErr := conn.WriteJSON(msg); writeErr != nil {
  609. errorchan <- writeErr
  610. return
  611. }
  612. },
  613. DeleteFunc: func(obj interface{}) {
  614. msg := Message{
  615. EventType: "DELETE",
  616. Object: obj,
  617. Kind: strings.ToLower(kind),
  618. }
  619. if writeErr := conn.WriteJSON(msg); writeErr != nil {
  620. errorchan <- writeErr
  621. return
  622. }
  623. },
  624. })
  625. go func() {
  626. // listens for websocket closing handshake
  627. for {
  628. if _, _, err := conn.ReadMessage(); err != nil {
  629. conn.Close()
  630. errorchan <- nil
  631. return
  632. }
  633. }
  634. }()
  635. go informer.Run(stopper)
  636. for {
  637. select {
  638. case err := <-errorchan:
  639. return err
  640. }
  641. }
  642. }
  643. return a.RunWebsocketTask(run)
  644. }
  645. var b64 = base64.StdEncoding
  646. var magicGzip = []byte{0x1f, 0x8b, 0x08}
  647. func decodeRelease(data string) (*rspb.Release, error) {
  648. // base64 decode string
  649. b, err := b64.DecodeString(data)
  650. if err != nil {
  651. return nil, err
  652. }
  653. // For backwards compatibility with releases that were stored before
  654. // compression was introduced we skip decompression if the
  655. // gzip magic header is not found
  656. if bytes.Equal(b[0:3], magicGzip) {
  657. r, err := gzip.NewReader(bytes.NewReader(b))
  658. if err != nil {
  659. return nil, err
  660. }
  661. defer r.Close()
  662. b2, err := ioutil.ReadAll(r)
  663. if err != nil {
  664. return nil, err
  665. }
  666. b = b2
  667. }
  668. var rls rspb.Release
  669. // unmarshal release object bytes
  670. if err := json.Unmarshal(b, &rls); err != nil {
  671. return nil, err
  672. }
  673. return &rls, nil
  674. }
  675. func contains(s []string, str string) bool {
  676. for _, v := range s {
  677. if v == str {
  678. return true
  679. }
  680. }
  681. return false
  682. }
  683. func parseSecretToHelmRelease(secret v1.Secret, chartList []string) (*rspb.Release, bool, error) {
  684. if secret.Type != "helm.sh/release.v1" {
  685. return nil, true, nil
  686. }
  687. releaseData, ok := secret.Data["release"]
  688. if !ok {
  689. return nil, true, fmt.Errorf("release field not found")
  690. }
  691. helm_object, err := decodeRelease(string(releaseData))
  692. if err != nil {
  693. return nil, true, err
  694. }
  695. if len(chartList) > 0 && !contains(chartList, helm_object.Name) {
  696. return nil, true, nil
  697. }
  698. return helm_object, false, nil
  699. }
  700. func (a *Agent) StreamHelmReleases(conn *websocket.Conn, namespace string, chartList []string, selectors string) error {
  701. run := func() error {
  702. tweakListOptionsFunc := func(options *metav1.ListOptions) {
  703. options.LabelSelector = selectors
  704. }
  705. factory := informers.NewSharedInformerFactoryWithOptions(
  706. a.Clientset,
  707. 0,
  708. informers.WithTweakListOptions(tweakListOptionsFunc),
  709. informers.WithNamespace(namespace),
  710. )
  711. informer := factory.Core().V1().Secrets().Informer()
  712. stopper := make(chan struct{})
  713. errorchan := make(chan error)
  714. defer close(stopper)
  715. informer.SetWatchErrorHandler(func(r *cache.Reflector, err error) {
  716. if strings.HasSuffix(err.Error(), ": Unauthorized") {
  717. errorchan <- &AuthError{}
  718. }
  719. })
  720. informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
  721. UpdateFunc: func(oldObj, newObj interface{}) {
  722. secretObj, ok := newObj.(*v1.Secret)
  723. if !ok {
  724. errorchan <- fmt.Errorf("could not cast to secret")
  725. return
  726. }
  727. helm_object, isNotHelmRelease, err := parseSecretToHelmRelease(*secretObj, chartList)
  728. if isNotHelmRelease && err == nil {
  729. return
  730. }
  731. if err != nil {
  732. errorchan <- err
  733. return
  734. }
  735. msg := Message{
  736. EventType: "UPDATE",
  737. Object: helm_object,
  738. }
  739. if writeErr := conn.WriteJSON(msg); writeErr != nil {
  740. errorchan <- writeErr
  741. return
  742. }
  743. },
  744. AddFunc: func(obj interface{}) {
  745. secretObj, ok := obj.(*v1.Secret)
  746. if !ok {
  747. errorchan <- fmt.Errorf("could not cast to secret")
  748. return
  749. }
  750. helm_object, isNotHelmRelease, err := parseSecretToHelmRelease(*secretObj, chartList)
  751. if isNotHelmRelease && err == nil {
  752. return
  753. }
  754. if err != nil {
  755. errorchan <- err
  756. return
  757. }
  758. msg := Message{
  759. EventType: "ADD",
  760. Object: helm_object,
  761. }
  762. if writeErr := conn.WriteJSON(msg); writeErr != nil {
  763. errorchan <- writeErr
  764. return
  765. }
  766. },
  767. DeleteFunc: func(obj interface{}) {
  768. secretObj, ok := obj.(*v1.Secret)
  769. if !ok {
  770. errorchan <- fmt.Errorf("could not cast to secret")
  771. return
  772. }
  773. helm_object, isNotHelmRelease, err := parseSecretToHelmRelease(*secretObj, chartList)
  774. if isNotHelmRelease && err == nil {
  775. return
  776. }
  777. if err != nil {
  778. errorchan <- err
  779. return
  780. }
  781. msg := Message{
  782. EventType: "DELETE",
  783. Object: helm_object,
  784. }
  785. if writeErr := conn.WriteJSON(msg); writeErr != nil {
  786. errorchan <- writeErr
  787. return
  788. }
  789. },
  790. })
  791. go func() {
  792. // listens for websocket closing handshake
  793. for {
  794. if _, _, err := conn.ReadMessage(); err != nil {
  795. conn.Close()
  796. errorchan <- nil
  797. return
  798. }
  799. }
  800. }()
  801. go informer.Run(stopper)
  802. for {
  803. select {
  804. case err := <-errorchan:
  805. return err
  806. }
  807. }
  808. }
  809. return a.RunWebsocketTask(run)
  810. }
// SharedProvisionOpts bundles the options common to the Provision* methods.
//
// Those methods copy Operation, RedisConf, PGConf, ProvImageTag and
// ProvImagePullSecret into the provisioner config, derive the config's ID,
// name and LastApplied from Infra, and pass Infra and Repo on to the
// provisioning call.
// NOTE(review): ProjectID is not read in the visible part of this file —
// confirm where it is consumed.
type SharedProvisionOpts struct {
	ProjectID           uint
	Repo                repository.Repository
	Infra               *models.Infra
	Operation           provisioner.ProvisionerOperation
	PGConf              *env.DBConf
	RedisConf           *env.RedisConf
	ProvImageTag        string
	ProvImagePullSecret string
}
  821. // ProvisionECR spawns a new provisioning pod that creates an ECR instance
  822. func (a *Agent) ProvisionECR(
  823. opts *SharedProvisionOpts,
  824. awsConf *integrations.AWSIntegration,
  825. ecrName string,
  826. ) (*batchv1.Job, error) {
  827. id := opts.Infra.GetUniqueName()
  828. prov := &provisioner.Conf{
  829. ID: id,
  830. Name: fmt.Sprintf("prov-%s-%s", id, string(opts.Operation)),
  831. Kind: provisioner.ECR,
  832. Operation: opts.Operation,
  833. Redis: opts.RedisConf,
  834. Postgres: opts.PGConf,
  835. ProvisionerImageTag: opts.ProvImageTag,
  836. ImagePullSecret: opts.ProvImagePullSecret,
  837. LastApplied: opts.Infra.LastApplied,
  838. AWS: &aws.Conf{
  839. AWSRegion: awsConf.AWSRegion,
  840. AWSAccessKeyID: string(awsConf.AWSAccessKeyID),
  841. AWSSecretAccessKey: string(awsConf.AWSSecretAccessKey),
  842. },
  843. ECR: &ecr.Conf{
  844. ECRName: ecrName,
  845. },
  846. }
  847. return a.provision(prov, opts.Infra, opts.Repo)
  848. }
  849. // ProvisionEKS spawns a new provisioning pod that creates an EKS instance
  850. func (a *Agent) ProvisionEKS(
  851. opts *SharedProvisionOpts,
  852. awsConf *integrations.AWSIntegration,
  853. eksName, machineType string,
  854. ) (*batchv1.Job, error) {
  855. id := opts.Infra.GetUniqueName()
  856. prov := &provisioner.Conf{
  857. ID: id,
  858. Name: fmt.Sprintf("prov-%s-%s", id, string(opts.Operation)),
  859. Kind: provisioner.EKS,
  860. Operation: opts.Operation,
  861. Redis: opts.RedisConf,
  862. Postgres: opts.PGConf,
  863. ProvisionerImageTag: opts.ProvImageTag,
  864. ImagePullSecret: opts.ProvImagePullSecret,
  865. LastApplied: opts.Infra.LastApplied,
  866. AWS: &aws.Conf{
  867. AWSRegion: awsConf.AWSRegion,
  868. AWSAccessKeyID: string(awsConf.AWSAccessKeyID),
  869. AWSSecretAccessKey: string(awsConf.AWSSecretAccessKey),
  870. },
  871. EKS: &eks.Conf{
  872. ClusterName: eksName,
  873. MachineType: machineType,
  874. },
  875. }
  876. return a.provision(prov, opts.Infra, opts.Repo)
  877. }
  878. // ProvisionGCR spawns a new provisioning pod that creates a GCR instance
  879. func (a *Agent) ProvisionGCR(
  880. opts *SharedProvisionOpts,
  881. gcpConf *integrations.GCPIntegration,
  882. ) (*batchv1.Job, error) {
  883. id := opts.Infra.GetUniqueName()
  884. prov := &provisioner.Conf{
  885. ID: id,
  886. Name: fmt.Sprintf("prov-%s-%s", id, string(opts.Operation)),
  887. Kind: provisioner.GCR,
  888. Operation: opts.Operation,
  889. Redis: opts.RedisConf,
  890. Postgres: opts.PGConf,
  891. ProvisionerImageTag: opts.ProvImageTag,
  892. ImagePullSecret: opts.ProvImagePullSecret,
  893. LastApplied: opts.Infra.LastApplied,
  894. GCP: &gcp.Conf{
  895. GCPRegion: gcpConf.GCPRegion,
  896. GCPProjectID: gcpConf.GCPProjectID,
  897. GCPKeyData: string(gcpConf.GCPKeyData),
  898. },
  899. }
  900. return a.provision(prov, opts.Infra, opts.Repo)
  901. }
  902. // ProvisionGKE spawns a new provisioning pod that creates a GKE instance
  903. func (a *Agent) ProvisionGKE(
  904. opts *SharedProvisionOpts,
  905. gcpConf *integrations.GCPIntegration,
  906. gkeName string,
  907. ) (*batchv1.Job, error) {
  908. id := opts.Infra.GetUniqueName()
  909. prov := &provisioner.Conf{
  910. ID: id,
  911. Name: fmt.Sprintf("prov-%s-%s", id, string(opts.Operation)),
  912. Kind: provisioner.GKE,
  913. Operation: opts.Operation,
  914. Redis: opts.RedisConf,
  915. Postgres: opts.PGConf,
  916. ProvisionerImageTag: opts.ProvImageTag,
  917. ImagePullSecret: opts.ProvImagePullSecret,
  918. LastApplied: opts.Infra.LastApplied,
  919. GCP: &gcp.Conf{
  920. GCPRegion: gcpConf.GCPRegion,
  921. GCPProjectID: gcpConf.GCPProjectID,
  922. GCPKeyData: string(gcpConf.GCPKeyData),
  923. },
  924. GKE: &gke.Conf{
  925. ClusterName: gkeName,
  926. },
  927. }
  928. return a.provision(prov, opts.Infra, opts.Repo)
  929. }
  930. // ProvisionDOCR spawns a new provisioning pod that creates a DOCR instance
  931. func (a *Agent) ProvisionDOCR(
  932. opts *SharedProvisionOpts,
  933. doConf *integrations.OAuthIntegration,
  934. doAuth *oauth2.Config,
  935. docrName, docrSubscriptionTier string,
  936. ) (*batchv1.Job, error) {
  937. // get the token
  938. oauthInt, err := opts.Repo.OAuthIntegration().ReadOAuthIntegration(
  939. opts.ProjectID,
  940. opts.Infra.DOIntegrationID,
  941. )
  942. if err != nil {
  943. return nil, err
  944. }
  945. tok, _, err := oauth.GetAccessToken(oauthInt.SharedOAuthModel, doAuth, oauth.MakeUpdateOAuthIntegrationTokenFunction(oauthInt, opts.Repo))
  946. if err != nil {
  947. return nil, err
  948. }
  949. id := opts.Infra.GetUniqueName()
  950. prov := &provisioner.Conf{
  951. ID: id,
  952. Name: fmt.Sprintf("prov-%s-%s", id, string(opts.Operation)),
  953. Kind: provisioner.DOCR,
  954. Operation: opts.Operation,
  955. Redis: opts.RedisConf,
  956. Postgres: opts.PGConf,
  957. ProvisionerImageTag: opts.ProvImageTag,
  958. ImagePullSecret: opts.ProvImagePullSecret,
  959. LastApplied: opts.Infra.LastApplied,
  960. DO: &do.Conf{
  961. DOToken: tok,
  962. },
  963. DOCR: &docr.Conf{
  964. DOCRName: docrName,
  965. DOCRSubscriptionTier: docrSubscriptionTier,
  966. },
  967. }
  968. return a.provision(prov, opts.Infra, opts.Repo)
  969. }
  970. // ProvisionDOKS spawns a new provisioning pod that creates a DOKS instance
  971. func (a *Agent) ProvisionDOKS(
  972. opts *SharedProvisionOpts,
  973. doConf *integrations.OAuthIntegration,
  974. doAuth *oauth2.Config,
  975. doRegion, doksClusterName string,
  976. ) (*batchv1.Job, error) {
  977. // get the token
  978. oauthInt, err := opts.Repo.OAuthIntegration().ReadOAuthIntegration(
  979. opts.ProjectID,
  980. opts.Infra.DOIntegrationID,
  981. )
  982. if err != nil {
  983. return nil, err
  984. }
  985. tok, _, err := oauth.GetAccessToken(oauthInt.SharedOAuthModel, doAuth, oauth.MakeUpdateOAuthIntegrationTokenFunction(oauthInt, opts.Repo))
  986. if err != nil {
  987. return nil, err
  988. }
  989. id := opts.Infra.GetUniqueName()
  990. prov := &provisioner.Conf{
  991. ID: id,
  992. Name: fmt.Sprintf("prov-%s-%s", id, string(opts.Operation)),
  993. Kind: provisioner.DOKS,
  994. Operation: opts.Operation,
  995. Redis: opts.RedisConf,
  996. Postgres: opts.PGConf,
  997. LastApplied: opts.Infra.LastApplied,
  998. ProvisionerImageTag: opts.ProvImageTag,
  999. ImagePullSecret: opts.ProvImagePullSecret,
  1000. DO: &do.Conf{
  1001. DOToken: tok,
  1002. },
  1003. DOKS: &doks.Conf{
  1004. DORegion: doRegion,
  1005. DOKSClusterName: doksClusterName,
  1006. },
  1007. }
  1008. return a.provision(prov, opts.Infra, opts.Repo)
  1009. }
  1010. // ProvisionTest spawns a new provisioning pod that tests provisioning
  1011. func (a *Agent) ProvisionTest(
  1012. opts *SharedProvisionOpts,
  1013. ) (*batchv1.Job, error) {
  1014. id := opts.Infra.GetUniqueName()
  1015. prov := &provisioner.Conf{
  1016. ID: id,
  1017. Name: fmt.Sprintf("prov-%s-%s", id, string(opts.Operation)),
  1018. Operation: opts.Operation,
  1019. Kind: provisioner.Test,
  1020. Redis: opts.RedisConf,
  1021. Postgres: opts.PGConf,
  1022. ProvisionerImageTag: opts.ProvImageTag,
  1023. ImagePullSecret: opts.ProvImagePullSecret,
  1024. }
  1025. return a.provision(prov, opts.Infra, opts.Repo)
  1026. }
  1027. func (a *Agent) provision(
  1028. prov *provisioner.Conf,
  1029. infra *models.Infra,
  1030. repo repository.Repository,
  1031. ) (*batchv1.Job, error) {
  1032. prov.Namespace = "default"
  1033. job, err := prov.GetProvisionerJobTemplate()
  1034. if err != nil {
  1035. return nil, err
  1036. }
  1037. job, err = a.Clientset.BatchV1().Jobs(prov.Namespace).Create(
  1038. context.TODO(),
  1039. job,
  1040. metav1.CreateOptions{},
  1041. )
  1042. if err != nil {
  1043. return nil, err
  1044. }
  1045. infra.LastApplied = prov.LastApplied
  1046. infra, err = repo.Infra().UpdateInfra(infra)
  1047. if err != nil {
  1048. return nil, err
  1049. }
  1050. return job, nil
  1051. }
  1052. // CreateImagePullSecrets will create the required image pull secrets and
  1053. // return a map from the registry name to the name of the secret.
  1054. func (a *Agent) CreateImagePullSecrets(
  1055. repo repository.Repository,
  1056. namespace string,
  1057. linkedRegs map[string]*models.Registry,
  1058. doAuth *oauth2.Config,
  1059. ) (map[string]string, error) {
  1060. res := make(map[string]string)
  1061. for key, val := range linkedRegs {
  1062. _reg := registry.Registry(*val)
  1063. data, err := _reg.GetDockerConfigJSON(repo, doAuth)
  1064. if err != nil {
  1065. return nil, err
  1066. }
  1067. secretName := fmt.Sprintf("porter-%s-%d", val.ToRegistryType().Service, val.ID)
  1068. secret, err := a.Clientset.CoreV1().Secrets(namespace).Get(
  1069. context.TODO(),
  1070. secretName,
  1071. metav1.GetOptions{},
  1072. )
  1073. // if not found, create the secret
  1074. if err != nil && errors.IsNotFound(err) {
  1075. _, err = a.Clientset.CoreV1().Secrets(namespace).Create(
  1076. context.TODO(),
  1077. &v1.Secret{
  1078. ObjectMeta: metav1.ObjectMeta{
  1079. Name: secretName,
  1080. },
  1081. Data: map[string][]byte{
  1082. string(v1.DockerConfigJsonKey): data,
  1083. },
  1084. Type: v1.SecretTypeDockerConfigJson,
  1085. },
  1086. metav1.CreateOptions{},
  1087. )
  1088. if err != nil {
  1089. return nil, err
  1090. }
  1091. // add secret name to the map
  1092. res[key] = secretName
  1093. continue
  1094. } else if err != nil {
  1095. return nil, err
  1096. }
  1097. // otherwise, check that the secret contains the correct data: if
  1098. // if doesn't, update it
  1099. if !bytes.Equal(secret.Data[v1.DockerConfigJsonKey], data) {
  1100. _, err := a.Clientset.CoreV1().Secrets(namespace).Update(
  1101. context.TODO(),
  1102. &v1.Secret{
  1103. ObjectMeta: metav1.ObjectMeta{
  1104. Name: secretName,
  1105. },
  1106. Data: map[string][]byte{
  1107. string(v1.DockerConfigJsonKey): data,
  1108. },
  1109. Type: v1.SecretTypeDockerConfigJson,
  1110. },
  1111. metav1.UpdateOptions{},
  1112. )
  1113. if err != nil {
  1114. return nil, err
  1115. }
  1116. }
  1117. // add secret name to the map
  1118. res[key] = secretName
  1119. }
  1120. return res, nil
  1121. }