agent.go 8.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334
  1. package kubernetes
  2. import (
  3. "bufio"
  4. "context"
  5. "fmt"
  6. "io"
  7. "strings"
  8. "github.com/porter-dev/porter/internal/kubernetes/provisioner"
  9. "github.com/porter-dev/porter/internal/kubernetes/provisioner/aws"
  10. "github.com/porter-dev/porter/internal/kubernetes/provisioner/aws/ecr"
  11. "github.com/porter-dev/porter/internal/kubernetes/provisioner/aws/eks"
  12. "github.com/porter-dev/porter/internal/models"
  13. "github.com/porter-dev/porter/internal/models/integrations"
  14. "github.com/gorilla/websocket"
  15. "github.com/porter-dev/porter/internal/helm/grapher"
  16. appsv1 "k8s.io/api/apps/v1"
  17. batchv1 "k8s.io/api/batch/v1"
  18. v1 "k8s.io/api/core/v1"
  19. v1beta1 "k8s.io/api/extensions/v1beta1"
  20. metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
  21. "k8s.io/cli-runtime/pkg/genericclioptions"
  22. "k8s.io/client-go/informers"
  23. "k8s.io/client-go/kubernetes"
  24. "k8s.io/client-go/tools/cache"
  25. "github.com/porter-dev/porter/internal/config"
  26. )
  27. // Agent is a Kubernetes agent for performing operations that interact with the
  28. // api server
  29. type Agent struct {
  30. RESTClientGetter genericclioptions.RESTClientGetter
  31. Clientset kubernetes.Interface
  32. }
  33. type Message struct {
  34. EventType string
  35. Object interface{}
  36. Kind string
  37. }
  38. type ListOptions struct {
  39. FieldSelector string
  40. }
  41. // ListNamespaces simply lists namespaces
  42. func (a *Agent) ListNamespaces() (*v1.NamespaceList, error) {
  43. return a.Clientset.CoreV1().Namespaces().List(
  44. context.TODO(),
  45. metav1.ListOptions{},
  46. )
  47. }
  48. // GetIngress gets ingress given the name and namespace
  49. func (a *Agent) GetIngress(namespace string, name string) (*v1beta1.Ingress, error) {
  50. return a.Clientset.ExtensionsV1beta1().Ingresses(namespace).Get(
  51. context.TODO(),
  52. name,
  53. metav1.GetOptions{},
  54. )
  55. }
  56. // GetDeployment gets the deployment given the name and namespace
  57. func (a *Agent) GetDeployment(c grapher.Object) (*appsv1.Deployment, error) {
  58. return a.Clientset.AppsV1().Deployments(c.Namespace).Get(
  59. context.TODO(),
  60. c.Name,
  61. metav1.GetOptions{},
  62. )
  63. }
  64. // GetStatefulSet gets the statefulset given the name and namespace
  65. func (a *Agent) GetStatefulSet(c grapher.Object) (*appsv1.StatefulSet, error) {
  66. return a.Clientset.AppsV1().StatefulSets(c.Namespace).Get(
  67. context.TODO(),
  68. c.Name,
  69. metav1.GetOptions{},
  70. )
  71. }
  72. // GetReplicaSet gets the replicaset given the name and namespace
  73. func (a *Agent) GetReplicaSet(c grapher.Object) (*appsv1.ReplicaSet, error) {
  74. return a.Clientset.AppsV1().ReplicaSets(c.Namespace).Get(
  75. context.TODO(),
  76. c.Name,
  77. metav1.GetOptions{},
  78. )
  79. }
  80. // GetDaemonSet gets the daemonset by name and namespace
  81. func (a *Agent) GetDaemonSet(c grapher.Object) (*appsv1.DaemonSet, error) {
  82. return a.Clientset.AppsV1().DaemonSets(c.Namespace).Get(
  83. context.TODO(),
  84. c.Name,
  85. metav1.GetOptions{},
  86. )
  87. }
  88. // GetPodsByLabel retrieves pods with matching labels
  89. func (a *Agent) GetPodsByLabel(selector string) (*v1.PodList, error) {
  90. // Search in all namespaces for matching pods
  91. return a.Clientset.CoreV1().Pods("").List(
  92. context.TODO(),
  93. metav1.ListOptions{
  94. LabelSelector: selector,
  95. },
  96. )
  97. }
  98. // GetPodLogs streams real-time logs from a given pod.
  99. func (a *Agent) GetPodLogs(namespace string, name string, conn *websocket.Conn) error {
  100. // follow logs
  101. tails := int64(30)
  102. podLogOpts := v1.PodLogOptions{
  103. Follow: true,
  104. TailLines: &tails,
  105. }
  106. req := a.Clientset.CoreV1().Pods(namespace).GetLogs(name, &podLogOpts)
  107. podLogs, err := req.Stream(context.TODO())
  108. if err != nil {
  109. return fmt.Errorf("Cannot open log stream for pod %s", name)
  110. }
  111. defer podLogs.Close()
  112. r := bufio.NewReader(podLogs)
  113. errorchan := make(chan error)
  114. go func() {
  115. // listens for websocket closing handshake
  116. for {
  117. if _, _, err := conn.ReadMessage(); err != nil {
  118. defer conn.Close()
  119. errorchan <- nil
  120. fmt.Println("Successfully closed log stream")
  121. return
  122. }
  123. }
  124. }()
  125. go func() {
  126. for {
  127. select {
  128. case <-errorchan:
  129. defer close(errorchan)
  130. return
  131. default:
  132. }
  133. bytes, err := r.ReadBytes('\n')
  134. if writeErr := conn.WriteMessage(websocket.TextMessage, bytes); writeErr != nil {
  135. errorchan <- writeErr
  136. return
  137. }
  138. if err != nil {
  139. if err != io.EOF {
  140. errorchan <- err
  141. return
  142. }
  143. errorchan <- nil
  144. return
  145. }
  146. }
  147. }()
  148. for {
  149. select {
  150. case err = <-errorchan:
  151. return err
  152. }
  153. }
  154. }
  155. // StreamControllerStatus streams controller status. Supports Deployment, StatefulSet, ReplicaSet, and DaemonSet
  156. // TODO: Support Jobs
  157. func (a *Agent) StreamControllerStatus(conn *websocket.Conn, kind string) error {
  158. factory := informers.NewSharedInformerFactory(
  159. a.Clientset,
  160. 0,
  161. )
  162. var informer cache.SharedInformer
  163. // Spins up an informer depending on kind. Convert to lowercase for robustness
  164. switch strings.ToLower(kind) {
  165. case "deployment":
  166. informer = factory.Apps().V1().Deployments().Informer()
  167. case "statefulset":
  168. informer = factory.Apps().V1().StatefulSets().Informer()
  169. case "replicaset":
  170. informer = factory.Apps().V1().ReplicaSets().Informer()
  171. case "daemonset":
  172. informer = factory.Apps().V1().DaemonSets().Informer()
  173. }
  174. stopper := make(chan struct{})
  175. errorchan := make(chan error)
  176. defer close(errorchan)
  177. informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
  178. UpdateFunc: func(oldObj, newObj interface{}) {
  179. msg := Message{
  180. EventType: "UPDATE",
  181. Object: newObj,
  182. Kind: strings.ToLower(kind),
  183. }
  184. if writeErr := conn.WriteJSON(msg); writeErr != nil {
  185. errorchan <- writeErr
  186. return
  187. }
  188. },
  189. })
  190. go func() {
  191. // listens for websocket closing handshake
  192. for {
  193. if _, _, err := conn.ReadMessage(); err != nil {
  194. defer conn.Close()
  195. defer close(stopper)
  196. defer fmt.Println("Successfully closed controller status stream")
  197. errorchan <- nil
  198. return
  199. }
  200. }
  201. }()
  202. go informer.Run(stopper)
  203. for {
  204. select {
  205. case err := <-errorchan:
  206. return err
  207. }
  208. }
  209. }
  210. // ProvisionECR spawns a new provisioning pod that creates an ECR instance
  211. func (a *Agent) ProvisionECR(
  212. projectID uint,
  213. awsConf *integrations.AWSIntegration,
  214. ecrName string,
  215. awsInfra *models.AWSInfra,
  216. operation provisioner.ProvisionerOperation,
  217. pgConf *config.DBConf,
  218. redisConf *config.RedisConf,
  219. ) (*batchv1.Job, error) {
  220. id := awsInfra.GetID()
  221. prov := &provisioner.Conf{
  222. ID: id,
  223. Name: fmt.Sprintf("prov-%s-%s", id, string(operation)),
  224. Kind: provisioner.ECR,
  225. Operation: operation,
  226. Redis: redisConf,
  227. Postgres: pgConf,
  228. AWS: &aws.Conf{
  229. AWSRegion: awsConf.AWSRegion,
  230. AWSAccessKeyID: string(awsConf.AWSAccessKeyID),
  231. AWSSecretAccessKey: string(awsConf.AWSSecretAccessKey),
  232. },
  233. ECR: &ecr.Conf{
  234. ECRName: ecrName,
  235. },
  236. }
  237. return a.provision(prov)
  238. }
  239. // ProvisionEKS spawns a new provisioning pod that creates an EKS instance
  240. func (a *Agent) ProvisionEKS(
  241. projectID uint,
  242. awsConf *integrations.AWSIntegration,
  243. eksName string,
  244. awsInfra *models.AWSInfra,
  245. operation provisioner.ProvisionerOperation,
  246. pgConf *config.DBConf,
  247. redisConf *config.RedisConf,
  248. ) (*batchv1.Job, error) {
  249. id := awsInfra.GetID()
  250. prov := &provisioner.Conf{
  251. ID: id,
  252. Name: fmt.Sprintf("prov-%s-%s", id, string(operation)),
  253. Kind: provisioner.EKS,
  254. Operation: operation,
  255. Redis: redisConf,
  256. Postgres: pgConf,
  257. AWS: &aws.Conf{
  258. AWSRegion: awsConf.AWSRegion,
  259. AWSAccessKeyID: string(awsConf.AWSAccessKeyID),
  260. AWSSecretAccessKey: string(awsConf.AWSSecretAccessKey),
  261. },
  262. EKS: &eks.Conf{
  263. ClusterName: eksName,
  264. },
  265. }
  266. return a.provision(prov)
  267. }
  268. // ProvisionTest spawns a new provisioning pod that tests provisioning
  269. func (a *Agent) ProvisionTest(
  270. projectID uint,
  271. operation provisioner.ProvisionerOperation,
  272. pgConf *config.DBConf,
  273. redisConf *config.RedisConf,
  274. ) (*batchv1.Job, error) {
  275. prov := &provisioner.Conf{
  276. ID: fmt.Sprintf("%s-%d", "testing", projectID),
  277. Name: fmt.Sprintf("prov-%s-%d-%s", "testing", projectID, string(operation)),
  278. Operation: operation,
  279. Kind: provisioner.Test,
  280. Redis: redisConf,
  281. Postgres: pgConf,
  282. }
  283. return a.provision(prov)
  284. }
  285. func (a *Agent) provision(
  286. prov *provisioner.Conf,
  287. ) (*batchv1.Job, error) {
  288. prov.Namespace = "default"
  289. job, err := prov.GetProvisionerJobTemplate()
  290. if err != nil {
  291. return nil, err
  292. }
  293. return a.Clientset.BatchV1().Jobs(prov.Namespace).Create(
  294. context.TODO(),
  295. job,
  296. metav1.CreateOptions{},
  297. )
  298. }