Alexander Belanger 4 лет назад
Родитель
Commit
95475e0437

+ 54 - 0
api/server/handlers/database/update.go

@@ -0,0 +1,54 @@
+package database
+
+import (
+	"net/http"
+
+	"github.com/porter-dev/porter/api/server/handlers"
+	"github.com/porter-dev/porter/api/server/shared"
+	"github.com/porter-dev/porter/api/server/shared/apierrors"
+	"github.com/porter-dev/porter/api/server/shared/config"
+	"github.com/porter-dev/porter/api/types"
+	"github.com/porter-dev/porter/internal/models"
+)
+
+type DatabaseUpdateStatusHandler struct {
+	handlers.PorterHandlerReader
+}
+
+func NewDatabaseUpdateStatusHandler(
+	config *config.Config,
+	decoderValidator shared.RequestDecoderValidator,
+) *DatabaseUpdateStatusHandler {
+	return &DatabaseUpdateStatusHandler{
+		PorterHandlerReader: handlers.NewDefaultPorterHandler(config, decoderValidator, nil),
+	}
+}
+
+func (p *DatabaseUpdateStatusHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
+	// read the project and infra from the request context
+	proj, _ := r.Context().Value(types.ProjectScope).(*models.Project)
+	infra, _ := r.Context().Value(types.InfraScope).(*models.Infra)
+
+	req := &types.UpdateDatabaseStatusRequest{}
+
+	if ok := p.DecodeAndValidate(w, r, req); !ok {
+		return
+	}
+
+	// read the database associated with this infra
+	db, err := p.Repo().Database().ReadDatabaseByInfraID(proj.ID, infra.ID)
+
+	if err != nil {
+		p.HandleAPIError(w, r, apierrors.NewErrInternal(err))
+		return
+	}
+
+	db.Status = req.Status
+
+	db, err = p.Repo().Database().UpdateDatabase(db)
+
+	if err != nil {
+		p.HandleAPIError(w, r, apierrors.NewErrInternal(err))
+		return
+	}
+}

+ 29 - 0
api/server/router/infra.go

@@ -4,6 +4,7 @@ import (
 	"fmt"
 
 	"github.com/go-chi/chi"
+	"github.com/porter-dev/porter/api/server/handlers/database"
 	"github.com/porter-dev/porter/api/server/handlers/infra"
 	"github.com/porter-dev/porter/api/server/shared"
 	"github.com/porter-dev/porter/api/server/shared/config"
@@ -485,5 +486,33 @@ func getInfraRoutes(
 		Router:   r,
 	})
 
+	// POST /api/projects/{project_id}/infras/{infra_id}/database -> database.NewDatabaseUpdateStatusHandler
+	updateDBStatusEndpoint := factory.NewAPIEndpoint(
+		&types.APIRequestMetadata{
+			Verb:   types.APIVerbUpdate,
+			Method: types.HTTPVerbPost,
+			Path: &types.Path{
+				Parent:       basePath,
+				RelativePath: relPath + "/database",
+			},
+			Scopes: []types.PermissionScope{
+				types.UserScope,
+				types.ProjectScope,
+				types.InfraScope,
+			},
+		},
+	)
+
+	updateDBStatusHandler := database.NewDatabaseUpdateStatusHandler(
+		config,
+		factory.GetDecoderValidator(),
+	)
+
+	routes = append(routes, &Route{
+		Endpoint: updateDBStatusEndpoint,
+		Handler:  updateDBStatusHandler,
+		Router:   r,
+	})
+
 	return routes, newPath
 }

+ 4 - 0
api/types/database.go

@@ -21,3 +21,7 @@ type Database struct {
 }
 
 type ListDatabaseResponse []*Database
+
+type UpdateDatabaseStatusRequest struct {
+	Status string `json:"status" form:"required,oneof=deleting creating"`
+}

+ 7 - 0
dashboard/src/main/home/cluster-dashboard/dashboard/NodeList.tsx

