agent.go 31 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221122212231224122512261227122812291230123112321233123412351236123712381239124012411242124312441245124612471248124912501251125212531254125512561257125812591260126112621263126412651266126712681269127012711272127312741275127612771278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305
  1. package kubernetes
  2. import (
  3. "bufio"
  4. "bytes"
  5. "compress/gzip"
  6. "context"
  7. "encoding/base64"
  8. "encoding/json"
  9. "fmt"
  10. "io"
  11. "io/ioutil"
  12. "strings"
  13. "time"
  14. "github.com/porter-dev/porter/internal/kubernetes/provisioner"
  15. "github.com/porter-dev/porter/internal/kubernetes/provisioner/aws"
  16. "github.com/porter-dev/porter/internal/kubernetes/provisioner/aws/ecr"
  17. "github.com/porter-dev/porter/internal/kubernetes/provisioner/aws/eks"
  18. "github.com/porter-dev/porter/internal/kubernetes/provisioner/do"
  19. "github.com/porter-dev/porter/internal/kubernetes/provisioner/do/docr"
  20. "github.com/porter-dev/porter/internal/kubernetes/provisioner/do/doks"
  21. "github.com/porter-dev/porter/internal/kubernetes/provisioner/gcp"
  22. "github.com/porter-dev/porter/internal/kubernetes/provisioner/gcp/gke"
  23. "github.com/porter-dev/porter/internal/models"
  24. "github.com/porter-dev/porter/internal/models/integrations"
  25. "github.com/porter-dev/porter/internal/oauth"
  26. "github.com/porter-dev/porter/internal/registry"
  27. "github.com/porter-dev/porter/internal/repository"
  28. "golang.org/x/oauth2"
  29. errors2 "errors"
  30. "github.com/gorilla/websocket"
  31. "github.com/porter-dev/porter/internal/helm/grapher"
  32. appsv1 "k8s.io/api/apps/v1"
  33. batchv1 "k8s.io/api/batch/v1"
  34. batchv1beta1 "k8s.io/api/batch/v1beta1"
  35. v1 "k8s.io/api/core/v1"
  36. v1beta1 "k8s.io/api/extensions/v1beta1"
  37. "k8s.io/apimachinery/pkg/api/errors"
  38. metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
  39. "k8s.io/apimachinery/pkg/runtime"
  40. "k8s.io/apimachinery/pkg/runtime/schema"
  41. "k8s.io/apimachinery/pkg/types"
  42. "k8s.io/cli-runtime/pkg/genericclioptions"
  43. "k8s.io/client-go/informers"
  44. "k8s.io/client-go/kubernetes"
  45. "k8s.io/client-go/rest"
  46. "k8s.io/client-go/tools/cache"
  47. "k8s.io/client-go/tools/remotecommand"
  48. "github.com/porter-dev/porter/internal/config"
  49. rspb "helm.sh/helm/v3/pkg/release"
  50. )
  51. // Agent is a Kubernetes agent for performing operations that interact with the
  52. // api server
  53. type Agent struct {
  54. RESTClientGetter genericclioptions.RESTClientGetter
  55. Clientset kubernetes.Interface
  56. }
  57. type Message struct {
  58. EventType string `json:"event_type"`
  59. Object interface{}
  60. Kind string
  61. }
  62. type ListOptions struct {
  63. FieldSelector string
  64. }
  65. type AuthError struct{}
  66. func (e *AuthError) Error() string {
  67. return "Unauthorized error"
  68. }
  69. // UpdateClientset updates the Agent's Clientset (this refreshes auth tokens)
  70. func (a *Agent) UpdateClientset() error {
  71. restConf, err := a.RESTClientGetter.ToRESTConfig()
  72. if err != nil {
  73. return err
  74. }
  75. clientset, err := kubernetes.NewForConfig(restConf)
  76. if err != nil {
  77. return err
  78. }
  79. a.Clientset = clientset
  80. return nil
  81. }
  82. // CreateConfigMap creates the configmap given the key-value pairs and namespace
  83. func (a *Agent) CreateConfigMap(name string, namespace string, configMap map[string]string) (*v1.ConfigMap, error) {
  84. return a.Clientset.CoreV1().ConfigMaps(namespace).Create(
  85. context.TODO(),
  86. &v1.ConfigMap{
  87. ObjectMeta: metav1.ObjectMeta{
  88. Name: name,
  89. Namespace: namespace,
  90. Labels: map[string]string{
  91. "porter": "true",
  92. },
  93. },
  94. Data: configMap,
  95. },
  96. metav1.CreateOptions{},
  97. )
  98. }
  99. // CreateLinkedSecret creates a secret given the key-value pairs and namespace. Values are
  100. // base64 encoded
  101. func (a *Agent) CreateLinkedSecret(name, namespace, cmName string, data map[string][]byte) (*v1.Secret, error) {
  102. return a.Clientset.CoreV1().Secrets(namespace).Create(
  103. context.TODO(),
  104. &v1.Secret{
  105. ObjectMeta: metav1.ObjectMeta{
  106. Name: name,
  107. Namespace: namespace,
  108. Labels: map[string]string{
  109. "porter": "true",
  110. "configmap": cmName,
  111. },
  112. },
  113. Data: data,
  114. },
  115. metav1.CreateOptions{},
  116. )
  117. }
  118. type mergeConfigMapData struct {
  119. Data map[string]*string `json:"data"`
  120. }
  121. // UpdateConfigMap updates the configmap given its name and namespace
  122. func (a *Agent) UpdateConfigMap(name string, namespace string, configMap map[string]string) error {
  123. cmData := make(map[string]*string)
  124. for key, val := range configMap {
  125. valCopy := val
  126. cmData[key] = &valCopy
  127. if len(val) == 0 {
  128. cmData[key] = nil
  129. }
  130. }
  131. mergeCM := &mergeConfigMapData{
  132. Data: cmData,
  133. }
  134. patchBytes, err := json.Marshal(mergeCM)
  135. if err != nil {
  136. return err
  137. }
  138. _, err = a.Clientset.CoreV1().ConfigMaps(namespace).Patch(
  139. context.Background(),
  140. name,
  141. types.MergePatchType,
  142. patchBytes,
  143. metav1.PatchOptions{},
  144. )
  145. return err
  146. }
  147. type mergeLinkedSecretData struct {
  148. Data map[string]*[]byte `json:"data"`
  149. }
  150. // UpdateLinkedSecret updates the secret given its name and namespace
  151. func (a *Agent) UpdateLinkedSecret(name, namespace, cmName string, data map[string][]byte) error {
  152. secretData := make(map[string]*[]byte)
  153. for key, val := range data {
  154. valCopy := val
  155. secretData[key] = &valCopy
  156. if len(val) == 0 {
  157. secretData[key] = nil
  158. }
  159. }
  160. mergeSecret := &mergeLinkedSecretData{
  161. Data: secretData,
  162. }
  163. patchBytes, err := json.Marshal(mergeSecret)
  164. if err != nil {
  165. return err
  166. }
  167. _, err = a.Clientset.CoreV1().Secrets(namespace).Patch(
  168. context.TODO(),
  169. name,
  170. types.MergePatchType,
  171. patchBytes,
  172. metav1.PatchOptions{},
  173. )
  174. return err
  175. }
  176. // DeleteConfigMap deletes the configmap given its name and namespace
  177. func (a *Agent) DeleteConfigMap(name string, namespace string) error {
  178. return a.Clientset.CoreV1().ConfigMaps(namespace).Delete(
  179. context.TODO(),
  180. name,
  181. metav1.DeleteOptions{},
  182. )
  183. }
  184. // DeleteLinkedSecret deletes the secret given its name and namespace
  185. func (a *Agent) DeleteLinkedSecret(name, namespace string) error {
  186. return a.Clientset.CoreV1().Secrets(namespace).Delete(
  187. context.TODO(),
  188. name,
  189. metav1.DeleteOptions{},
  190. )
  191. }
  192. // GetConfigMap retrieves the configmap given its name and namespace
  193. func (a *Agent) GetConfigMap(name string, namespace string) (*v1.ConfigMap, error) {
  194. return a.Clientset.CoreV1().ConfigMaps(namespace).Get(
  195. context.TODO(),
  196. name,
  197. metav1.GetOptions{},
  198. )
  199. }
  200. // GetSecret retrieves the secret given its name and namespace
  201. func (a *Agent) GetSecret(name string, namespace string) (*v1.Secret, error) {
  202. return a.Clientset.CoreV1().Secrets(namespace).Get(
  203. context.TODO(),
  204. name,
  205. metav1.GetOptions{},
  206. )
  207. }
  208. // ListConfigMaps simply lists namespaces
  209. func (a *Agent) ListConfigMaps(namespace string) (*v1.ConfigMapList, error) {
  210. return a.Clientset.CoreV1().ConfigMaps(namespace).List(
  211. context.TODO(),
  212. metav1.ListOptions{
  213. LabelSelector: "porter=true",
  214. },
  215. )
  216. }
  217. // ListEvents lists the events of a given object.
  218. func (a *Agent) ListEvents(name string, namespace string) (*v1.EventList, error) {
  219. return a.Clientset.CoreV1().Events(namespace).List(
  220. context.TODO(),
  221. metav1.ListOptions{
  222. FieldSelector: fmt.Sprintf("involvedObject.name=%s,involvedObject.namespace=%s", name, namespace),
  223. },
  224. )
  225. }
  226. // ListNamespaces simply lists namespaces
  227. func (a *Agent) ListNamespaces() (*v1.NamespaceList, error) {
  228. return a.Clientset.CoreV1().Namespaces().List(
  229. context.TODO(),
  230. metav1.ListOptions{},
  231. )
  232. }
  233. // CreateNamespace creates a namespace with the given name.
  234. func (a *Agent) CreateNamespace(name string) (*v1.Namespace, error) {
  235. namespace := v1.Namespace{
  236. ObjectMeta: metav1.ObjectMeta{
  237. Name: name,
  238. },
  239. }
  240. return a.Clientset.CoreV1().Namespaces().Create(
  241. context.TODO(),
  242. &namespace,
  243. metav1.CreateOptions{},
  244. )
  245. }
  246. // DeleteNamespace deletes the namespace given the name.
  247. func (a *Agent) DeleteNamespace(name string) error {
  248. return a.Clientset.CoreV1().Namespaces().Delete(
  249. context.TODO(),
  250. name,
  251. metav1.DeleteOptions{},
  252. )
  253. }
  254. // ListJobsByLabel lists jobs in a namespace matching a label
  255. type Label struct {
  256. Key string
  257. Val string
  258. }
  259. func (a *Agent) ListJobsByLabel(namespace string, labels ...Label) ([]batchv1.Job, error) {
  260. selectors := make([]string, 0)
  261. for _, label := range labels {
  262. selectors = append(selectors, fmt.Sprintf("%s=%s", label.Key, label.Val))
  263. }
  264. resp, err := a.Clientset.BatchV1().Jobs(namespace).List(
  265. context.TODO(),
  266. metav1.ListOptions{
  267. LabelSelector: strings.Join(selectors, ","),
  268. },
  269. )
  270. if err != nil {
  271. return nil, err
  272. }
  273. return resp.Items, nil
  274. }
  275. // DeleteJob deletes the job in the given name and namespace.
  276. func (a *Agent) DeleteJob(name, namespace string) error {
  277. return a.Clientset.BatchV1().Jobs(namespace).Delete(
  278. context.TODO(),
  279. name,
  280. metav1.DeleteOptions{},
  281. )
  282. }
  283. // GetJobPods lists all pods belonging to a job in a namespace
  284. func (a *Agent) GetJobPods(namespace, jobName string) ([]v1.Pod, error) {
  285. resp, err := a.Clientset.CoreV1().Pods(namespace).List(
  286. context.TODO(),
  287. metav1.ListOptions{
  288. LabelSelector: fmt.Sprintf("%s=%s", "job-name", jobName),
  289. },
  290. )
  291. if err != nil {
  292. return nil, err
  293. }
  294. return resp.Items, nil
  295. }
  296. // GetIngress gets ingress given the name and namespace
  297. func (a *Agent) GetIngress(namespace string, name string) (*v1beta1.Ingress, error) {
  298. return a.Clientset.ExtensionsV1beta1().Ingresses(namespace).Get(
  299. context.TODO(),
  300. name,
  301. metav1.GetOptions{},
  302. )
  303. }
  304. // GetDeployment gets the deployment given the name and namespace
  305. func (a *Agent) GetDeployment(c grapher.Object) (*appsv1.Deployment, error) {
  306. return a.Clientset.AppsV1().Deployments(c.Namespace).Get(
  307. context.TODO(),
  308. c.Name,
  309. metav1.GetOptions{},
  310. )
  311. }
  312. // GetStatefulSet gets the statefulset given the name and namespace
  313. func (a *Agent) GetStatefulSet(c grapher.Object) (*appsv1.StatefulSet, error) {
  314. return a.Clientset.AppsV1().StatefulSets(c.Namespace).Get(
  315. context.TODO(),
  316. c.Name,
  317. metav1.GetOptions{},
  318. )
  319. }
  320. // GetReplicaSet gets the replicaset given the name and namespace
  321. func (a *Agent) GetReplicaSet(c grapher.Object) (*appsv1.ReplicaSet, error) {
  322. return a.Clientset.AppsV1().ReplicaSets(c.Namespace).Get(
  323. context.TODO(),
  324. c.Name,
  325. metav1.GetOptions{},
  326. )
  327. }
  328. // GetDaemonSet gets the daemonset by name and namespace
  329. func (a *Agent) GetDaemonSet(c grapher.Object) (*appsv1.DaemonSet, error) {
  330. return a.Clientset.AppsV1().DaemonSets(c.Namespace).Get(
  331. context.TODO(),
  332. c.Name,
  333. metav1.GetOptions{},
  334. )
  335. }
  336. // GetJob gets the job by name and namespace
  337. func (a *Agent) GetJob(c grapher.Object) (*batchv1.Job, error) {
  338. return a.Clientset.BatchV1().Jobs(c.Namespace).Get(
  339. context.TODO(),
  340. c.Name,
  341. metav1.GetOptions{},
  342. )
  343. }
  344. // GetCronJob gets the CronJob by name and namespace
  345. func (a *Agent) GetCronJob(c grapher.Object) (*batchv1beta1.CronJob, error) {
  346. return a.Clientset.BatchV1beta1().CronJobs(c.Namespace).Get(
  347. context.TODO(),
  348. c.Name,
  349. metav1.GetOptions{},
  350. )
  351. }
  352. // GetPodsByLabel retrieves pods with matching labels
  353. func (a *Agent) GetPodsByLabel(selector string, namespace string) (*v1.PodList, error) {
  354. // Search in all namespaces for matching pods
  355. return a.Clientset.CoreV1().Pods(namespace).List(
  356. context.TODO(),
  357. metav1.ListOptions{
  358. LabelSelector: selector,
  359. },
  360. )
  361. }
  362. // DeletePod deletes a pod by name and namespace
  363. func (a *Agent) DeletePod(namespace string, name string) error {
  364. return a.Clientset.CoreV1().Pods(namespace).Delete(
  365. context.TODO(),
  366. name,
  367. metav1.DeleteOptions{},
  368. )
  369. }
  370. // GetPodLogs streams real-time logs from a given pod.
  371. func (a *Agent) GetPodLogs(namespace string, name string, conn *websocket.Conn) error {
  372. // get the pod to read in the list of contains
  373. pod, err := a.Clientset.CoreV1().Pods(namespace).Get(
  374. context.Background(),
  375. name,
  376. metav1.GetOptions{},
  377. )
  378. if err != nil {
  379. return fmt.Errorf("Cannot get pod %s: %s", name, err.Error())
  380. }
  381. container := pod.Spec.Containers[0].Name
  382. tails := int64(400)
  383. // follow logs
  384. podLogOpts := v1.PodLogOptions{
  385. Follow: true,
  386. TailLines: &tails,
  387. Container: container,
  388. }
  389. req := a.Clientset.CoreV1().Pods(namespace).GetLogs(name, &podLogOpts)
  390. podLogs, err := req.Stream(context.TODO())
  391. if err != nil {
  392. return fmt.Errorf("Cannot open log stream for pod %s: %s", name, err.Error())
  393. }
  394. defer podLogs.Close()
  395. r := bufio.NewReader(podLogs)
  396. errorchan := make(chan error)
  397. go func() {
  398. // listens for websocket closing handshake
  399. for {
  400. if _, _, err := conn.ReadMessage(); err != nil {
  401. defer conn.Close()
  402. errorchan <- nil
  403. return
  404. }
  405. }
  406. }()
  407. go func() {
  408. for {
  409. select {
  410. case <-errorchan:
  411. defer close(errorchan)
  412. return
  413. default:
  414. }
  415. bytes, err := r.ReadBytes('\n')
  416. if writeErr := conn.WriteMessage(websocket.TextMessage, bytes); writeErr != nil {
  417. errorchan <- writeErr
  418. return
  419. }
  420. if err != nil {
  421. if err != io.EOF {
  422. errorchan <- err
  423. return
  424. }
  425. errorchan <- nil
  426. return
  427. }
  428. }
  429. }()
  430. for {
  431. select {
  432. case err = <-errorchan:
  433. return err
  434. }
  435. }
  436. }
  437. // StopJobWithJobSidecar sends a termination signal to a job running with a sidecar
  438. func (a *Agent) StopJobWithJobSidecar(namespace, name string) error {
  439. jobPods, err := a.GetJobPods(namespace, name)
  440. if err != nil {
  441. return err
  442. }
  443. podName := jobPods[0].ObjectMeta.Name
  444. restConf, err := a.RESTClientGetter.ToRESTConfig()
  445. restConf.GroupVersion = &schema.GroupVersion{
  446. Group: "api",
  447. Version: "v1",
  448. }
  449. restConf.NegotiatedSerializer = runtime.NewSimpleNegotiatedSerializer(runtime.SerializerInfo{})
  450. restClient, err := rest.RESTClientFor(restConf)
  451. if err != nil {
  452. return err
  453. }
  454. req := restClient.Post().
  455. Resource("pods").
  456. Name(podName).
  457. Namespace(namespace).
  458. SubResource("exec")
  459. req.Param("command", "./signal.sh")
  460. req.Param("container", "sidecar")
  461. req.Param("stdin", "true")
  462. req.Param("stdout", "false")
  463. req.Param("tty", "false")
  464. exec, err := remotecommand.NewSPDYExecutor(restConf, "POST", req.URL())
  465. if err != nil {
  466. return err
  467. }
  468. return exec.Stream(remotecommand.StreamOptions{
  469. Tty: false,
  470. Stdin: strings.NewReader("./signal.sh"),
  471. })
  472. }
  473. // RunWebsocketTask will run a websocket task. If the websocket returns an anauthorized error, it will restart
  474. // the task some number of times until failing
  475. func (a *Agent) RunWebsocketTask(task func() error) error {
  476. lastTime := int64(0)
  477. for {
  478. if err := a.UpdateClientset(); err != nil {
  479. return err
  480. }
  481. err := task()
  482. if err == nil {
  483. return nil
  484. }
  485. if !errors2.Is(err, &AuthError{}) {
  486. return err
  487. }
  488. if time.Now().Unix()-lastTime < 60 { // don't regenerate connection if too many unauthorized errors
  489. return err
  490. }
  491. lastTime = time.Now().Unix()
  492. }
  493. }
  494. // StreamControllerStatus streams controller status. Supports Deployment, StatefulSet, ReplicaSet, and DaemonSet
  495. // TODO: Support Jobs
  496. func (a *Agent) StreamControllerStatus(conn *websocket.Conn, kind string, selectors string) error {
  497. run := func() error {
  498. // selectors is an array of max length 1. StreamControllerStatus accepts calls without the selectors argument.
  499. // selectors argument is a single string with comma separated key=value pairs. (e.g. "app=porter,porter=true")
  500. tweakListOptionsFunc := func(options *metav1.ListOptions) {
  501. options.LabelSelector = selectors
  502. }
  503. factory := informers.NewSharedInformerFactoryWithOptions(
  504. a.Clientset,
  505. 0,
  506. informers.WithTweakListOptions(tweakListOptionsFunc),
  507. )
  508. var informer cache.SharedInformer
  509. // Spins up an informer depending on kind. Convert to lowercase for robustness
  510. switch strings.ToLower(kind) {
  511. case "deployment":
  512. informer = factory.Apps().V1().Deployments().Informer()
  513. case "statefulset":
  514. informer = factory.Apps().V1().StatefulSets().Informer()
  515. case "replicaset":
  516. informer = factory.Apps().V1().ReplicaSets().Informer()
  517. case "daemonset":
  518. informer = factory.Apps().V1().DaemonSets().Informer()
  519. case "job":
  520. informer = factory.Batch().V1().Jobs().Informer()
  521. case "cronjob":
  522. informer = factory.Batch().V1beta1().CronJobs().Informer()
  523. case "namespace":
  524. informer = factory.Core().V1().Namespaces().Informer()
  525. case "pod":
  526. informer = factory.Core().V1().Pods().Informer()
  527. }
  528. stopper := make(chan struct{})
  529. errorchan := make(chan error)
  530. defer close(stopper)
  531. informer.SetWatchErrorHandler(func(r *cache.Reflector, err error) {
  532. if strings.HasSuffix(err.Error(), ": Unauthorized") {
  533. errorchan <- &AuthError{}
  534. }
  535. })
  536. informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
  537. UpdateFunc: func(oldObj, newObj interface{}) {
  538. msg := Message{
  539. EventType: "UPDATE",
  540. Object: newObj,
  541. Kind: strings.ToLower(kind),
  542. }
  543. if writeErr := conn.WriteJSON(msg); writeErr != nil {
  544. errorchan <- writeErr
  545. return
  546. }
  547. },
  548. AddFunc: func(obj interface{}) {
  549. msg := Message{
  550. EventType: "ADD",
  551. Object: obj,
  552. Kind: strings.ToLower(kind),
  553. }
  554. if writeErr := conn.WriteJSON(msg); writeErr != nil {
  555. errorchan <- writeErr
  556. return
  557. }
  558. },
  559. DeleteFunc: func(obj interface{}) {
  560. msg := Message{
  561. EventType: "DELETE",
  562. Object: obj,
  563. Kind: strings.ToLower(kind),
  564. }
  565. if writeErr := conn.WriteJSON(msg); writeErr != nil {
  566. errorchan <- writeErr
  567. return
  568. }
  569. },
  570. })
  571. go func() {
  572. // listens for websocket closing handshake
  573. for {
  574. if _, _, err := conn.ReadMessage(); err != nil {
  575. conn.Close()
  576. errorchan <- nil
  577. return
  578. }
  579. }
  580. }()
  581. go informer.Run(stopper)
  582. for {
  583. select {
  584. case err := <-errorchan:
  585. return err
  586. }
  587. }
  588. }
  589. return a.RunWebsocketTask(run)
  590. }
  591. var b64 = base64.StdEncoding
  592. var magicGzip = []byte{0x1f, 0x8b, 0x08}
  593. func decodeRelease(data string) (*rspb.Release, error) {
  594. // base64 decode string
  595. b, err := b64.DecodeString(data)
  596. if err != nil {
  597. return nil, err
  598. }
  599. // For backwards compatibility with releases that were stored before
  600. // compression was introduced we skip decompression if the
  601. // gzip magic header is not found
  602. if bytes.Equal(b[0:3], magicGzip) {
  603. r, err := gzip.NewReader(bytes.NewReader(b))
  604. if err != nil {
  605. return nil, err
  606. }
  607. defer r.Close()
  608. b2, err := ioutil.ReadAll(r)
  609. if err != nil {
  610. return nil, err
  611. }
  612. b = b2
  613. }
  614. var rls rspb.Release
  615. // unmarshal release object bytes
  616. if err := json.Unmarshal(b, &rls); err != nil {
  617. return nil, err
  618. }
  619. return &rls, nil
  620. }
  621. func contains(s []string, str string) bool {
  622. for _, v := range s {
  623. if v == str {
  624. return true
  625. }
  626. }
  627. return false
  628. }
  629. func parseSecretToHelmRelease(secret v1.Secret, chartList []string) (*rspb.Release, bool, error) {
  630. if secret.Type != "helm.sh/release.v1" {
  631. return nil, true, nil
  632. }
  633. releaseData, ok := secret.Data["release"]
  634. if !ok {
  635. return nil, true, fmt.Errorf("release field not found")
  636. }
  637. helm_object, err := decodeRelease(string(releaseData))
  638. if err != nil {
  639. return nil, true, err
  640. }
  641. if len(chartList) > 0 && !contains(chartList, helm_object.Name) {
  642. return nil, true, nil
  643. }
  644. return helm_object, false, nil
  645. }
  646. func (a *Agent) StreamHelmReleases(conn *websocket.Conn, namespace string, chartList []string, selectors string) error {
  647. run := func() error {
  648. tweakListOptionsFunc := func(options *metav1.ListOptions) {
  649. options.LabelSelector = selectors
  650. }
  651. factory := informers.NewSharedInformerFactoryWithOptions(
  652. a.Clientset,
  653. 0,
  654. informers.WithTweakListOptions(tweakListOptionsFunc),
  655. informers.WithNamespace(namespace),
  656. )
  657. informer := factory.Core().V1().Secrets().Informer()
  658. stopper := make(chan struct{})
  659. errorchan := make(chan error)
  660. defer close(stopper)
  661. informer.SetWatchErrorHandler(func(r *cache.Reflector, err error) {
  662. if strings.HasSuffix(err.Error(), ": Unauthorized") {
  663. errorchan <- &AuthError{}
  664. }
  665. })
  666. informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
  667. UpdateFunc: func(oldObj, newObj interface{}) {
  668. secretObj, ok := newObj.(*v1.Secret)
  669. if !ok {
  670. errorchan <- fmt.Errorf("could not cast to secret")
  671. return
  672. }
  673. helm_object, isNotHelmRelease, err := parseSecretToHelmRelease(*secretObj, chartList)
  674. if isNotHelmRelease && err == nil {
  675. return
  676. }
  677. if err != nil {
  678. errorchan <- err
  679. return
  680. }
  681. msg := Message{
  682. EventType: "UPDATE",
  683. Object: helm_object,
  684. }
  685. if writeErr := conn.WriteJSON(msg); writeErr != nil {
  686. errorchan <- writeErr
  687. return
  688. }
  689. },
  690. AddFunc: func(obj interface{}) {
  691. secretObj, ok := obj.(*v1.Secret)
  692. if !ok {
  693. errorchan <- fmt.Errorf("could not cast to secret")
  694. return
  695. }
  696. helm_object, isNotHelmRelease, err := parseSecretToHelmRelease(*secretObj, chartList)
  697. if isNotHelmRelease && err == nil {
  698. return
  699. }
  700. if err != nil {
  701. errorchan <- err
  702. return
  703. }
  704. msg := Message{
  705. EventType: "ADD",
  706. Object: helm_object,
  707. }
  708. if writeErr := conn.WriteJSON(msg); writeErr != nil {
  709. errorchan <- writeErr
  710. return
  711. }
  712. },
  713. DeleteFunc: func(obj interface{}) {
  714. secretObj, ok := obj.(*v1.Secret)
  715. if !ok {
  716. errorchan <- fmt.Errorf("could not cast to secret")
  717. return
  718. }
  719. helm_object, isNotHelmRelease, err := parseSecretToHelmRelease(*secretObj, chartList)
  720. if isNotHelmRelease && err == nil {
  721. return
  722. }
  723. if err != nil {
  724. errorchan <- err
  725. return
  726. }
  727. msg := Message{
  728. EventType: "DELETE",
  729. Object: helm_object,
  730. }
  731. if writeErr := conn.WriteJSON(msg); writeErr != nil {
  732. errorchan <- writeErr
  733. return
  734. }
  735. },
  736. })
  737. go func() {
  738. // listens for websocket closing handshake
  739. for {
  740. if _, _, err := conn.ReadMessage(); err != nil {
  741. conn.Close()
  742. errorchan <- nil
  743. return
  744. }
  745. }
  746. }()
  747. go informer.Run(stopper)
  748. for {
  749. select {
  750. case err := <-errorchan:
  751. return err
  752. }
  753. }
  754. }
  755. return a.RunWebsocketTask(run)
  756. }
  757. // ProvisionECR spawns a new provisioning pod that creates an ECR instance
  758. func (a *Agent) ProvisionECR(
  759. projectID uint,
  760. awsConf *integrations.AWSIntegration,
  761. ecrName string,
  762. repo repository.Repository,
  763. infra *models.Infra,
  764. operation provisioner.ProvisionerOperation,
  765. pgConf *config.DBConf,
  766. redisConf *config.RedisConf,
  767. provImageTag string,
  768. provImagePullSecret string,
  769. ) (*batchv1.Job, error) {
  770. id := infra.GetUniqueName()
  771. prov := &provisioner.Conf{
  772. ID: id,
  773. Name: fmt.Sprintf("prov-%s-%s", id, string(operation)),
  774. Kind: provisioner.ECR,
  775. Operation: operation,
  776. Redis: redisConf,
  777. Postgres: pgConf,
  778. ProvisionerImageTag: provImageTag,
  779. ImagePullSecret: provImagePullSecret,
  780. LastApplied: infra.LastApplied,
  781. AWS: &aws.Conf{
  782. AWSRegion: awsConf.AWSRegion,
  783. AWSAccessKeyID: string(awsConf.AWSAccessKeyID),
  784. AWSSecretAccessKey: string(awsConf.AWSSecretAccessKey),
  785. },
  786. ECR: &ecr.Conf{
  787. ECRName: ecrName,
  788. },
  789. }
  790. return a.provision(prov, infra, repo)
  791. }
  792. // ProvisionEKS spawns a new provisioning pod that creates an EKS instance
  793. func (a *Agent) ProvisionEKS(
  794. projectID uint,
  795. awsConf *integrations.AWSIntegration,
  796. eksName, machineType string,
  797. repo repository.Repository,
  798. infra *models.Infra,
  799. operation provisioner.ProvisionerOperation,
  800. pgConf *config.DBConf,
  801. redisConf *config.RedisConf,
  802. provImageTag string,
  803. provImagePullSecret string,
  804. ) (*batchv1.Job, error) {
  805. id := infra.GetUniqueName()
  806. prov := &provisioner.Conf{
  807. ID: id,
  808. Name: fmt.Sprintf("prov-%s-%s", id, string(operation)),
  809. Kind: provisioner.EKS,
  810. Operation: operation,
  811. Redis: redisConf,
  812. Postgres: pgConf,
  813. ProvisionerImageTag: provImageTag,
  814. ImagePullSecret: provImagePullSecret,
  815. LastApplied: infra.LastApplied,
  816. AWS: &aws.Conf{
  817. AWSRegion: awsConf.AWSRegion,
  818. AWSAccessKeyID: string(awsConf.AWSAccessKeyID),
  819. AWSSecretAccessKey: string(awsConf.AWSSecretAccessKey),
  820. },
  821. EKS: &eks.Conf{
  822. ClusterName: eksName,
  823. MachineType: machineType,
  824. },
  825. }
  826. return a.provision(prov, infra, repo)
  827. }
  828. // ProvisionGCR spawns a new provisioning pod that creates a GCR instance
  829. func (a *Agent) ProvisionGCR(
  830. projectID uint,
  831. gcpConf *integrations.GCPIntegration,
  832. repo repository.Repository,
  833. infra *models.Infra,
  834. operation provisioner.ProvisionerOperation,
  835. pgConf *config.DBConf,
  836. redisConf *config.RedisConf,
  837. provImageTag string,
  838. provImagePullSecret string,
  839. ) (*batchv1.Job, error) {
  840. id := infra.GetUniqueName()
  841. prov := &provisioner.Conf{
  842. ID: id,
  843. Name: fmt.Sprintf("prov-%s-%s", id, string(operation)),
  844. Kind: provisioner.GCR,
  845. Operation: operation,
  846. Redis: redisConf,
  847. Postgres: pgConf,
  848. ProvisionerImageTag: provImageTag,
  849. ImagePullSecret: provImagePullSecret,
  850. LastApplied: infra.LastApplied,
  851. GCP: &gcp.Conf{
  852. GCPRegion: gcpConf.GCPRegion,
  853. GCPProjectID: gcpConf.GCPProjectID,
  854. GCPKeyData: string(gcpConf.GCPKeyData),
  855. },
  856. }
  857. return a.provision(prov, infra, repo)
  858. }
  859. // ProvisionGKE spawns a new provisioning pod that creates a GKE instance
  860. func (a *Agent) ProvisionGKE(
  861. projectID uint,
  862. gcpConf *integrations.GCPIntegration,
  863. gkeName string,
  864. repo repository.Repository,
  865. infra *models.Infra,
  866. operation provisioner.ProvisionerOperation,
  867. pgConf *config.DBConf,
  868. redisConf *config.RedisConf,
  869. provImageTag string,
  870. provImagePullSecret string,
  871. ) (*batchv1.Job, error) {
  872. id := infra.GetUniqueName()
  873. prov := &provisioner.Conf{
  874. ID: id,
  875. Name: fmt.Sprintf("prov-%s-%s", id, string(operation)),
  876. Kind: provisioner.GKE,
  877. Operation: operation,
  878. Redis: redisConf,
  879. Postgres: pgConf,
  880. ProvisionerImageTag: provImageTag,
  881. ImagePullSecret: provImagePullSecret,
  882. LastApplied: infra.LastApplied,
  883. GCP: &gcp.Conf{
  884. GCPRegion: gcpConf.GCPRegion,
  885. GCPProjectID: gcpConf.GCPProjectID,
  886. GCPKeyData: string(gcpConf.GCPKeyData),
  887. },
  888. GKE: &gke.Conf{
  889. ClusterName: gkeName,
  890. },
  891. }
  892. return a.provision(prov, infra, repo)
  893. }
  894. // ProvisionDOCR spawns a new provisioning pod that creates a DOCR instance
  895. func (a *Agent) ProvisionDOCR(
  896. projectID uint,
  897. doConf *integrations.OAuthIntegration,
  898. doAuth *oauth2.Config,
  899. repo repository.Repository,
  900. docrName, docrSubscriptionTier string,
  901. infra *models.Infra,
  902. operation provisioner.ProvisionerOperation,
  903. pgConf *config.DBConf,
  904. redisConf *config.RedisConf,
  905. provImageTag string,
  906. provImagePullSecret string,
  907. ) (*batchv1.Job, error) {
  908. // get the token
  909. oauthInt, err := repo.OAuthIntegration.ReadOAuthIntegration(
  910. infra.DOIntegrationID,
  911. )
  912. if err != nil {
  913. return nil, err
  914. }
  915. tok, _, err := oauth.GetAccessToken(oauthInt.SharedOAuthModel, doAuth, oauth.MakeUpdateOAuthIntegrationTokenFunction(oauthInt, repo))
  916. if err != nil {
  917. return nil, err
  918. }
  919. id := infra.GetUniqueName()
  920. prov := &provisioner.Conf{
  921. ID: id,
  922. Name: fmt.Sprintf("prov-%s-%s", id, string(operation)),
  923. Kind: provisioner.DOCR,
  924. Operation: operation,
  925. Redis: redisConf,
  926. Postgres: pgConf,
  927. ProvisionerImageTag: provImageTag,
  928. ImagePullSecret: provImagePullSecret,
  929. LastApplied: infra.LastApplied,
  930. DO: &do.Conf{
  931. DOToken: tok,
  932. },
  933. DOCR: &docr.Conf{
  934. DOCRName: docrName,
  935. DOCRSubscriptionTier: docrSubscriptionTier,
  936. },
  937. }
  938. return a.provision(prov, infra, repo)
  939. }
  940. // ProvisionDOKS spawns a new provisioning pod that creates a DOKS instance
  941. func (a *Agent) ProvisionDOKS(
  942. projectID uint,
  943. doConf *integrations.OAuthIntegration,
  944. doAuth *oauth2.Config,
  945. repo repository.Repository,
  946. doRegion, doksClusterName string,
  947. infra *models.Infra,
  948. operation provisioner.ProvisionerOperation,
  949. pgConf *config.DBConf,
  950. redisConf *config.RedisConf,
  951. provImageTag string,
  952. provImagePullSecret string,
  953. ) (*batchv1.Job, error) {
  954. // get the token
  955. oauthInt, err := repo.OAuthIntegration.ReadOAuthIntegration(
  956. infra.DOIntegrationID,
  957. )
  958. if err != nil {
  959. return nil, err
  960. }
  961. tok, _, err := oauth.GetAccessToken(oauthInt.SharedOAuthModel, doAuth, oauth.MakeUpdateOAuthIntegrationTokenFunction(oauthInt, repo))
  962. if err != nil {
  963. return nil, err
  964. }
  965. id := infra.GetUniqueName()
  966. prov := &provisioner.Conf{
  967. ID: id,
  968. Name: fmt.Sprintf("prov-%s-%s", id, string(operation)),
  969. Kind: provisioner.DOKS,
  970. Operation: operation,
  971. Redis: redisConf,
  972. Postgres: pgConf,
  973. LastApplied: infra.LastApplied,
  974. ProvisionerImageTag: provImageTag,
  975. ImagePullSecret: provImagePullSecret,
  976. DO: &do.Conf{
  977. DOToken: tok,
  978. },
  979. DOKS: &doks.Conf{
  980. DORegion: doRegion,
  981. DOKSClusterName: doksClusterName,
  982. },
  983. }
  984. return a.provision(prov, infra, repo)
  985. }
  986. // ProvisionTest spawns a new provisioning pod that tests provisioning
  987. func (a *Agent) ProvisionTest(
  988. projectID uint,
  989. infra *models.Infra,
  990. repo repository.Repository,
  991. operation provisioner.ProvisionerOperation,
  992. pgConf *config.DBConf,
  993. redisConf *config.RedisConf,
  994. provImageTag string,
  995. provImagePullSecret string,
  996. ) (*batchv1.Job, error) {
  997. id := infra.GetUniqueName()
  998. prov := &provisioner.Conf{
  999. ID: id,
  1000. Name: fmt.Sprintf("prov-%s-%s", id, string(operation)),
  1001. Operation: operation,
  1002. Kind: provisioner.Test,
  1003. Redis: redisConf,
  1004. Postgres: pgConf,
  1005. ProvisionerImageTag: provImageTag,
  1006. ImagePullSecret: provImagePullSecret,
  1007. }
  1008. return a.provision(prov, infra, repo)
  1009. }
  1010. func (a *Agent) provision(
  1011. prov *provisioner.Conf,
  1012. infra *models.Infra,
  1013. repo repository.Repository,
  1014. ) (*batchv1.Job, error) {
  1015. prov.Namespace = "default"
  1016. job, err := prov.GetProvisionerJobTemplate()
  1017. if err != nil {
  1018. return nil, err
  1019. }
  1020. job, err = a.Clientset.BatchV1().Jobs(prov.Namespace).Create(
  1021. context.TODO(),
  1022. job,
  1023. metav1.CreateOptions{},
  1024. )
  1025. if err != nil {
  1026. return nil, err
  1027. }
  1028. infra.LastApplied = prov.LastApplied
  1029. infra, err = repo.Infra.UpdateInfra(infra)
  1030. if err != nil {
  1031. return nil, err
  1032. }
  1033. return job, nil
  1034. }
  1035. // CreateImagePullSecrets will create the required image pull secrets and
  1036. // return a map from the registry name to the name of the secret.
  1037. func (a *Agent) CreateImagePullSecrets(
  1038. repo repository.Repository,
  1039. namespace string,
  1040. linkedRegs map[string]*models.Registry,
  1041. doAuth *oauth2.Config,
  1042. ) (map[string]string, error) {
  1043. res := make(map[string]string)
  1044. for key, val := range linkedRegs {
  1045. _reg := registry.Registry(*val)
  1046. data, err := _reg.GetDockerConfigJSON(repo, doAuth)
  1047. if err != nil {
  1048. return nil, err
  1049. }
  1050. secretName := fmt.Sprintf("porter-%s-%d", val.Externalize().Service, val.ID)
  1051. secret, err := a.Clientset.CoreV1().Secrets(namespace).Get(
  1052. context.TODO(),
  1053. secretName,
  1054. metav1.GetOptions{},
  1055. )
  1056. // if not found, create the secret
  1057. if err != nil && errors.IsNotFound(err) {
  1058. _, err = a.Clientset.CoreV1().Secrets(namespace).Create(
  1059. context.TODO(),
  1060. &v1.Secret{
  1061. ObjectMeta: metav1.ObjectMeta{
  1062. Name: secretName,
  1063. },
  1064. Data: map[string][]byte{
  1065. string(v1.DockerConfigJsonKey): data,
  1066. },
  1067. Type: v1.SecretTypeDockerConfigJson,
  1068. },
  1069. metav1.CreateOptions{},
  1070. )
  1071. if err != nil {
  1072. return nil, err
  1073. }
  1074. // add secret name to the map
  1075. res[key] = secretName
  1076. continue
  1077. } else if err != nil {
  1078. return nil, err
  1079. }
  1080. // otherwise, check that the secret contains the correct data: if
  1081. // if doesn't, update it
  1082. if !bytes.Equal(secret.Data[v1.DockerConfigJsonKey], data) {
  1083. _, err := a.Clientset.CoreV1().Secrets(namespace).Update(
  1084. context.TODO(),
  1085. &v1.Secret{
  1086. ObjectMeta: metav1.ObjectMeta{
  1087. Name: secretName,
  1088. },
  1089. Data: map[string][]byte{
  1090. string(v1.DockerConfigJsonKey): data,
  1091. },
  1092. Type: v1.SecretTypeDockerConfigJson,
  1093. },
  1094. metav1.UpdateOptions{},
  1095. )
  1096. if err != nil {
  1097. return nil, err
  1098. }
  1099. }
  1100. // add secret name to the map
  1101. res[key] = secretName
  1102. }
  1103. return res, nil
  1104. }