agent.go 31 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221122212231224122512261227122812291230123112321233123412351236123712381239124012411242124312441245124612471248124912501251125212531254125512561257125812591260126112621263126412651266126712681269127012711272127312741275127612771278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329
  1. package kubernetes
  2. import (
  3. "bufio"
  4. "bytes"
  5. "compress/gzip"
  6. "context"
  7. "encoding/base64"
  8. "encoding/json"
  9. "fmt"
  10. "io"
  11. "io/ioutil"
  12. "strings"
  13. "time"
  14. "github.com/porter-dev/porter/api/server/shared/config/env"
  15. "github.com/porter-dev/porter/internal/kubernetes/provisioner"
  16. "github.com/porter-dev/porter/internal/kubernetes/provisioner/aws"
  17. "github.com/porter-dev/porter/internal/kubernetes/provisioner/aws/ecr"
  18. "github.com/porter-dev/porter/internal/kubernetes/provisioner/aws/eks"
  19. "github.com/porter-dev/porter/internal/kubernetes/provisioner/do"
  20. "github.com/porter-dev/porter/internal/kubernetes/provisioner/do/docr"
  21. "github.com/porter-dev/porter/internal/kubernetes/provisioner/do/doks"
  22. "github.com/porter-dev/porter/internal/kubernetes/provisioner/gcp"
  23. "github.com/porter-dev/porter/internal/kubernetes/provisioner/gcp/gke"
  24. "github.com/porter-dev/porter/internal/models"
  25. "github.com/porter-dev/porter/internal/models/integrations"
  26. "github.com/porter-dev/porter/internal/oauth"
  27. "github.com/porter-dev/porter/internal/registry"
  28. "github.com/porter-dev/porter/internal/repository"
  29. "golang.org/x/oauth2"
  30. errors2 "errors"
  31. "github.com/gorilla/websocket"
  32. "github.com/porter-dev/porter/internal/helm/grapher"
  33. appsv1 "k8s.io/api/apps/v1"
  34. batchv1 "k8s.io/api/batch/v1"
  35. batchv1beta1 "k8s.io/api/batch/v1beta1"
  36. v1 "k8s.io/api/core/v1"
  37. v1beta1 "k8s.io/api/extensions/v1beta1"
  38. "k8s.io/apimachinery/pkg/api/errors"
  39. metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
  40. "k8s.io/apimachinery/pkg/runtime"
  41. "k8s.io/apimachinery/pkg/runtime/schema"
  42. "k8s.io/apimachinery/pkg/types"
  43. "k8s.io/cli-runtime/pkg/genericclioptions"
  44. "k8s.io/client-go/informers"
  45. "k8s.io/client-go/kubernetes"
  46. "k8s.io/client-go/rest"
  47. "k8s.io/client-go/tools/cache"
  48. "k8s.io/client-go/tools/remotecommand"
  49. rspb "helm.sh/helm/v3/pkg/release"
  50. )
  51. // Agent is a Kubernetes agent for performing operations that interact with the
  52. // api server
  53. type Agent struct {
  54. RESTClientGetter genericclioptions.RESTClientGetter
  55. Clientset kubernetes.Interface
  56. }
  57. type Message struct {
  58. EventType string `json:"event_type"`
  59. Object interface{}
  60. Kind string
  61. }
  62. type ListOptions struct {
  63. FieldSelector string
  64. }
  65. type AuthError struct{}
  66. func (e *AuthError) Error() string {
  67. return "Unauthorized error"
  68. }
  69. // UpdateClientset updates the Agent's Clientset (this refreshes auth tokens)
  70. func (a *Agent) UpdateClientset() error {
  71. restConf, err := a.RESTClientGetter.ToRESTConfig()
  72. if err != nil {
  73. return err
  74. }
  75. clientset, err := kubernetes.NewForConfig(restConf)
  76. if err != nil {
  77. return err
  78. }
  79. a.Clientset = clientset
  80. return nil
  81. }
  82. // CreateConfigMap creates the configmap given the key-value pairs and namespace
  83. func (a *Agent) CreateConfigMap(name string, namespace string, configMap map[string]string) (*v1.ConfigMap, error) {
  84. return a.Clientset.CoreV1().ConfigMaps(namespace).Create(
  85. context.TODO(),
  86. &v1.ConfigMap{
  87. ObjectMeta: metav1.ObjectMeta{
  88. Name: name,
  89. Namespace: namespace,
  90. Labels: map[string]string{
  91. "porter": "true",
  92. },
  93. },
  94. Data: configMap,
  95. },
  96. metav1.CreateOptions{},
  97. )
  98. }
  99. // CreateLinkedSecret creates a secret given the key-value pairs and namespace. Values are
  100. // base64 encoded
  101. func (a *Agent) CreateLinkedSecret(name, namespace, cmName string, data map[string][]byte) (*v1.Secret, error) {
  102. return a.Clientset.CoreV1().Secrets(namespace).Create(
  103. context.TODO(),
  104. &v1.Secret{
  105. ObjectMeta: metav1.ObjectMeta{
  106. Name: name,
  107. Namespace: namespace,
  108. Labels: map[string]string{
  109. "porter": "true",
  110. "configmap": cmName,
  111. },
  112. },
  113. Data: data,
  114. },
  115. metav1.CreateOptions{},
  116. )
  117. }
  118. type mergeConfigMapData struct {
  119. Data map[string]*string `json:"data"`
  120. }
  121. // UpdateConfigMap updates the configmap given its name and namespace
  122. func (a *Agent) UpdateConfigMap(name string, namespace string, configMap map[string]string) (*v1.ConfigMap, error) {
  123. cmData := make(map[string]*string)
  124. for key, val := range configMap {
  125. valCopy := val
  126. cmData[key] = &valCopy
  127. if len(val) == 0 {
  128. cmData[key] = nil
  129. }
  130. }
  131. mergeCM := &mergeConfigMapData{
  132. Data: cmData,
  133. }
  134. patchBytes, err := json.Marshal(mergeCM)
  135. if err != nil {
  136. return nil, err
  137. }
  138. return a.Clientset.CoreV1().ConfigMaps(namespace).Patch(
  139. context.Background(),
  140. name,
  141. types.MergePatchType,
  142. patchBytes,
  143. metav1.PatchOptions{},
  144. )
  145. }
  146. type mergeLinkedSecretData struct {
  147. Data map[string]*[]byte `json:"data"`
  148. }
  149. // UpdateLinkedSecret updates the secret given its name and namespace
  150. func (a *Agent) UpdateLinkedSecret(name, namespace, cmName string, data map[string][]byte) error {
  151. secretData := make(map[string]*[]byte)
  152. for key, val := range data {
  153. valCopy := val
  154. secretData[key] = &valCopy
  155. if len(val) == 0 {
  156. secretData[key] = nil
  157. }
  158. }
  159. mergeSecret := &mergeLinkedSecretData{
  160. Data: secretData,
  161. }
  162. patchBytes, err := json.Marshal(mergeSecret)
  163. if err != nil {
  164. return err
  165. }
  166. _, err = a.Clientset.CoreV1().Secrets(namespace).Patch(
  167. context.TODO(),
  168. name,
  169. types.MergePatchType,
  170. patchBytes,
  171. metav1.PatchOptions{},
  172. )
  173. return err
  174. }
  175. // DeleteConfigMap deletes the configmap given its name and namespace
  176. func (a *Agent) DeleteConfigMap(name string, namespace string) error {
  177. return a.Clientset.CoreV1().ConfigMaps(namespace).Delete(
  178. context.TODO(),
  179. name,
  180. metav1.DeleteOptions{},
  181. )
  182. }
  183. // DeleteLinkedSecret deletes the secret given its name and namespace
  184. func (a *Agent) DeleteLinkedSecret(name, namespace string) error {
  185. return a.Clientset.CoreV1().Secrets(namespace).Delete(
  186. context.TODO(),
  187. name,
  188. metav1.DeleteOptions{},
  189. )
  190. }
  191. // GetConfigMap retrieves the configmap given its name and namespace
  192. func (a *Agent) GetConfigMap(name string, namespace string) (*v1.ConfigMap, error) {
  193. return a.Clientset.CoreV1().ConfigMaps(namespace).Get(
  194. context.TODO(),
  195. name,
  196. metav1.GetOptions{},
  197. )
  198. }
  199. // GetSecret retrieves the secret given its name and namespace
  200. func (a *Agent) GetSecret(name string, namespace string) (*v1.Secret, error) {
  201. return a.Clientset.CoreV1().Secrets(namespace).Get(
  202. context.TODO(),
  203. name,
  204. metav1.GetOptions{},
  205. )
  206. }
  207. // ListConfigMaps simply lists namespaces
  208. func (a *Agent) ListConfigMaps(namespace string) (*v1.ConfigMapList, error) {
  209. return a.Clientset.CoreV1().ConfigMaps(namespace).List(
  210. context.TODO(),
  211. metav1.ListOptions{
  212. LabelSelector: "porter=true",
  213. },
  214. )
  215. }
  216. // ListEvents lists the events of a given object.
  217. func (a *Agent) ListEvents(name string, namespace string) (*v1.EventList, error) {
  218. return a.Clientset.CoreV1().Events(namespace).List(
  219. context.TODO(),
  220. metav1.ListOptions{
  221. FieldSelector: fmt.Sprintf("involvedObject.name=%s,involvedObject.namespace=%s", name, namespace),
  222. },
  223. )
  224. }
  225. // ListNamespaces simply lists namespaces
  226. func (a *Agent) ListNamespaces() (*v1.NamespaceList, error) {
  227. return a.Clientset.CoreV1().Namespaces().List(
  228. context.TODO(),
  229. metav1.ListOptions{},
  230. )
  231. }
  232. // CreateNamespace creates a namespace with the given name.
  233. func (a *Agent) CreateNamespace(name string) (*v1.Namespace, error) {
  234. namespace := v1.Namespace{
  235. ObjectMeta: metav1.ObjectMeta{
  236. Name: name,
  237. },
  238. }
  239. return a.Clientset.CoreV1().Namespaces().Create(
  240. context.TODO(),
  241. &namespace,
  242. metav1.CreateOptions{},
  243. )
  244. }
  245. // DeleteNamespace deletes the namespace given the name.
  246. func (a *Agent) DeleteNamespace(name string) error {
  247. return a.Clientset.CoreV1().Namespaces().Delete(
  248. context.TODO(),
  249. name,
  250. metav1.DeleteOptions{},
  251. )
  252. }
  253. // ListJobsByLabel lists jobs in a namespace matching a label
  254. type Label struct {
  255. Key string
  256. Val string
  257. }
  258. func (a *Agent) ListJobsByLabel(namespace string, labels ...Label) ([]batchv1.Job, error) {
  259. selectors := make([]string, 0)
  260. for _, label := range labels {
  261. selectors = append(selectors, fmt.Sprintf("%s=%s", label.Key, label.Val))
  262. }
  263. resp, err := a.Clientset.BatchV1().Jobs(namespace).List(
  264. context.TODO(),
  265. metav1.ListOptions{
  266. LabelSelector: strings.Join(selectors, ","),
  267. },
  268. )
  269. if err != nil {
  270. return nil, err
  271. }
  272. return resp.Items, nil
  273. }
  274. // DeleteJob deletes the job in the given name and namespace.
  275. func (a *Agent) DeleteJob(name, namespace string) error {
  276. return a.Clientset.BatchV1().Jobs(namespace).Delete(
  277. context.TODO(),
  278. name,
  279. metav1.DeleteOptions{},
  280. )
  281. }
  282. // GetJobPods lists all pods belonging to a job in a namespace
  283. func (a *Agent) GetJobPods(namespace, jobName string) ([]v1.Pod, error) {
  284. resp, err := a.Clientset.CoreV1().Pods(namespace).List(
  285. context.TODO(),
  286. metav1.ListOptions{
  287. LabelSelector: fmt.Sprintf("%s=%s", "job-name", jobName),
  288. },
  289. )
  290. if err != nil {
  291. return nil, err
  292. }
  293. return resp.Items, nil
  294. }
  295. // GetIngress gets ingress given the name and namespace
  296. func (a *Agent) GetIngress(namespace string, name string) (*v1beta1.Ingress, error) {
  297. return a.Clientset.ExtensionsV1beta1().Ingresses(namespace).Get(
  298. context.TODO(),
  299. name,
  300. metav1.GetOptions{},
  301. )
  302. }
  303. var IsNotFoundError = fmt.Errorf("not found")
  304. // GetDeployment gets the deployment given the name and namespace
  305. func (a *Agent) GetDeployment(c grapher.Object) (*appsv1.Deployment, error) {
  306. res, err := a.Clientset.AppsV1().Deployments(c.Namespace).Get(
  307. context.TODO(),
  308. c.Name,
  309. metav1.GetOptions{},
  310. )
  311. if err != nil && errors.IsNotFound(err) {
  312. return nil, IsNotFoundError
  313. } else if err != nil {
  314. return nil, err
  315. }
  316. res.Kind = c.Kind
  317. return res, nil
  318. }
  319. // GetStatefulSet gets the statefulset given the name and namespace
  320. func (a *Agent) GetStatefulSet(c grapher.Object) (*appsv1.StatefulSet, error) {
  321. res, err := a.Clientset.AppsV1().StatefulSets(c.Namespace).Get(
  322. context.TODO(),
  323. c.Name,
  324. metav1.GetOptions{},
  325. )
  326. if err != nil && errors.IsNotFound(err) {
  327. return nil, IsNotFoundError
  328. } else if err != nil {
  329. return nil, err
  330. }
  331. res.Kind = c.Kind
  332. return res, nil
  333. }
  334. // GetReplicaSet gets the replicaset given the name and namespace
  335. func (a *Agent) GetReplicaSet(c grapher.Object) (*appsv1.ReplicaSet, error) {
  336. res, err := a.Clientset.AppsV1().ReplicaSets(c.Namespace).Get(
  337. context.TODO(),
  338. c.Name,
  339. metav1.GetOptions{},
  340. )
  341. if err != nil && errors.IsNotFound(err) {
  342. return nil, IsNotFoundError
  343. } else if err != nil {
  344. return nil, err
  345. }
  346. res.Kind = c.Kind
  347. return res, nil
  348. }
  349. // GetDaemonSet gets the daemonset by name and namespace
  350. func (a *Agent) GetDaemonSet(c grapher.Object) (*appsv1.DaemonSet, error) {
  351. res, err := a.Clientset.AppsV1().DaemonSets(c.Namespace).Get(
  352. context.TODO(),
  353. c.Name,
  354. metav1.GetOptions{},
  355. )
  356. if err != nil && errors.IsNotFound(err) {
  357. return nil, IsNotFoundError
  358. } else if err != nil {
  359. return nil, err
  360. }
  361. res.Kind = c.Kind
  362. return res, nil
  363. }
  364. // GetJob gets the job by name and namespace
  365. func (a *Agent) GetJob(c grapher.Object) (*batchv1.Job, error) {
  366. res, err := a.Clientset.BatchV1().Jobs(c.Namespace).Get(
  367. context.TODO(),
  368. c.Name,
  369. metav1.GetOptions{},
  370. )
  371. if err != nil && errors.IsNotFound(err) {
  372. return nil, IsNotFoundError
  373. } else if err != nil {
  374. return nil, err
  375. }
  376. res.Kind = c.Kind
  377. return res, nil
  378. }
  379. // GetCronJob gets the CronJob by name and namespace
  380. func (a *Agent) GetCronJob(c grapher.Object) (*batchv1beta1.CronJob, error) {
  381. res, err := a.Clientset.BatchV1beta1().CronJobs(c.Namespace).Get(
  382. context.TODO(),
  383. c.Name,
  384. metav1.GetOptions{},
  385. )
  386. if err != nil && errors.IsNotFound(err) {
  387. return nil, IsNotFoundError
  388. } else if err != nil {
  389. return nil, err
  390. }
  391. res.Kind = c.Kind
  392. return res, nil
  393. }
  394. // GetPodsByLabel retrieves pods with matching labels
  395. func (a *Agent) GetPodsByLabel(selector string, namespace string) (*v1.PodList, error) {
  396. // Search in all namespaces for matching pods
  397. return a.Clientset.CoreV1().Pods(namespace).List(
  398. context.TODO(),
  399. metav1.ListOptions{
  400. LabelSelector: selector,
  401. },
  402. )
  403. }
  404. // DeletePod deletes a pod by name and namespace
  405. func (a *Agent) DeletePod(namespace string, name string) error {
  406. return a.Clientset.CoreV1().Pods(namespace).Delete(
  407. context.TODO(),
  408. name,
  409. metav1.DeleteOptions{},
  410. )
  411. }
  412. // GetPodLogs streams real-time logs from a given pod.
  413. func (a *Agent) GetPodLogs(namespace string, name string, conn *websocket.Conn) error {
  414. // get the pod to read in the list of contains
  415. pod, err := a.Clientset.CoreV1().Pods(namespace).Get(
  416. context.Background(),
  417. name,
  418. metav1.GetOptions{},
  419. )
  420. if err != nil {
  421. return fmt.Errorf("Cannot get pod %s: %s", name, err.Error())
  422. }
  423. container := pod.Spec.Containers[0].Name
  424. tails := int64(400)
  425. // follow logs
  426. podLogOpts := v1.PodLogOptions{
  427. Follow: true,
  428. TailLines: &tails,
  429. Container: container,
  430. }
  431. req := a.Clientset.CoreV1().Pods(namespace).GetLogs(name, &podLogOpts)
  432. podLogs, err := req.Stream(context.TODO())
  433. if err != nil {
  434. return fmt.Errorf("Cannot open log stream for pod %s: %s", name, err.Error())
  435. }
  436. defer podLogs.Close()
  437. r := bufio.NewReader(podLogs)
  438. errorchan := make(chan error)
  439. go func() {
  440. // listens for websocket closing handshake
  441. for {
  442. if _, _, err := conn.ReadMessage(); err != nil {
  443. defer conn.Close()
  444. errorchan <- nil
  445. return
  446. }
  447. }
  448. }()
  449. go func() {
  450. for {
  451. select {
  452. case <-errorchan:
  453. defer close(errorchan)
  454. return
  455. default:
  456. }
  457. bytes, err := r.ReadBytes('\n')
  458. if writeErr := conn.WriteMessage(websocket.TextMessage, bytes); writeErr != nil {
  459. errorchan <- writeErr
  460. return
  461. }
  462. if err != nil {
  463. if err != io.EOF {
  464. errorchan <- err
  465. return
  466. }
  467. errorchan <- nil
  468. return
  469. }
  470. }
  471. }()
  472. for {
  473. select {
  474. case err = <-errorchan:
  475. return err
  476. }
  477. }
  478. }
  479. // StopJobWithJobSidecar sends a termination signal to a job running with a sidecar
  480. func (a *Agent) StopJobWithJobSidecar(namespace, name string) error {
  481. jobPods, err := a.GetJobPods(namespace, name)
  482. if err != nil {
  483. return err
  484. }
  485. podName := jobPods[0].ObjectMeta.Name
  486. restConf, err := a.RESTClientGetter.ToRESTConfig()
  487. restConf.GroupVersion = &schema.GroupVersion{
  488. Group: "api",
  489. Version: "v1",
  490. }
  491. restConf.NegotiatedSerializer = runtime.NewSimpleNegotiatedSerializer(runtime.SerializerInfo{})
  492. restClient, err := rest.RESTClientFor(restConf)
  493. if err != nil {
  494. return err
  495. }
  496. req := restClient.Post().
  497. Resource("pods").
  498. Name(podName).
  499. Namespace(namespace).
  500. SubResource("exec")
  501. req.Param("command", "./signal.sh")
  502. req.Param("container", "sidecar")
  503. req.Param("stdin", "true")
  504. req.Param("stdout", "false")
  505. req.Param("tty", "false")
  506. exec, err := remotecommand.NewSPDYExecutor(restConf, "POST", req.URL())
  507. if err != nil {
  508. return err
  509. }
  510. return exec.Stream(remotecommand.StreamOptions{
  511. Tty: false,
  512. Stdin: strings.NewReader("./signal.sh"),
  513. })
  514. }
  515. // RunWebsocketTask will run a websocket task. If the websocket returns an anauthorized error, it will restart
  516. // the task some number of times until failing
  517. func (a *Agent) RunWebsocketTask(task func() error) error {
  518. lastTime := int64(0)
  519. for {
  520. if err := a.UpdateClientset(); err != nil {
  521. return err
  522. }
  523. err := task()
  524. if err == nil {
  525. return nil
  526. }
  527. if !errors2.Is(err, &AuthError{}) {
  528. return err
  529. }
  530. if time.Now().Unix()-lastTime < 60 { // don't regenerate connection if too many unauthorized errors
  531. return err
  532. }
  533. lastTime = time.Now().Unix()
  534. }
  535. }
  536. // StreamControllerStatus streams controller status. Supports Deployment, StatefulSet, ReplicaSet, and DaemonSet
  537. // TODO: Support Jobs
  538. func (a *Agent) StreamControllerStatus(conn *websocket.Conn, kind string, selectors string) error {
  539. run := func() error {
  540. // selectors is an array of max length 1. StreamControllerStatus accepts calls without the selectors argument.
  541. // selectors argument is a single string with comma separated key=value pairs. (e.g. "app=porter,porter=true")
  542. tweakListOptionsFunc := func(options *metav1.ListOptions) {
  543. options.LabelSelector = selectors
  544. }
  545. factory := informers.NewSharedInformerFactoryWithOptions(
  546. a.Clientset,
  547. 0,
  548. informers.WithTweakListOptions(tweakListOptionsFunc),
  549. )
  550. var informer cache.SharedInformer
  551. // Spins up an informer depending on kind. Convert to lowercase for robustness
  552. switch strings.ToLower(kind) {
  553. case "deployment":
  554. informer = factory.Apps().V1().Deployments().Informer()
  555. case "statefulset":
  556. informer = factory.Apps().V1().StatefulSets().Informer()
  557. case "replicaset":
  558. informer = factory.Apps().V1().ReplicaSets().Informer()
  559. case "daemonset":
  560. informer = factory.Apps().V1().DaemonSets().Informer()
  561. case "job":
  562. informer = factory.Batch().V1().Jobs().Informer()
  563. case "cronjob":
  564. informer = factory.Batch().V1beta1().CronJobs().Informer()
  565. case "namespace":
  566. informer = factory.Core().V1().Namespaces().Informer()
  567. case "pod":
  568. informer = factory.Core().V1().Pods().Informer()
  569. }
  570. stopper := make(chan struct{})
  571. errorchan := make(chan error)
  572. defer close(stopper)
  573. informer.SetWatchErrorHandler(func(r *cache.Reflector, err error) {
  574. if strings.HasSuffix(err.Error(), ": Unauthorized") {
  575. errorchan <- &AuthError{}
  576. }
  577. })
  578. informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
  579. UpdateFunc: func(oldObj, newObj interface{}) {
  580. msg := Message{
  581. EventType: "UPDATE",
  582. Object: newObj,
  583. Kind: strings.ToLower(kind),
  584. }
  585. if writeErr := conn.WriteJSON(msg); writeErr != nil {
  586. errorchan <- writeErr
  587. return
  588. }
  589. },
  590. AddFunc: func(obj interface{}) {
  591. msg := Message{
  592. EventType: "ADD",
  593. Object: obj,
  594. Kind: strings.ToLower(kind),
  595. }
  596. if writeErr := conn.WriteJSON(msg); writeErr != nil {
  597. errorchan <- writeErr
  598. return
  599. }
  600. },
  601. DeleteFunc: func(obj interface{}) {
  602. msg := Message{
  603. EventType: "DELETE",
  604. Object: obj,
  605. Kind: strings.ToLower(kind),
  606. }
  607. if writeErr := conn.WriteJSON(msg); writeErr != nil {
  608. errorchan <- writeErr
  609. return
  610. }
  611. },
  612. })
  613. go func() {
  614. // listens for websocket closing handshake
  615. for {
  616. if _, _, err := conn.ReadMessage(); err != nil {
  617. conn.Close()
  618. errorchan <- nil
  619. return
  620. }
  621. }
  622. }()
  623. go informer.Run(stopper)
  624. for {
  625. select {
  626. case err := <-errorchan:
  627. return err
  628. }
  629. }
  630. }
  631. return a.RunWebsocketTask(run)
  632. }
  633. var b64 = base64.StdEncoding
  634. var magicGzip = []byte{0x1f, 0x8b, 0x08}
  635. func decodeRelease(data string) (*rspb.Release, error) {
  636. // base64 decode string
  637. b, err := b64.DecodeString(data)
  638. if err != nil {
  639. return nil, err
  640. }
  641. // For backwards compatibility with releases that were stored before
  642. // compression was introduced we skip decompression if the
  643. // gzip magic header is not found
  644. if bytes.Equal(b[0:3], magicGzip) {
  645. r, err := gzip.NewReader(bytes.NewReader(b))
  646. if err != nil {
  647. return nil, err
  648. }
  649. defer r.Close()
  650. b2, err := ioutil.ReadAll(r)
  651. if err != nil {
  652. return nil, err
  653. }
  654. b = b2
  655. }
  656. var rls rspb.Release
  657. // unmarshal release object bytes
  658. if err := json.Unmarshal(b, &rls); err != nil {
  659. return nil, err
  660. }
  661. return &rls, nil
  662. }
  663. func contains(s []string, str string) bool {
  664. for _, v := range s {
  665. if v == str {
  666. return true
  667. }
  668. }
  669. return false
  670. }
  671. func parseSecretToHelmRelease(secret v1.Secret, chartList []string) (*rspb.Release, bool, error) {
  672. if secret.Type != "helm.sh/release.v1" {
  673. return nil, true, nil
  674. }
  675. releaseData, ok := secret.Data["release"]
  676. if !ok {
  677. return nil, true, fmt.Errorf("release field not found")
  678. }
  679. helm_object, err := decodeRelease(string(releaseData))
  680. if err != nil {
  681. return nil, true, err
  682. }
  683. if len(chartList) > 0 && !contains(chartList, helm_object.Name) {
  684. return nil, true, nil
  685. }
  686. return helm_object, false, nil
  687. }
  688. func (a *Agent) StreamHelmReleases(conn *websocket.Conn, namespace string, chartList []string, selectors string) error {
  689. run := func() error {
  690. tweakListOptionsFunc := func(options *metav1.ListOptions) {
  691. options.LabelSelector = selectors
  692. }
  693. factory := informers.NewSharedInformerFactoryWithOptions(
  694. a.Clientset,
  695. 0,
  696. informers.WithTweakListOptions(tweakListOptionsFunc),
  697. informers.WithNamespace(namespace),
  698. )
  699. informer := factory.Core().V1().Secrets().Informer()
  700. stopper := make(chan struct{})
  701. errorchan := make(chan error)
  702. defer close(stopper)
  703. informer.SetWatchErrorHandler(func(r *cache.Reflector, err error) {
  704. if strings.HasSuffix(err.Error(), ": Unauthorized") {
  705. errorchan <- &AuthError{}
  706. }
  707. })
  708. informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
  709. UpdateFunc: func(oldObj, newObj interface{}) {
  710. secretObj, ok := newObj.(*v1.Secret)
  711. if !ok {
  712. errorchan <- fmt.Errorf("could not cast to secret")
  713. return
  714. }
  715. helm_object, isNotHelmRelease, err := parseSecretToHelmRelease(*secretObj, chartList)
  716. if isNotHelmRelease && err == nil {
  717. return
  718. }
  719. if err != nil {
  720. errorchan <- err
  721. return
  722. }
  723. msg := Message{
  724. EventType: "UPDATE",
  725. Object: helm_object,
  726. }
  727. if writeErr := conn.WriteJSON(msg); writeErr != nil {
  728. errorchan <- writeErr
  729. return
  730. }
  731. },
  732. AddFunc: func(obj interface{}) {
  733. secretObj, ok := obj.(*v1.Secret)
  734. if !ok {
  735. errorchan <- fmt.Errorf("could not cast to secret")
  736. return
  737. }
  738. helm_object, isNotHelmRelease, err := parseSecretToHelmRelease(*secretObj, chartList)
  739. if isNotHelmRelease && err == nil {
  740. return
  741. }
  742. if err != nil {
  743. errorchan <- err
  744. return
  745. }
  746. msg := Message{
  747. EventType: "ADD",
  748. Object: helm_object,
  749. }
  750. if writeErr := conn.WriteJSON(msg); writeErr != nil {
  751. errorchan <- writeErr
  752. return
  753. }
  754. },
  755. DeleteFunc: func(obj interface{}) {
  756. secretObj, ok := obj.(*v1.Secret)
  757. if !ok {
  758. errorchan <- fmt.Errorf("could not cast to secret")
  759. return
  760. }
  761. helm_object, isNotHelmRelease, err := parseSecretToHelmRelease(*secretObj, chartList)
  762. if isNotHelmRelease && err == nil {
  763. return
  764. }
  765. if err != nil {
  766. errorchan <- err
  767. return
  768. }
  769. msg := Message{
  770. EventType: "DELETE",
  771. Object: helm_object,
  772. }
  773. if writeErr := conn.WriteJSON(msg); writeErr != nil {
  774. errorchan <- writeErr
  775. return
  776. }
  777. },
  778. })
  779. go func() {
  780. // listens for websocket closing handshake
  781. for {
  782. if _, _, err := conn.ReadMessage(); err != nil {
  783. conn.Close()
  784. errorchan <- nil
  785. return
  786. }
  787. }
  788. }()
  789. go informer.Run(stopper)
  790. for {
  791. select {
  792. case err := <-errorchan:
  793. return err
  794. }
  795. }
  796. }
  797. return a.RunWebsocketTask(run)
  798. }
  799. type SharedProvisionOpts struct {
  800. ProjectID uint
  801. Repo repository.Repository
  802. Infra *models.Infra
  803. Operation provisioner.ProvisionerOperation
  804. PGConf *env.DBConf
  805. RedisConf *env.RedisConf
  806. ProvImageTag string
  807. ProvImagePullSecret string
  808. }
  809. // ProvisionECR spawns a new provisioning pod that creates an ECR instance
  810. func (a *Agent) ProvisionECR(
  811. opts *SharedProvisionOpts,
  812. awsConf *integrations.AWSIntegration,
  813. ecrName string,
  814. ) (*batchv1.Job, error) {
  815. id := opts.Infra.GetUniqueName()
  816. prov := &provisioner.Conf{
  817. ID: id,
  818. Name: fmt.Sprintf("prov-%s-%s", id, string(opts.Operation)),
  819. Kind: provisioner.ECR,
  820. Operation: opts.Operation,
  821. Redis: opts.RedisConf,
  822. Postgres: opts.PGConf,
  823. ProvisionerImageTag: opts.ProvImageTag,
  824. ImagePullSecret: opts.ProvImagePullSecret,
  825. LastApplied: opts.Infra.LastApplied,
  826. AWS: &aws.Conf{
  827. AWSRegion: awsConf.AWSRegion,
  828. AWSAccessKeyID: string(awsConf.AWSAccessKeyID),
  829. AWSSecretAccessKey: string(awsConf.AWSSecretAccessKey),
  830. },
  831. ECR: &ecr.Conf{
  832. ECRName: ecrName,
  833. },
  834. }
  835. return a.provision(prov, opts.Infra, opts.Repo)
  836. }
  837. // ProvisionEKS spawns a new provisioning pod that creates an EKS instance
  838. func (a *Agent) ProvisionEKS(
  839. opts *SharedProvisionOpts,
  840. awsConf *integrations.AWSIntegration,
  841. eksName, machineType string,
  842. ) (*batchv1.Job, error) {
  843. id := opts.Infra.GetUniqueName()
  844. prov := &provisioner.Conf{
  845. ID: id,
  846. Name: fmt.Sprintf("prov-%s-%s", id, string(opts.Operation)),
  847. Kind: provisioner.EKS,
  848. Operation: opts.Operation,
  849. Redis: opts.RedisConf,
  850. Postgres: opts.PGConf,
  851. ProvisionerImageTag: opts.ProvImageTag,
  852. ImagePullSecret: opts.ProvImagePullSecret,
  853. LastApplied: opts.Infra.LastApplied,
  854. AWS: &aws.Conf{
  855. AWSRegion: awsConf.AWSRegion,
  856. AWSAccessKeyID: string(awsConf.AWSAccessKeyID),
  857. AWSSecretAccessKey: string(awsConf.AWSSecretAccessKey),
  858. },
  859. EKS: &eks.Conf{
  860. ClusterName: eksName,
  861. MachineType: machineType,
  862. },
  863. }
  864. return a.provision(prov, opts.Infra, opts.Repo)
  865. }
  866. // ProvisionGCR spawns a new provisioning pod that creates a GCR instance
  867. func (a *Agent) ProvisionGCR(
  868. opts *SharedProvisionOpts,
  869. gcpConf *integrations.GCPIntegration,
  870. ) (*batchv1.Job, error) {
  871. id := opts.Infra.GetUniqueName()
  872. prov := &provisioner.Conf{
  873. ID: id,
  874. Name: fmt.Sprintf("prov-%s-%s", id, string(opts.Operation)),
  875. Kind: provisioner.GCR,
  876. Operation: opts.Operation,
  877. Redis: opts.RedisConf,
  878. Postgres: opts.PGConf,
  879. ProvisionerImageTag: opts.ProvImageTag,
  880. ImagePullSecret: opts.ProvImagePullSecret,
  881. LastApplied: opts.Infra.LastApplied,
  882. GCP: &gcp.Conf{
  883. GCPRegion: gcpConf.GCPRegion,
  884. GCPProjectID: gcpConf.GCPProjectID,
  885. GCPKeyData: string(gcpConf.GCPKeyData),
  886. },
  887. }
  888. return a.provision(prov, opts.Infra, opts.Repo)
  889. }
  890. // ProvisionGKE spawns a new provisioning pod that creates a GKE instance
  891. func (a *Agent) ProvisionGKE(
  892. opts *SharedProvisionOpts,
  893. gcpConf *integrations.GCPIntegration,
  894. gkeName string,
  895. ) (*batchv1.Job, error) {
  896. id := opts.Infra.GetUniqueName()
  897. prov := &provisioner.Conf{
  898. ID: id,
  899. Name: fmt.Sprintf("prov-%s-%s", id, string(opts.Operation)),
  900. Kind: provisioner.GKE,
  901. Operation: opts.Operation,
  902. Redis: opts.RedisConf,
  903. Postgres: opts.PGConf,
  904. ProvisionerImageTag: opts.ProvImageTag,
  905. ImagePullSecret: opts.ProvImagePullSecret,
  906. LastApplied: opts.Infra.LastApplied,
  907. GCP: &gcp.Conf{
  908. GCPRegion: gcpConf.GCPRegion,
  909. GCPProjectID: gcpConf.GCPProjectID,
  910. GCPKeyData: string(gcpConf.GCPKeyData),
  911. },
  912. GKE: &gke.Conf{
  913. ClusterName: gkeName,
  914. },
  915. }
  916. return a.provision(prov, opts.Infra, opts.Repo)
  917. }
  918. // ProvisionDOCR spawns a new provisioning pod that creates a DOCR instance
  919. func (a *Agent) ProvisionDOCR(
  920. opts *SharedProvisionOpts,
  921. doConf *integrations.OAuthIntegration,
  922. doAuth *oauth2.Config,
  923. docrName, docrSubscriptionTier string,
  924. ) (*batchv1.Job, error) {
  925. // get the token
  926. oauthInt, err := opts.Repo.OAuthIntegration().ReadOAuthIntegration(
  927. opts.ProjectID,
  928. opts.Infra.DOIntegrationID,
  929. )
  930. if err != nil {
  931. return nil, err
  932. }
  933. tok, _, err := oauth.GetAccessToken(oauthInt.SharedOAuthModel, doAuth, oauth.MakeUpdateOAuthIntegrationTokenFunction(oauthInt, opts.Repo))
  934. if err != nil {
  935. return nil, err
  936. }
  937. id := opts.Infra.GetUniqueName()
  938. prov := &provisioner.Conf{
  939. ID: id,
  940. Name: fmt.Sprintf("prov-%s-%s", id, string(opts.Operation)),
  941. Kind: provisioner.DOCR,
  942. Operation: opts.Operation,
  943. Redis: opts.RedisConf,
  944. Postgres: opts.PGConf,
  945. ProvisionerImageTag: opts.ProvImageTag,
  946. ImagePullSecret: opts.ProvImagePullSecret,
  947. LastApplied: opts.Infra.LastApplied,
  948. DO: &do.Conf{
  949. DOToken: tok,
  950. },
  951. DOCR: &docr.Conf{
  952. DOCRName: docrName,
  953. DOCRSubscriptionTier: docrSubscriptionTier,
  954. },
  955. }
  956. return a.provision(prov, opts.Infra, opts.Repo)
  957. }
  958. // ProvisionDOKS spawns a new provisioning pod that creates a DOKS instance
  959. func (a *Agent) ProvisionDOKS(
  960. opts *SharedProvisionOpts,
  961. doConf *integrations.OAuthIntegration,
  962. doAuth *oauth2.Config,
  963. doRegion, doksClusterName string,
  964. ) (*batchv1.Job, error) {
  965. // get the token
  966. oauthInt, err := opts.Repo.OAuthIntegration().ReadOAuthIntegration(
  967. opts.ProjectID,
  968. opts.Infra.DOIntegrationID,
  969. )
  970. if err != nil {
  971. return nil, err
  972. }
  973. tok, _, err := oauth.GetAccessToken(oauthInt.SharedOAuthModel, doAuth, oauth.MakeUpdateOAuthIntegrationTokenFunction(oauthInt, opts.Repo))
  974. if err != nil {
  975. return nil, err
  976. }
  977. id := opts.Infra.GetUniqueName()
  978. prov := &provisioner.Conf{
  979. ID: id,
  980. Name: fmt.Sprintf("prov-%s-%s", id, string(opts.Operation)),
  981. Kind: provisioner.DOKS,
  982. Operation: opts.Operation,
  983. Redis: opts.RedisConf,
  984. Postgres: opts.PGConf,
  985. LastApplied: opts.Infra.LastApplied,
  986. ProvisionerImageTag: opts.ProvImageTag,
  987. ImagePullSecret: opts.ProvImagePullSecret,
  988. DO: &do.Conf{
  989. DOToken: tok,
  990. },
  991. DOKS: &doks.Conf{
  992. DORegion: doRegion,
  993. DOKSClusterName: doksClusterName,
  994. },
  995. }
  996. return a.provision(prov, opts.Infra, opts.Repo)
  997. }
  998. // ProvisionTest spawns a new provisioning pod that tests provisioning
  999. func (a *Agent) ProvisionTest(
  1000. opts *SharedProvisionOpts,
  1001. ) (*batchv1.Job, error) {
  1002. id := opts.Infra.GetUniqueName()
  1003. prov := &provisioner.Conf{
  1004. ID: id,
  1005. Name: fmt.Sprintf("prov-%s-%s", id, string(opts.Operation)),
  1006. Operation: opts.Operation,
  1007. Kind: provisioner.Test,
  1008. Redis: opts.RedisConf,
  1009. Postgres: opts.PGConf,
  1010. ProvisionerImageTag: opts.ProvImageTag,
  1011. ImagePullSecret: opts.ProvImagePullSecret,
  1012. }
  1013. return a.provision(prov, opts.Infra, opts.Repo)
  1014. }
  1015. func (a *Agent) provision(
  1016. prov *provisioner.Conf,
  1017. infra *models.Infra,
  1018. repo repository.Repository,
  1019. ) (*batchv1.Job, error) {
  1020. prov.Namespace = "default"
  1021. job, err := prov.GetProvisionerJobTemplate()
  1022. if err != nil {
  1023. return nil, err
  1024. }
  1025. job, err = a.Clientset.BatchV1().Jobs(prov.Namespace).Create(
  1026. context.TODO(),
  1027. job,
  1028. metav1.CreateOptions{},
  1029. )
  1030. if err != nil {
  1031. return nil, err
  1032. }
  1033. infra.LastApplied = prov.LastApplied
  1034. infra, err = repo.Infra().UpdateInfra(infra)
  1035. if err != nil {
  1036. return nil, err
  1037. }
  1038. return job, nil
  1039. }
  1040. // CreateImagePullSecrets will create the required image pull secrets and
  1041. // return a map from the registry name to the name of the secret.
  1042. func (a *Agent) CreateImagePullSecrets(
  1043. repo repository.Repository,
  1044. namespace string,
  1045. linkedRegs map[string]*models.Registry,
  1046. doAuth *oauth2.Config,
  1047. ) (map[string]string, error) {
  1048. res := make(map[string]string)
  1049. for key, val := range linkedRegs {
  1050. _reg := registry.Registry(*val)
  1051. data, err := _reg.GetDockerConfigJSON(repo, doAuth)
  1052. if err != nil {
  1053. return nil, err
  1054. }
  1055. secretName := fmt.Sprintf("porter-%s-%d", val.ToRegistryType().Service, val.ID)
  1056. secret, err := a.Clientset.CoreV1().Secrets(namespace).Get(
  1057. context.TODO(),
  1058. secretName,
  1059. metav1.GetOptions{},
  1060. )
  1061. // if not found, create the secret
  1062. if err != nil && errors.IsNotFound(err) {
  1063. _, err = a.Clientset.CoreV1().Secrets(namespace).Create(
  1064. context.TODO(),
  1065. &v1.Secret{
  1066. ObjectMeta: metav1.ObjectMeta{
  1067. Name: secretName,
  1068. },
  1069. Data: map[string][]byte{
  1070. string(v1.DockerConfigJsonKey): data,
  1071. },
  1072. Type: v1.SecretTypeDockerConfigJson,
  1073. },
  1074. metav1.CreateOptions{},
  1075. )
  1076. if err != nil {
  1077. return nil, err
  1078. }
  1079. // add secret name to the map
  1080. res[key] = secretName
  1081. continue
  1082. } else if err != nil {
  1083. return nil, err
  1084. }
  1085. // otherwise, check that the secret contains the correct data: if
  1086. // if doesn't, update it
  1087. if !bytes.Equal(secret.Data[v1.DockerConfigJsonKey], data) {
  1088. _, err := a.Clientset.CoreV1().Secrets(namespace).Update(
  1089. context.TODO(),
  1090. &v1.Secret{
  1091. ObjectMeta: metav1.ObjectMeta{
  1092. Name: secretName,
  1093. },
  1094. Data: map[string][]byte{
  1095. string(v1.DockerConfigJsonKey): data,
  1096. },
  1097. Type: v1.SecretTypeDockerConfigJson,
  1098. },
  1099. metav1.UpdateOptions{},
  1100. )
  1101. if err != nil {
  1102. return nil, err
  1103. }
  1104. }
  1105. // add secret name to the map
  1106. res[key] = secretName
  1107. }
  1108. return res, nil
  1109. }