@@ -20,6 +20,9 @@ const NodeList: React.FC = () => {
       {
         Header: "Node Name",
         accessor: "name",
+        Cell: ({ row }) => {
+          return <NodeName>{row.values.name}</NodeName>;
+        },
       },
       {
         Header: "Machine Type",
@@ -185,3 +188,7 @@ const StatusButton = styled.div`
       props.success ? "#405eddbb" : "#e83162"};
   }
 `;
+
+const NodeName = styled.div`
+  min-width: 250px;
+`;

+ 12 - 0
dashboard/src/main/home/cluster-dashboard/databases/DatabasesList.tsx

@@ -75,6 +75,18 @@ const DatabasesList = () => {
         }
       );
 
+      // call an endpoint for updating the database status
+      await api.updateDatabaseStatus(
+        "<token>",
+        {
+          status: "deleting",
+        },
+        {
+          project_id,
+          infra_id,
+        }
+      );
+
       setCurrentOverlay(null);
       pushQueryParams({ current_tab: "provisioner-status" });
     } catch (error) {

+ 1 - 1
dashboard/src/main/home/infrastructure/InfrastructureList.tsx

@@ -202,7 +202,7 @@ export default InfrastructureList;
 const KindContainer = styled.div`
   display: flex;
   align-items: center;
-  min-width: 200px;
+  min-width: 250px;
 `;
 
 const Kind = styled.div`

+ 13 - 0
dashboard/src/shared/api.tsx

@@ -985,6 +985,18 @@ const destroyInfra = baseApi<
   return `/api/projects/${pathParams.project_id}/infras/${pathParams.infra_id}`;
 });
 
+const updateDatabaseStatus = baseApi<
+  {
+    status: string;
+  },
+  {
+    project_id: number;
+    infra_id: number;
+  }
+>("POST", (pathParams) => {
+  return `/api/projects/${pathParams.project_id}/infras/${pathParams.infra_id}/database`;
+});
+
 const getRepoIntegrations = baseApi("GET", "/api/integrations/repo");
 
 const getRepos = baseApi<{}, { id: number }>("GET", (pathParams) => {
@@ -1583,6 +1595,7 @@ export default {
   deployTemplate,
   deployAddon,
   destroyInfra,
+  updateDatabaseStatus,
   detectBuildpack,
   getBranchContents,
   getBranches,

+ 10 - 1
docker/Dockerfile

@@ -5,12 +5,17 @@
 FROM golang:1.17-alpine as base
 WORKDIR /porter
 
-RUN apk update && apk add --no-cache gcc musl-dev git
+RUN apk update && apk add --no-cache gcc musl-dev git protoc
 
 COPY go.mod go.sum ./
 COPY /cmd ./cmd
 COPY /internal ./internal
 COPY /api ./api
+COPY /scripts ./scripts
+COPY /provisioner ./provisioner
+
+RUN go install google.golang.org/protobuf/cmd/protoc-gen-go@v1.26
+RUN go install google.golang.org/grpc/cmd/protoc-gen-go-grpc@v1.1
 
 RUN --mount=type=cache,target=$GOPATH/pkg/mod \
     go mod download
@@ -21,6 +26,10 @@ FROM base AS build-go
 
 ARG version=production
 
+
+# build proto files
+RUN sh ./scripts/build/proto.sh
+
 RUN --mount=type=cache,target=/root/.cache/go-build \
     --mount=type=cache,target=$GOPATH/pkg/mod \
     go build -ldflags="-w -s -X 'main.Version=${version}'" -a -o ./bin/app ./cmd/app && \

+ 6 - 1
ee/docker/ee.Dockerfile

@@ -5,13 +5,15 @@
 FROM golang:1.17-alpine as base
 WORKDIR /porter
 
-RUN apk update && apk add --no-cache gcc musl-dev git
+RUN apk update && apk add --no-cache gcc musl-dev git protoc
 
 COPY go.mod go.sum ./
 COPY /cmd ./cmd
 COPY /internal ./internal
 COPY /api ./api
 COPY /ee ./ee
+COPY /provisioner ./provisioner
+COPY /scripts ./scripts
 
 RUN --mount=type=cache,target=$GOPATH/pkg/mod \
     go mod download
@@ -20,6 +22,9 @@ RUN --mount=type=cache,target=$GOPATH/pkg/mod \
 # --------------------
 FROM base AS build-go
 
+# build proto files
+RUN sh ./scripts/build/proto.sh
+
 ARG version=production
 
 RUN --mount=type=cache,target=/root/.cache/go-build \

+ 51 - 0
ee/docker/provisioner.Dockerfile

@@ -0,0 +1,51 @@
+# syntax=docker/dockerfile:1.1.7-experimental
+
+# Base Go environment
+# -------------------
+FROM golang:1.17-alpine as base
+WORKDIR /porter
+
+RUN apk update && apk add --no-cache gcc musl-dev git protoc
+
+COPY go.mod go.sum ./
+COPY /cmd ./cmd
+COPY /internal ./internal
+COPY /api ./api
+COPY /ee ./ee
+COPY /scripts ./scripts
+COPY /provisioner ./provisioner
+
+RUN go install google.golang.org/protobuf/cmd/protoc-gen-go@v1.26
+RUN go install google.golang.org/grpc/cmd/protoc-gen-go-grpc@v1.1
+
+RUN --mount=type=cache,target=$GOPATH/pkg/mod \
+    go mod download
+
+# Go build environment
+# --------------------
+FROM base AS build-go
+
+# build proto files
+RUN sh ./scripts/build/proto.sh
+
+RUN --mount=type=cache,target=/root/.cache/go-build \
+    --mount=type=cache,target=$GOPATH/pkg/mod \
+    go build -ldflags '-w -s' -a -tags ee -o ./bin/provisioner ./cmd/provisioner
+
+# Go test environment
+# -------------------
+FROM base AS porter-test
+
+RUN --mount=type=cache,target=/root/.cache/go-build \
+    --mount=type=cache,target=$GOPATH/pkg/mod \
+    go test ./...
+
+# Deployment environment
+# ----------------------
+FROM alpine
+RUN apk update
+
+COPY --from=build-go /porter/bin/provisioner /porter/
+
+EXPOSE 8080
+CMD /porter/provisioner

+ 0 - 584
internal/redis_stream/global_stream.go

@@ -1,584 +0,0 @@
-package redis_stream
-
-import (
-	"context"
-	"encoding/base64"
-	"encoding/json"
-	"fmt"
-	"regexp"
-	"strings"
-
-	"github.com/aws/aws-sdk-go/service/ecr"
-	"github.com/porter-dev/porter/internal/analytics"
-	"github.com/porter-dev/porter/internal/kubernetes"
-	"github.com/porter-dev/porter/internal/kubernetes/envgroup"
-	"gorm.io/gorm"
-
-	redis "github.com/go-redis/redis/v8"
-
-	"github.com/porter-dev/porter/api/server/shared/config"
-	"github.com/porter-dev/porter/api/types"
-	"github.com/porter-dev/porter/internal/models"
-	"github.com/porter-dev/porter/internal/repository"
-)
-
-// GlobalStreamName is the name of the Redis stream for global operations
-const GlobalStreamName = "global"
-
-// GlobalStreamGroupName is the name of the Redis consumer group that this server
-// is a part of
-const GlobalStreamGroupName = "portersvr"
-
-// InitGlobalStream initializes the global stream if it does not exist, and the
-// global consumer group if it does not exist
-func InitGlobalStream(client *redis.Client) error {
-	// determine if the stream exists
-	x, err := client.Exists(
-		context.Background(),
-		GlobalStreamName,
-	).Result()
-
-	// if it does not exist, create group and stream
-	if x == 0 {
-		_, err := client.XGroupCreateMkStream(
-			context.Background(),
-			GlobalStreamName,
-			GlobalStreamGroupName,
-			">",
-		).Result()
-
-		return err
-	}
-
-	// otherwise, check if the group exists
-	xInfoGroups, err := client.XInfoGroups(
-		context.Background(),
-		GlobalStreamName,
-	).Result()
-
-	// if error is not NOGROUP error, return
-	if err != nil && !strings.Contains(err.Error(), "NOGROUP") {
-		return err
-	}
-
-	for _, group := range xInfoGroups {
-		// if the group exists, return with no error
-		if group.Name == GlobalStreamGroupName {
-			return nil
-		}
-	}
-
-	// if the group does not exist, create it
-	_, err = client.XGroupCreate(
-		context.Background(),
-		GlobalStreamName,
-		GlobalStreamGroupName,
-		"$",
-	).Result()
-
-	return err
-}
-
-// ResourceCRUDHandler is a handler for updates to an infra resource
-type ResourceCRUDHandler interface {
-	OnCreate(id uint) error
-}
-
-// GlobalStreamListener performs an XREADGROUP operation on a given stream and
-// updates models in the database as necessary
-func GlobalStreamListener(
-	client *redis.Client,
-	config *config.Config,
-	repo repository.Repository,
-	analyticsClient analytics.AnalyticsSegmentClient,
-	errorChan chan error,
-) {
-	for {
-		xstreams, err := client.XReadGroup(
-			context.Background(),
-			&redis.XReadGroupArgs{
-				Group:    GlobalStreamGroupName,
-				Consumer: "portersvr-0", // just static consumer for now
-				Streams:  []string{GlobalStreamName, ">"},
-				Block:    0,
-			},
-		).Result()
-
-		if err != nil {
-			errorChan <- err
-			return
-		}
-
-		// parse messages from the global stream
-		for _, msg := range xstreams[0].Messages {
-			// parse the id to identify the infra
-			kind, projID, infraID, err := models.ParseUniqueName(fmt.Sprintf("%v", msg.Values["id"]))
-
-			if fmt.Sprintf("%v", msg.Values["status"]) == "created" {
-				infra, err := repo.Infra().ReadInfra(projID, infraID)
-
-				if err != nil {
-					continue
-				}
-
-				infra.Status = types.StatusCreated
-
-				infra, err = repo.Infra().UpdateInfra(infra)
-
-				if err != nil {
-					continue
-				}
-
-				// create ECR/EKS
-				if kind == string(types.InfraECR) {
-					reg := &models.Registry{
-						ProjectID:        projID,
-						AWSIntegrationID: infra.AWSIntegrationID,
-						InfraID:          infra.ID,
-					}
-
-					// parse raw data into ECR type
-					dataString, ok := msg.Values["data"].(string)
-
-					if ok {
-						json.Unmarshal([]byte(dataString), reg)
-					}
-
-					awsInt, err := repo.AWSIntegration().ReadAWSIntegration(reg.ProjectID, reg.AWSIntegrationID)
-
-					if err != nil {
-						continue
-					}
-
-					sess, err := awsInt.GetSession()
-
-					if err != nil {
-						continue
-					}
-
-					ecrSvc := ecr.New(sess)
-
-					output, err := ecrSvc.GetAuthorizationToken(&ecr.GetAuthorizationTokenInput{})
-
-					if err != nil {
-						continue
-					}
-
-					reg.URL = *output.AuthorizationData[0].ProxyEndpoint
-
-					reg, err = repo.Registry().CreateRegistry(reg)
-
-					if err != nil {
-						continue
-					}
-
-					analyticsClient.Track(analytics.RegistryProvisioningSuccessTrack(
-						&analytics.RegistryProvisioningSuccessTrackOpts{
-							RegistryScopedTrackOpts: analytics.GetRegistryScopedTrackOpts(infra.CreatedByUserID, infra.ProjectID, reg.ID),
-							RegistryType:            infra.Kind,
-							InfraID:                 infra.ID,
-						},
-					))
-				} else if kind == string(types.InfraRDS) {
-					// parse the last applied field to get the cluster id
-					rdsRequest := &types.RDSInfraLastApplied{}
-					err := json.Unmarshal(infra.LastApplied, rdsRequest)
-
-					if err != nil {
-						continue
-					}
-
-					database := &models.Database{
-						Status: "running",
-					}
-
-					// parse raw data into ECR type
-					dataString, ok := msg.Values["data"].(string)
-
-					if ok {
-						err = json.Unmarshal([]byte(dataString), database)
-
-						if err != nil {
-						}
-					}
-
-					database.Model = gorm.Model{}
-					database.ProjectID = projID
-					database.ClusterID = rdsRequest.ClusterID
-					database.InfraID = infra.ID
-
-					database, err = repo.Database().CreateDatabase(database)
-
-					if err != nil {
-						continue
-					}
-
-					infra.DatabaseID = database.ID
-					infra, err = repo.Infra().UpdateInfra(infra)
-
-					if err != nil {
-						continue
-					}
-
-					err = createRDSEnvGroup(repo, config, infra, database, rdsRequest)
-
-					if err != nil {
-						continue
-					}
-				} else if kind == string(types.InfraEKS) {
-					cluster := &models.Cluster{
-						AuthMechanism:    models.AWS,
-						ProjectID:        projID,
-						AWSIntegrationID: infra.AWSIntegrationID,
-						InfraID:          infra.ID,
-					}
-
-					// parse raw data into ECR type
-					dataString, ok := msg.Values["data"].(string)
-
-					if ok {
-						json.Unmarshal([]byte(dataString), cluster)
-					}
-
-					re := regexp.MustCompile(`^([A-Za-z0-9+/]{4})*([A-Za-z0-9+/]{3}=|[A-Za-z0-9+/]{2}==)?$`)
-
-					// if it matches the base64 regex, decode it
-					caData := string(cluster.CertificateAuthorityData)
-					if re.MatchString(caData) {
-						decoded, err := base64.StdEncoding.DecodeString(caData)
-
-						if err != nil {
-							continue
-						}
-
-						cluster.CertificateAuthorityData = []byte(decoded)
-					}
-
-					cluster, err := repo.Cluster().CreateCluster(cluster)
-
-					if err != nil {
-						continue
-					}
-
-					analyticsClient.Track(analytics.ClusterProvisioningSuccessTrack(
-						&analytics.ClusterProvisioningSuccessTrackOpts{
-							ClusterScopedTrackOpts: analytics.GetClusterScopedTrackOpts(infra.CreatedByUserID, infra.ProjectID, cluster.ID),
-							ClusterType:            infra.Kind,
-							InfraID:                infra.ID,
-						},
-					))
-				} else if kind == string(types.InfraGCR) {
-					reg := &models.Registry{
-						ProjectID:        projID,
-						GCPIntegrationID: infra.GCPIntegrationID,
-						InfraID:          infra.ID,
-						Name:             "gcr-registry",
-					}
-
-					// parse raw data into ECR type
-					dataString, ok := msg.Values["data"].(string)
-
-					if ok {
-						json.Unmarshal([]byte(dataString), reg)
-					}
-
-					reg, err = repo.Registry().CreateRegistry(reg)
-
-					if err != nil {
-						continue
-					}
-
-					analyticsClient.Track(analytics.RegistryProvisioningSuccessTrack(
-						&analytics.RegistryProvisioningSuccessTrackOpts{
-							RegistryScopedTrackOpts: analytics.GetRegistryScopedTrackOpts(infra.CreatedByUserID, infra.ProjectID, reg.ID),
-							RegistryType:            infra.Kind,
-							InfraID:                 infra.ID,
-						},
-					))
-				} else if kind == string(types.InfraGKE) {
-					cluster := &models.Cluster{
-						AuthMechanism:    models.GCP,
-						ProjectID:        projID,
-						GCPIntegrationID: infra.GCPIntegrationID,
-						InfraID:          infra.ID,
-					}
-
-					// parse raw data into GKE type
-					dataString, ok := msg.Values["data"].(string)
-
-					if ok {
-						json.Unmarshal([]byte(dataString), cluster)
-					}
-
-					re := regexp.MustCompile(`^([A-Za-z0-9+/]{4})*([A-Za-z0-9+/]{3}=|[A-Za-z0-9+/]{2}==)?$`)
-
-					// if it matches the base64 regex, decode it
-					caData := string(cluster.CertificateAuthorityData)
-					if re.MatchString(caData) {
-						decoded, err := base64.StdEncoding.DecodeString(caData)
-
-						if err != nil {
-							continue
-						}
-
-						cluster.CertificateAuthorityData = []byte(decoded)
-					}
-
-					cluster, err := repo.Cluster().CreateCluster(cluster)
-
-					if err != nil {
-						continue
-					}
-
-					analyticsClient.Track(analytics.ClusterProvisioningSuccessTrack(
-						&analytics.ClusterProvisioningSuccessTrackOpts{
-							ClusterScopedTrackOpts: analytics.GetClusterScopedTrackOpts(infra.CreatedByUserID, infra.ProjectID, cluster.ID),
-							ClusterType:            infra.Kind,
-							InfraID:                infra.ID,
-						},
-					))
-				} else if kind == string(types.InfraDOCR) {
-					reg := &models.Registry{
-						ProjectID:       projID,
-						DOIntegrationID: infra.DOIntegrationID,
-						InfraID:         infra.ID,
-					}
-
-					// parse raw data into DOCR type
-					dataString, ok := msg.Values["data"].(string)
-
-					if ok {
-						json.Unmarshal([]byte(dataString), reg)
-					}
-
-					reg, err = repo.Registry().CreateRegistry(reg)
-
-					if err != nil {
-						continue
-					}
-
-					analyticsClient.Track(analytics.RegistryProvisioningSuccessTrack(
-						&analytics.RegistryProvisioningSuccessTrackOpts{
-							RegistryScopedTrackOpts: analytics.GetRegistryScopedTrackOpts(infra.CreatedByUserID, infra.ProjectID, reg.ID),
-							RegistryType:            infra.Kind,
-							InfraID:                 infra.ID,
-						},
-					))
-				} else if kind == string(types.InfraDOKS) {
-					cluster := &models.Cluster{
-						AuthMechanism:   models.DO,
-						ProjectID:       projID,
-						DOIntegrationID: infra.DOIntegrationID,
-						InfraID:         infra.ID,
-					}
-
-					// parse raw data into GKE type
-					dataString, ok := msg.Values["data"].(string)
-
-					if ok {
-						json.Unmarshal([]byte(dataString), cluster)
-					}
-
-					re := regexp.MustCompile(`^([A-Za-z0-9+/]{4})*([A-Za-z0-9+/]{3}=|[A-Za-z0-9+/]{2}==)?$`)
-
-					// if it matches the base64 regex, decode it
-					caData := string(cluster.CertificateAuthorityData)
-					if re.MatchString(caData) {
-						decoded, err := base64.StdEncoding.DecodeString(caData)
-
-						if err != nil {
-							continue
-						}
-
-						cluster.CertificateAuthorityData = []byte(decoded)
-					}
-
-					cluster, err := repo.Cluster().CreateCluster(cluster)
-
-					if err != nil {
-						continue
-					}
-
-					analyticsClient.Track(analytics.ClusterProvisioningSuccessTrack(
-						&analytics.ClusterProvisioningSuccessTrackOpts{
-							ClusterScopedTrackOpts: analytics.GetClusterScopedTrackOpts(infra.CreatedByUserID, infra.ProjectID, cluster.ID),
-							ClusterType:            infra.Kind,
-							InfraID:                infra.ID,
-						},
-					))
-				}
-			} else if fmt.Sprintf("%v", msg.Values["status"]) == "error" {
-				infra, err := repo.Infra().ReadInfra(projID, infraID)
-
-				if err != nil {
-					continue
-				}
-
-				infra.Status = types.StatusError
-
-				infra, err = repo.Infra().UpdateInfra(infra)
-
-				if err != nil {
-					continue
-				}
-
-				if infra.Kind == types.InfraDOKS || infra.Kind == types.InfraGKE || infra.Kind == types.InfraEKS {
-					analyticsClient.Track(analytics.ClusterProvisioningErrorTrack(
-						&analytics.ClusterProvisioningErrorTrackOpts{
-							ProjectScopedTrackOpts: analytics.GetProjectScopedTrackOpts(infra.CreatedByUserID, infra.ProjectID),
-							ClusterType:            infra.Kind,
-							InfraID:                infra.ID,
-						},
-					))
-				} else if infra.Kind == types.InfraDOCR || infra.Kind == types.InfraGCR || infra.Kind == types.InfraECR {
-					analyticsClient.Track(analytics.RegistryProvisioningErrorTrack(
-						&analytics.RegistryProvisioningErrorTrackOpts{
-							ProjectScopedTrackOpts: analytics.GetProjectScopedTrackOpts(infra.CreatedByUserID, infra.ProjectID),
-							RegistryType:           infra.Kind,
-							InfraID:                infra.ID,
-						},
-					))
-				}
-			} else if fmt.Sprintf("%v", msg.Values["status"]) == "destroyed" {
-				infra, err := repo.Infra().ReadInfra(projID, infraID)
-
-				if err != nil {
-					continue
-				}
-
-				infra.Status = types.StatusDestroyed
-
-				infra, err = repo.Infra().UpdateInfra(infra)
-
-				if err != nil {
-					continue
-				}
-
-				if infra.Kind == types.InfraDOKS || infra.Kind == types.InfraGKE || infra.Kind == types.InfraEKS {
-					analyticsClient.Track(analytics.ClusterDestroyingSuccessTrack(
-						&analytics.ClusterDestroyingSuccessTrackOpts{
-							ClusterScopedTrackOpts: analytics.GetClusterScopedTrackOpts(infra.CreatedByUserID, infra.ProjectID, 0),
-							ClusterType:            infra.Kind,
-							InfraID:                infra.ID,
-						},
-					))
-				} else if infra.Kind == types.InfraRDS && infra.DatabaseID != 0 {
-					rdsRequest := &types.RDSInfraLastApplied{}
-					err := json.Unmarshal(infra.LastApplied, rdsRequest)
-
-					if err != nil {
-						continue
-					}
-
-					database, err := repo.Database().ReadDatabase(infra.ProjectID, rdsRequest.ClusterID, infra.DatabaseID)
-
-					if err != nil {
-						continue
-					}
-
-					err = deleteRDSEnvGroup(repo, config, infra, database, rdsRequest)
-
-					if err != nil {
-						continue
-					}
-
-					// delete the database
-					err = repo.Database().DeleteDatabase(infra.ProjectID, rdsRequest.ClusterID, infra.DatabaseID)
-
-					if err != nil {
-						continue
-					}
-				}
-			}
-
-			// acknowledge the message as read
-			_, err = client.XAck(
-				context.Background(),
-				GlobalStreamName,
-				GlobalStreamGroupName,
-				msg.ID,
-			).Result()
-
-			// if error, continue for now
-			if err != nil {
-				continue
-			}
-		}
-	}
-}
-
-func createRDSEnvGroup(repo repository.Repository, config *config.Config, infra *models.Infra, database *models.Database, rdsConfig *types.RDSInfraLastApplied) error {
-
-	cluster, err := repo.Cluster().ReadCluster(infra.ProjectID, rdsConfig.ClusterID)
-
-	if err != nil {
-		return err
-	}
-
-	ooc := &kubernetes.OutOfClusterConfig{
-		Repo:              config.Repo,
-		DigitalOceanOAuth: config.DOConf,
-		Cluster:           cluster,
-	}
-
-	agent, err := kubernetes.GetAgentOutOfClusterConfig(ooc)
-
-	if err != nil {
-		return fmt.Errorf("failed to get agent: %s", err.Error())
-	}
-
-	// split the instance endpoint on the port
-	port := "5432"
-	host := database.InstanceEndpoint
-
-	if strArr := strings.Split(database.InstanceEndpoint, ":"); len(strArr) == 2 {
-		host = strArr[0]
-		port = strArr[1]
-	}
-
-	_, err = envgroup.CreateEnvGroup(agent, types.ConfigMapInput{
-		Name:      fmt.Sprintf("rds-credentials-%s", rdsConfig.DBName),
-		Namespace: rdsConfig.Namespace,
-		Variables: map[string]string{},
-		SecretVariables: map[string]string{
-			"PGPORT":     port,
-			"PGHOST":     host,
-			"PGPASSWORD": rdsConfig.Password,
-			"PGUSER":     rdsConfig.Username,
-		},
-	})
-
-	if err != nil {
-		return fmt.Errorf("failed to create RDS env group: %s", err.Error())
-	}
-
-	return nil
-}
-
-func deleteRDSEnvGroup(repo repository.Repository, config *config.Config, infra *models.Infra, database *models.Database, rdsConfig *types.RDSInfraLastApplied) error {
-	cluster, err := repo.Cluster().ReadCluster(infra.ProjectID, rdsConfig.ClusterID)
-
-	if err != nil {
-		return err
-	}
-
-	ooc := &kubernetes.OutOfClusterConfig{
-		Repo:              config.Repo,
-		DigitalOceanOAuth: config.DOConf,
-		Cluster:           cluster,
-	}
-
-	agent, err := kubernetes.GetAgentOutOfClusterConfig(ooc)
-
-	if err != nil {
-		return fmt.Errorf("failed to get agent: %s", err.Error())
-	}
-
-	err = envgroup.DeleteEnvGroup(agent, fmt.Sprintf("rds-credentials-%s", rdsConfig.DBName), rdsConfig.Namespace)
-
-	if err != nil {
-		return fmt.Errorf("failed to create RDS env group: %s", err.Error())
-	}
-
-	return nil
-}

+ 0 - 69
internal/redis_stream/resource_stream.go

@@ -1,69 +0,0 @@
-package redis_stream
-
-import (
-	"context"
-
-	redis "github.com/go-redis/redis/v8"
-	"github.com/porter-dev/porter/api/server/shared/websocket"
-)
-
-// ResourceStream performs an XREAD operation on the given stream and outputs it to the given websocket conn.
-func ResourceStream(client *redis.Client, streamName string, rw *websocket.WebsocketSafeReadWriter) error {
-	errorchan := make(chan error)
-
-	go func() {
-		defer func() {
-			if r := recover(); r != nil {
-				// TODO: add method to alert on panic
-				return
-			}
-		}()
-
-		// listens for websocket closing handshake
-		for {
-			if _, _, err := rw.ReadMessage(); err != nil {
-				errorchan <- nil
-				return
-			}
-		}
-	}()
-
-	go func() {
-		defer func() {
-			if r := recover(); r != nil {
-				// TODO: add method to alert on panic
-				return
-			}
-		}()
-
-		lastID := "0-0"
-
-		for {
-			xstream, err := client.XRead(
-				context.Background(),
-				&redis.XReadArgs{
-					Streams: []string{streamName, lastID},
-					Block:   0,
-				},
-			).Result()
-
-			if err != nil {
-				return
-			}
-
-			messages := xstream[0].Messages
-			lastID = messages[len(messages)-1].ID
-
-			rw.WriteJSON(messages)
-		}
-	}()
-
-	for {
-		select {
-		case err := <-errorchan:
-			close(errorchan)
-			client.Close()
-			return err
-		}
-	}
-}

+ 20 - 23
provisioner/server/config/config.go

@@ -2,6 +2,7 @@ package config
 
 import (
 	"fmt"
+	"log"
 	"time"
 
 	redis "github.com/go-redis/redis/v8"
@@ -9,7 +10,6 @@ import (
 	"github.com/joeshaw/envdecode"
 	"github.com/porter-dev/porter/api/server/shared/apierrors/alerter"
 	"github.com/porter-dev/porter/api/server/shared/config/env"
-	"github.com/porter-dev/porter/ee/integrations/vault"
 	"github.com/porter-dev/porter/internal/adapter"
 	"github.com/porter-dev/porter/internal/kubernetes"
 	klocal "github.com/porter-dev/porter/internal/kubernetes/local"
@@ -28,6 +28,23 @@ import (
 	_gorm "gorm.io/gorm"
 )
 
+var InstanceCredentialBackend credentials.CredentialStorage
+var InstanceEnvConf *EnvConf
+
+func sharedInit() {
+	var envDecoderConf EnvDecoderConf = EnvDecoderConf{}
+
+	if err := envdecode.StrictDecode(&envDecoderConf); err != nil {
+		log.Fatalf("Failed to decode server conf: %s", err)
+	}
+
+	InstanceEnvConf = &EnvConf{
+		ProvisionerConf: &envDecoderConf.ProvisionerConf,
+		DBConf:          &envDecoderConf.DBConf,
+		RedisConf:       envDecoderConf.RedisConf,
+	}
+}
+
 type Config struct {
 	ProvisionerConf *ProvisionerConf
 	DBConf          *env.DBConf
@@ -101,17 +118,7 @@ type EnvDecoderConf struct {
 
 // FromEnv generates a configuration from environment variables
 func FromEnv() (*EnvConf, error) {
-	var envDecoderConf EnvDecoderConf = EnvDecoderConf{}
-
-	if err := envdecode.StrictDecode(&envDecoderConf); err != nil {
-		return nil, fmt.Errorf("Failed to decode server conf: %s", err)
-	}
-
-	return &EnvConf{
-		ProvisionerConf: &envDecoderConf.ProvisionerConf,
-		DBConf:          &envDecoderConf.DBConf,
-		RedisConf:       envDecoderConf.RedisConf,
-	}, nil
+	return InstanceEnvConf, nil
 }
 
 func GetConfig(envConf *EnvConf) (*Config, error) {
@@ -136,17 +143,7 @@ func GetConfig(envConf *EnvConf) (*Config, error) {
 		key[i] = b
 	}
 
-	var credBackend credentials.CredentialStorage
-
-	if envConf.DBConf.VaultAPIKey != "" && envConf.DBConf.VaultServerURL != "" && envConf.DBConf.VaultPrefix != "" {
-		credBackend = vault.NewClient(
-			envConf.DBConf.VaultServerURL,
-			envConf.DBConf.VaultAPIKey,
-			envConf.DBConf.VaultPrefix,
-		)
-	}
-
-	res.Repo = gorm.NewRepository(db, &key, credBackend)
+	res.Repo = gorm.NewRepository(db, &key, InstanceCredentialBackend)
 
 	if envConf.ProvisionerConf.SentryDSN != "" {
 		res.Alerter, err = alerter.NewSentryAlerter(envConf.ProvisionerConf.SentryDSN, envConf.ProvisionerConf.SentryEnv)

+ 7 - 0
provisioner/server/config/init_ce.go

@@ -0,0 +1,7 @@
+// +build !ee
+
+package config
+
+func init() {
+	sharedInit()
+}

+ 25 - 0
provisioner/server/config/init_ee.go

@@ -0,0 +1,25 @@
+// +build ee
+
+package config
+
+import (
+	"github.com/porter-dev/porter/ee/integrations/vault"
+)
+
+func init() {
+	sharedInit()
+
+	var key [32]byte
+
+	for i, b := range []byte(InstanceEnvConf.DBConf.EncryptionKey) {
+		key[i] = b
+	}
+
+	if InstanceEnvConf.DBConf.VaultAPIKey != "" && InstanceEnvConf.DBConf.VaultServerURL != "" && InstanceEnvConf.DBConf.VaultPrefix != "" {
+		InstanceCredentialBackend = vault.NewClient(
+			InstanceEnvConf.DBConf.VaultServerURL,
+			InstanceEnvConf.DBConf.VaultAPIKey,
+			InstanceEnvConf.DBConf.VaultPrefix,
+		)
+	}
+}

+ 24 - 0
provisioner/server/handlers/credentials/get_credentials_ce.go

@@ -0,0 +1,24 @@
+// +build !ee
+
+package credentials
+
+import (
+	"net/http"
+
+	"github.com/porter-dev/porter/api/server/handlers"
+	"github.com/porter-dev/porter/api/server/shared"
+	"github.com/porter-dev/porter/api/server/shared/config"
+)
+
+type GetCredentialsHandler struct {
+	handlers.PorterHandlerReader
+	handlers.Unavailable
+}
+
+func NewGetCredentialsHandler(
+	config *config.Config,
+	decoderValidator shared.RequestDecoderValidator,
+	writer shared.ResultWriter,
+) http.Handler {
+	return handlers.NewUnavailable(config, "get_credential")
+}

+ 2 - 0
provisioner/server/handlers/credentials/get_credentials.go → provisioner/server/handlers/credentials/get_credentials_ee.go

@@ -1,3 +1,5 @@
+// +build ee
+
 package credentials
 
 import (