k8s_handler.go 6.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271
  1. package api
  2. import (
  3. "encoding/json"
  4. "net/http"
  5. "net/url"
  6. "time"
  7. "github.com/go-chi/chi"
  8. "github.com/porter-dev/porter/internal/kubernetes"
  9. "github.com/porter-dev/porter/internal/models"
  10. v1 "k8s.io/api/core/v1"
  11. "github.com/gorilla/websocket"
  12. "github.com/porter-dev/porter/internal/forms"
  13. )
  14. // Enumeration of k8s API error codes, represented as int64
  15. const (
  16. ErrK8sDecode ErrorCode = iota + 600
  17. ErrK8sValidate
  18. )
  19. var upgrader = websocket.Upgrader{
  20. ReadBufferSize: 1024,
  21. WriteBufferSize: 1024,
  22. }
  23. // HandleListNamespaces retrieves a list of namespaces
  24. func (app *App) HandleListNamespaces(w http.ResponseWriter, r *http.Request) {
  25. vals, err := url.ParseQuery(r.URL.RawQuery)
  26. if err != nil {
  27. app.handleErrorFormDecoding(err, ErrReleaseDecode, w)
  28. return
  29. }
  30. // get the filter options
  31. form := &forms.K8sForm{
  32. OutOfClusterConfig: &kubernetes.OutOfClusterConfig{
  33. UpdateTokenCache: app.updateTokenCache,
  34. },
  35. }
  36. form.PopulateK8sOptionsFromQueryParams(vals, app.repo.ServiceAccount)
  37. // validate the form
  38. if err := app.validator.Struct(form); err != nil {
  39. app.handleErrorFormValidation(err, ErrK8sValidate, w)
  40. return
  41. }
  42. // create a new agent
  43. var agent *kubernetes.Agent
  44. if app.testing {
  45. agent = app.TestAgents.K8sAgent
  46. } else {
  47. agent, err = kubernetes.GetAgentOutOfClusterConfig(form.OutOfClusterConfig)
  48. }
  49. namespaces, err := agent.ListNamespaces()
  50. if err != nil {
  51. app.handleErrorDataRead(err, w)
  52. return
  53. }
  54. if err := json.NewEncoder(w).Encode(namespaces); err != nil {
  55. app.handleErrorFormDecoding(err, ErrK8sDecode, w)
  56. return
  57. }
  58. }
  59. func (app *App) updateTokenCache(token string, expiry time.Time) error {
  60. _, err := app.repo.ServiceAccount.UpdateServiceAccountTokenCache(
  61. &models.TokenCache{
  62. Token: []byte(token),
  63. Expiry: expiry,
  64. },
  65. )
  66. return err
  67. }
  68. // HandleGetPodLogs returns real-time logs of the pod via websockets
  69. // TODO: Refactor repeated calls.
  70. func (app *App) HandleGetPodLogs(w http.ResponseWriter, r *http.Request) {
  71. // get session to retrieve correct kubeconfig
  72. _, err := app.store.Get(r, app.cookieName)
  73. // get path parameters
  74. namespace := chi.URLParam(r, "namespace")
  75. podName := chi.URLParam(r, "name")
  76. if err != nil {
  77. app.handleErrorFormDecoding(err, ErrReleaseDecode, w)
  78. return
  79. }
  80. vals, err := url.ParseQuery(r.URL.RawQuery)
  81. if err != nil {
  82. app.handleErrorFormDecoding(err, ErrReleaseDecode, w)
  83. return
  84. }
  85. // get the filter options
  86. form := &forms.K8sForm{
  87. OutOfClusterConfig: &kubernetes.OutOfClusterConfig{
  88. UpdateTokenCache: app.updateTokenCache,
  89. },
  90. }
  91. form.PopulateK8sOptionsFromQueryParams(vals, app.repo.ServiceAccount)
  92. // validate the form
  93. if err := app.validator.Struct(form); err != nil {
  94. app.handleErrorFormValidation(err, ErrK8sValidate, w)
  95. return
  96. }
  97. // create a new agent
  98. var agent *kubernetes.Agent
  99. if app.testing {
  100. agent = app.TestAgents.K8sAgent
  101. } else {
  102. agent, err = kubernetes.GetAgentOutOfClusterConfig(form.OutOfClusterConfig)
  103. }
  104. upgrader.CheckOrigin = func(r *http.Request) bool { return true }
  105. // upgrade to websocket.
  106. conn, err := upgrader.Upgrade(w, r, nil)
  107. if err != nil {
  108. app.handleErrorUpgradeWebsocket(err, w)
  109. }
  110. err = agent.GetPodLogs(namespace, podName, conn)
  111. if err != nil {
  112. app.handleErrorWebsocketWrite(err, w)
  113. return
  114. }
  115. }
  116. // HandleListPods returns all pods that match the given selectors
  117. // TODO: Refactor repeated calls.
  118. func (app *App) HandleListPods(w http.ResponseWriter, r *http.Request) {
  119. // get session to retrieve correct kubeconfig
  120. _, err := app.store.Get(r, app.cookieName)
  121. if err != nil {
  122. app.handleErrorFormDecoding(err, ErrReleaseDecode, w)
  123. return
  124. }
  125. vals, err := url.ParseQuery(r.URL.RawQuery)
  126. if err != nil {
  127. app.handleErrorFormDecoding(err, ErrReleaseDecode, w)
  128. return
  129. }
  130. // get the filter options
  131. form := &forms.K8sForm{
  132. OutOfClusterConfig: &kubernetes.OutOfClusterConfig{
  133. UpdateTokenCache: app.updateTokenCache,
  134. },
  135. }
  136. form.PopulateK8sOptionsFromQueryParams(vals, app.repo.ServiceAccount)
  137. // validate the form
  138. if err := app.validator.Struct(form); err != nil {
  139. app.handleErrorFormValidation(err, ErrK8sValidate, w)
  140. return
  141. }
  142. // create a new agent
  143. var agent *kubernetes.Agent
  144. if app.testing {
  145. agent = app.TestAgents.K8sAgent
  146. } else {
  147. agent, err = kubernetes.GetAgentOutOfClusterConfig(form.OutOfClusterConfig)
  148. }
  149. pods := []v1.Pod{}
  150. for _, selector := range vals["selectors"] {
  151. podsList, err := agent.GetPodsByLabel(selector)
  152. if err != nil {
  153. app.handleErrorFormValidation(err, ErrK8sValidate, w)
  154. return
  155. }
  156. for _, pod := range podsList.Items {
  157. pods = append(pods, pod)
  158. }
  159. }
  160. if err := json.NewEncoder(w).Encode(pods); err != nil {
  161. app.handleErrorFormDecoding(err, ErrK8sDecode, w)
  162. return
  163. }
  164. }
  165. // HandleStreamDeployment test calls
  166. // TODO: Refactor repeated calls.
  167. func (app *App) HandleStreamDeployment(w http.ResponseWriter, r *http.Request) {
  168. // get session to retrieve correct kubeconfig
  169. _, err := app.store.Get(r, app.cookieName)
  170. if err != nil {
  171. app.handleErrorFormDecoding(err, ErrReleaseDecode, w)
  172. return
  173. }
  174. vals, err := url.ParseQuery(r.URL.RawQuery)
  175. if err != nil {
  176. app.handleErrorFormDecoding(err, ErrReleaseDecode, w)
  177. return
  178. }
  179. // get the filter options
  180. form := &forms.K8sForm{
  181. OutOfClusterConfig: &kubernetes.OutOfClusterConfig{
  182. UpdateTokenCache: app.updateTokenCache,
  183. },
  184. }
  185. form.PopulateK8sOptionsFromQueryParams(vals, app.repo.ServiceAccount)
  186. // validate the form
  187. if err := app.validator.Struct(form); err != nil {
  188. app.handleErrorFormValidation(err, ErrK8sValidate, w)
  189. return
  190. }
  191. // create a new agent
  192. var agent *kubernetes.Agent
  193. if app.testing {
  194. agent = app.TestAgents.K8sAgent
  195. } else {
  196. agent, err = kubernetes.GetAgentOutOfClusterConfig(form.OutOfClusterConfig)
  197. }
  198. upgrader.CheckOrigin = func(r *http.Request) bool { return true }
  199. // upgrade to websocket.
  200. conn, err := upgrader.Upgrade(w, r, nil)
  201. if err != nil {
  202. app.handleErrorUpgradeWebsocket(err, w)
  203. }
  204. err = agent.StreamDeploymentStatus(conn)
  205. if err != nil {
  206. app.handleErrorWebsocketWrite(err, w)
  207. return
  208. }
  209. }