stream_form.go 3.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129
  1. package release
  2. import (
  3. "encoding/json"
  4. "net/http"
  5. "strings"
  6. "github.com/porter-dev/porter/api/server/authz"
  7. "github.com/porter-dev/porter/api/server/handlers"
  8. "github.com/porter-dev/porter/api/server/shared"
  9. "github.com/porter-dev/porter/api/server/shared/apierrors"
  10. "github.com/porter-dev/porter/api/server/shared/config"
  11. "github.com/porter-dev/porter/api/server/shared/websocket"
  12. "github.com/porter-dev/porter/api/types"
  13. "github.com/porter-dev/porter/internal/models"
  14. "github.com/porter-dev/porter/internal/templater/parser"
  15. "helm.sh/helm/v3/pkg/release"
  16. )
  17. type StreamFormHandler struct {
  18. handlers.PorterHandlerReadWriter
  19. authz.KubernetesAgentGetter
  20. }
  21. func NewStreamFormHandler(
  22. config *config.Config,
  23. decoderValidator shared.RequestDecoderValidator,
  24. writer shared.ResultWriter,
  25. ) *StreamFormHandler {
  26. return &StreamFormHandler{
  27. PorterHandlerReadWriter: handlers.NewDefaultPorterHandler(config, decoderValidator, writer),
  28. KubernetesAgentGetter: authz.NewOutOfClusterAgentGetter(config),
  29. }
  30. }
  31. func getStreamWriter(rw *websocket.WebsocketSafeReadWriter) func(val map[string]interface{}) error {
  32. return func(val map[string]interface{}) error {
  33. // parse value into json
  34. bytes, err := json.Marshal(val)
  35. if err != nil {
  36. return err
  37. }
  38. _, err = rw.Write(bytes)
  39. return err
  40. }
  41. }
  42. func (c *StreamFormHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
  43. helmRelease, _ := r.Context().Value(types.ReleaseScope).(*release.Release)
  44. cluster, _ := r.Context().Value(types.ClusterScope).(*models.Cluster)
  45. safeRW := r.Context().Value(types.RequestCtxWebsocketKey).(*websocket.WebsocketSafeReadWriter)
  46. request := &types.StreamCRDRequest{}
  47. if ok := c.DecodeAndValidate(w, r, request); !ok {
  48. return
  49. }
  50. // look for the form using the dynamic client
  51. dynClient, err := c.GetDynamicClient(r, cluster)
  52. if err != nil {
  53. c.HandleAPIError(w, r, apierrors.NewErrInternal(err))
  54. return
  55. }
  56. parserDef := &parser.ClientConfigDefault{
  57. DynamicClient: dynClient,
  58. HelmChart: helmRelease.Chart,
  59. HelmRelease: helmRelease,
  60. }
  61. var formData []byte
  62. for _, file := range helmRelease.Chart.Files {
  63. if strings.Contains(file.Name, "form.yaml") {
  64. formData = file.Data
  65. break
  66. }
  67. }
  68. // if form data isn't found, look for common charts
  69. if formData == nil {
  70. // for now just case by name
  71. if helmRelease.Chart.Name() == "cert-manager" {
  72. formData = []byte(certManagerForm)
  73. }
  74. }
  75. stopper := make(chan struct{})
  76. errorchan := make(chan error)
  77. defer close(stopper)
  78. go func() {
  79. // listens for websocket closing handshake
  80. for {
  81. if _, _, err := safeRW.ReadMessage(); err != nil {
  82. errorchan <- nil
  83. return
  84. }
  85. }
  86. }()
  87. onData := getStreamWriter(safeRW)
  88. err = parser.FormStreamer(parserDef, formData, "", &types.FormContext{
  89. Type: "cluster",
  90. Config: map[string]string{
  91. "group": request.Group,
  92. "resource": request.Resource,
  93. "version": request.Version,
  94. },
  95. }, onData, stopper)
  96. if err != nil {
  97. c.HandleAPIError(w, r, apierrors.NewErrInternal(err))
  98. return
  99. }
  100. for {
  101. select {
  102. case err := <-errorchan:
  103. c.HandleAPIError(w, r, apierrors.NewErrInternal(err))
  104. return
  105. }
  106. }
  107. }