provision_handler.go 4.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230
  1. package api
  2. import (
  3. "context"
  4. "encoding/json"
  5. "fmt"
  6. "net/http"
  7. "strconv"
  8. "github.com/go-chi/chi"
  9. redis "github.com/go-redis/redis/v8"
  10. "github.com/gorilla/websocket"
  11. "github.com/porter-dev/porter/internal/forms"
  12. "github.com/porter-dev/porter/internal/kubernetes"
  13. prov "github.com/porter-dev/porter/internal/kubernetes/provisioner"
  14. )
  15. // HandleProvisionTest will create a test resource by deploying a provisioner
  16. // container pod
  17. func (app *App) HandleProvisionTest(w http.ResponseWriter, r *http.Request) {
  18. projID, err := strconv.ParseUint(chi.URLParam(r, "project_id"), 0, 64)
  19. if err != nil || projID == 0 {
  20. app.handleErrorFormDecoding(err, ErrProjectDecode, w)
  21. return
  22. }
  23. // create a new agent
  24. agent, err := kubernetes.GetAgentInClusterConfig()
  25. if err != nil {
  26. app.handleErrorDataRead(err, w)
  27. return
  28. }
  29. _, err = agent.ProvisionTest(uint(projID))
  30. if err != nil {
  31. app.handleErrorInternal(err, w)
  32. return
  33. }
  34. w.WriteHeader(http.StatusOK)
  35. }
  36. // HandleProvisionAWSECRInfra provisions a new aws ECR instance for a project
  37. func (app *App) HandleProvisionAWSECRInfra(w http.ResponseWriter, r *http.Request) {
  38. projID, err := strconv.ParseUint(chi.URLParam(r, "project_id"), 0, 64)
  39. if err != nil || projID == 0 {
  40. app.handleErrorFormDecoding(err, ErrProjectDecode, w)
  41. return
  42. }
  43. form := &forms.CreateECRInfra{
  44. ProjectID: uint(projID),
  45. }
  46. // decode from JSON to form value
  47. if err := json.NewDecoder(r.Body).Decode(form); err != nil {
  48. app.handleErrorFormDecoding(err, ErrProjectDecode, w)
  49. return
  50. }
  51. // validate the form
  52. if err := app.validator.Struct(form); err != nil {
  53. app.handleErrorFormValidation(err, ErrProjectValidateFields, w)
  54. return
  55. }
  56. // convert the form to an aws infra instance
  57. infra, err := form.ToAWSInfra()
  58. if err != nil {
  59. app.handleErrorFormDecoding(err, ErrProjectDecode, w)
  60. return
  61. }
  62. // handle write to the database
  63. infra, err = app.Repo.AWSInfra.CreateAWSInfra(infra)
  64. if err != nil {
  65. app.handleErrorDataWrite(err, w)
  66. return
  67. }
  68. awsInt, err := app.Repo.AWSIntegration.ReadAWSIntegration(infra.AWSIntegrationID)
  69. if err != nil {
  70. app.handleErrorDataRead(err, w)
  71. return
  72. }
  73. // launch provisioning pod
  74. agent, err := kubernetes.GetAgentInClusterConfig()
  75. if err != nil {
  76. app.handleErrorDataRead(err, w)
  77. return
  78. }
  79. _, err = agent.ProvisionECR(
  80. uint(projID),
  81. awsInt,
  82. form.ECRName,
  83. )
  84. if err != nil {
  85. app.handleErrorInternal(err, w)
  86. return
  87. }
  88. app.Logger.Info().Msgf("New aws ecr infra created: %d", infra.ID)
  89. w.WriteHeader(http.StatusCreated)
  90. infraExt := infra.Externalize()
  91. if err := json.NewEncoder(w).Encode(infraExt); err != nil {
  92. app.handleErrorFormDecoding(err, ErrProjectDecode, w)
  93. return
  94. }
  95. }
  96. // HandleGetProvisioningLogs returns real-time logs of the provisioning process via websockets
  97. func (app *App) HandleGetProvisioningLogs(w http.ResponseWriter, r *http.Request) {
  98. // get path parameters
  99. kind := chi.URLParam(r, "kind")
  100. projectID := chi.URLParam(r, "project_id")
  101. infraID := chi.URLParam(r, "infra_id")
  102. streamName := fmt.Sprintf("%s-%s-%s", kind, projectID, infraID)
  103. upgrader.CheckOrigin = func(r *http.Request) bool { return true }
  104. // upgrade to websocket.
  105. conn, err := upgrader.Upgrade(w, r, nil)
  106. if err != nil {
  107. app.handleErrorUpgradeWebsocket(err, w)
  108. }
  109. err = StreamRedis(streamName, conn)
  110. if err != nil {
  111. app.handleErrorWebsocketWrite(err, w)
  112. return
  113. }
  114. }
  115. // helper functions
  116. // StreamRedis performs an XREAD operation on the given stream and outputs it to the given websocket conn.
  117. func StreamRedis(streamName string, conn *websocket.Conn) error {
  118. conf := &prov.RedisConf{
  119. Host: "redis",
  120. Port: "6379",
  121. }
  122. client, err := NewRedisClient(conf)
  123. if err != nil {
  124. return err
  125. }
  126. errorchan := make(chan error)
  127. go func() {
  128. // listens for websocket closing handshake
  129. for {
  130. _, _, err := conn.ReadMessage()
  131. if err != nil {
  132. defer conn.Close()
  133. errorchan <- err
  134. return
  135. }
  136. }
  137. }()
  138. go func() {
  139. lastID := "0-0"
  140. for {
  141. xstream, err := client.XRead(
  142. context.Background(),
  143. &redis.XReadArgs{
  144. Streams: []string{streamName, lastID},
  145. Block: 0,
  146. },
  147. ).Result()
  148. if err != nil {
  149. return
  150. }
  151. messages := xstream[0].Messages
  152. lastID = messages[len(messages)-1].ID
  153. if writeErr := conn.WriteMessage(websocket.TextMessage, []byte(fmt.Sprint(xstream))); writeErr != nil {
  154. errorchan <- writeErr
  155. return
  156. }
  157. }
  158. }()
  159. for {
  160. select {
  161. case err = <-errorchan:
  162. close(errorchan)
  163. client.Close()
  164. return err
  165. }
  166. }
  167. }
  168. // NewRedisClient returns a new redis client instance
  169. func NewRedisClient(conf *prov.RedisConf) (*redis.Client, error) {
  170. client := redis.NewClient(&redis.Options{
  171. Addr: fmt.Sprintf("%s:%s", conf.Host, conf.Port),
  172. // Username: conf.Username,
  173. // Password: conf.Password,
  174. // DB: conf.DB,
  175. })
  176. _, err := client.Ping(context.Background()).Result()
  177. return client, err
  178. }