agent.go 4.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215
  1. package kubernetes
  2. import (
  3. "bufio"
  4. "context"
  5. "fmt"
  6. "io"
  7. "strings"
  8. "github.com/gorilla/websocket"
  9. "github.com/porter-dev/porter/internal/helm/grapher"
  10. appsv1 "k8s.io/api/apps/v1"
  11. v1 "k8s.io/api/core/v1"
  12. metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
  13. "k8s.io/cli-runtime/pkg/genericclioptions"
  14. "k8s.io/client-go/informers"
  15. "k8s.io/client-go/kubernetes"
  16. "k8s.io/client-go/tools/cache"
  17. )
  18. // Agent is a Kubernetes agent for performing operations that interact with the
  19. // api server
  20. type Agent struct {
  21. RESTClientGetter genericclioptions.RESTClientGetter
  22. Clientset kubernetes.Interface
  23. }
  24. type Message struct {
  25. EventType string
  26. Object interface{}
  27. Kind string
  28. }
  29. type ListOptions struct {
  30. FieldSelector string
  31. }
  32. // ListNamespaces simply lists namespaces
  33. func (a *Agent) ListNamespaces() (*v1.NamespaceList, error) {
  34. return a.Clientset.CoreV1().Namespaces().List(
  35. context.TODO(),
  36. metav1.ListOptions{},
  37. )
  38. }
  39. // GetDeployment gets the depployment given the name and namespace
  40. func (a *Agent) GetDeployment(c grapher.Object) (*appsv1.Deployment, error) {
  41. return a.Clientset.AppsV1().Deployments(c.Namespace).Get(
  42. context.TODO(),
  43. c.Name,
  44. metav1.GetOptions{},
  45. )
  46. }
  47. // GetStatefulSet gets the statefulset given the name and namespace
  48. func (a *Agent) GetStatefulSet(c grapher.Object) (*appsv1.StatefulSet, error) {
  49. return a.Clientset.AppsV1().StatefulSets(c.Namespace).Get(
  50. context.TODO(),
  51. c.Name,
  52. metav1.GetOptions{},
  53. )
  54. }
  55. // GetReplicaSet gets the replicaset given the name and namespace
  56. func (a *Agent) GetReplicaSet(c grapher.Object) (*appsv1.ReplicaSet, error) {
  57. return a.Clientset.AppsV1().ReplicaSets(c.Namespace).Get(
  58. context.TODO(),
  59. c.Name,
  60. metav1.GetOptions{},
  61. )
  62. }
  63. // GetDaemonSet gets the daemonset by name and namespace
  64. func (a *Agent) GetDaemonSet(c grapher.Object) (*appsv1.DaemonSet, error) {
  65. return a.Clientset.AppsV1().DaemonSets(c.Namespace).Get(
  66. context.TODO(),
  67. c.Name,
  68. metav1.GetOptions{},
  69. )
  70. }
  71. // GetPodsByLabel retrieves pods with matching labels
  72. func (a *Agent) GetPodsByLabel(selector string) (*v1.PodList, error) {
  73. // Search in all namespaces for matching pods
  74. return a.Clientset.CoreV1().Pods("").List(
  75. context.TODO(),
  76. metav1.ListOptions{
  77. LabelSelector: selector,
  78. },
  79. )
  80. }
  81. // GetPodLogs streams real-time logs from a given pod.
  82. func (a *Agent) GetPodLogs(namespace string, name string, conn *websocket.Conn) error {
  83. // follow logs
  84. tails := int64(30)
  85. podLogOpts := v1.PodLogOptions{
  86. Follow: true,
  87. TailLines: &tails,
  88. }
  89. req := a.Clientset.CoreV1().Pods(namespace).GetLogs(name, &podLogOpts)
  90. podLogs, err := req.Stream(context.TODO())
  91. if err != nil {
  92. return fmt.Errorf("Cannot open log stream for pod %s", name)
  93. }
  94. defer podLogs.Close()
  95. r := bufio.NewReader(podLogs)
  96. errorchan := make(chan error)
  97. go func() {
  98. // listens for websocket closing handshake
  99. for {
  100. if _, _, err := conn.ReadMessage(); err != nil {
  101. conn.Close()
  102. errorchan <- nil
  103. fmt.Println("Successfully closed log stream")
  104. return
  105. }
  106. }
  107. }()
  108. go func() {
  109. for {
  110. select {
  111. case <-errorchan:
  112. defer close(errorchan)
  113. return
  114. default:
  115. }
  116. bytes, err := r.ReadBytes('\n')
  117. if writeErr := conn.WriteMessage(websocket.TextMessage, bytes); writeErr != nil {
  118. errorchan <- writeErr
  119. return
  120. }
  121. if err != nil {
  122. if err != io.EOF {
  123. errorchan <- err
  124. return
  125. }
  126. errorchan <- nil
  127. return
  128. }
  129. }
  130. }()
  131. for {
  132. select {
  133. case err = <-errorchan:
  134. return err
  135. }
  136. }
  137. }
  138. // StreamControllerStatus streams controller status. Supports Deployment, StatefulSet, ReplicaSet, and DaemonSet
  139. // TODO: Support Jobs
  140. func (a *Agent) StreamControllerStatus(conn *websocket.Conn, kind string) error {
  141. factory := informers.NewSharedInformerFactory(
  142. a.Clientset,
  143. 10,
  144. )
  145. var informer cache.SharedInformer
  146. // Spins up an informer depending on kind. Convert to lowercase for robustness
  147. switch strings.ToLower(kind) {
  148. case "deployment":
  149. informer = factory.Apps().V1().Deployments().Informer()
  150. case "statefulset":
  151. informer = factory.Apps().V1().StatefulSets().Informer()
  152. case "replicaset":
  153. informer = factory.Apps().V1().ReplicaSets().Informer()
  154. case "daemonset":
  155. informer = factory.Apps().V1().DaemonSets().Informer()
  156. }
  157. stopper := make(chan struct{})
  158. errorchan := make(chan error)
  159. defer close(errorchan)
  160. informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
  161. UpdateFunc: func(oldObj, newObj interface{}) {
  162. msg := Message{
  163. EventType: "UPDATE",
  164. Object: newObj,
  165. Kind: strings.ToLower(kind),
  166. }
  167. if writeErr := conn.WriteJSON(msg); writeErr != nil {
  168. errorchan <- writeErr
  169. return
  170. }
  171. },
  172. })
  173. go func() {
  174. // listens for websocket closing handshake
  175. for {
  176. if _, _, err := conn.ReadMessage(); err != nil {
  177. defer conn.Close()
  178. defer close(stopper)
  179. defer fmt.Println("Successfully closed controller status stream")
  180. errorchan <- nil
  181. return
  182. }
  183. }
  184. }()
  185. go informer.Run(stopper)
  186. for {
  187. select {
  188. case err := <-errorchan:
  189. return err
  190. }
  191. }
  192. }