瀏覽代碼

[POR-2218] CLI changes to support datastore connect (#4183)

Feroze Mohideen 2 年之前
父節點
當前提交
9cabdd2456

+ 29 - 0
api/client/datastore.go

@@ -0,0 +1,29 @@
+package client
+
+import (
+	"context"
+	"fmt"
+
+	"github.com/porter-dev/porter/api/types"
+)
+
+// CreateDatastoreProxy creates a proxy to connect to a datastore by POSTing to
+// /projects/{projectID}/datastores/{datastoreName}/create-proxy and decoding
+// the JSON response into a CreateDatastoreProxyResponse.
+// NOTE(review): ctx is accepted but not passed to postRequest — confirm
+// whether the underlying request helper supports context cancellation.
+func (c *Client) CreateDatastoreProxy(
+	ctx context.Context,
+	projectID uint,
+	datastoreName string,
+	req *types.CreateDatastoreProxyRequest,
+) (*types.CreateDatastoreProxyResponse, error) {
+	resp := &types.CreateDatastoreProxyResponse{}
+
+	// postRequest fills resp from the response body on success; on error the
+	// zero-valued resp is returned alongside the error.
+	err := c.postRequest(
+		fmt.Sprintf(
+			"/projects/%d/datastores/%s/create-proxy",
+			projectID, datastoreName,
+		),
+		req,
+		resp,
+	)
+
+	return resp, err
+}

+ 129 - 0
api/server/handlers/datastore/create_proxy.go

@@ -0,0 +1,129 @@
+package datastore
+
+import (
+	"net/http"
+
+	"connectrpc.com/connect"
+	"github.com/google/uuid"
+	porterv1 "github.com/porter-dev/api-contracts/generated/go/porter/v1"
+	"github.com/porter-dev/porter/api/server/authz"
+	"github.com/porter-dev/porter/api/server/handlers"
+	"github.com/porter-dev/porter/api/server/shared"
+	"github.com/porter-dev/porter/api/server/shared/apierrors"
+	"github.com/porter-dev/porter/api/server/shared/config"
+	"github.com/porter-dev/porter/api/server/shared/requestutils"
+	"github.com/porter-dev/porter/api/types"
+	"github.com/porter-dev/porter/internal/models"
+	"github.com/porter-dev/porter/internal/telemetry"
+)
+
+// Credential has all information about connecting to a datastore
+// NOTE(review): this mirrors types.DatastoreCredential in api/types — consider
+// consolidating the two so the handler and CLI share one definition.
+type Credential struct {
+	// Host is the hostname the datastore listens on
+	Host         string
+	// Port is the port the datastore listens on
+	Port         int
+	// Username is the username used to authenticate to the datastore
+	Username     string
+	// Password is the password used to authenticate to the datastore
+	Password     string
+	// DatabaseName is the name of the database to connect to, if applicable
+	DatabaseName string
+}
+
+// CreateDatastoreProxyResponse is the response body for the create datastore proxy endpoint
+// NOTE(review): this duplicates types.CreateDatastoreProxyResponse in
+// api/types; the two must stay field-compatible for the CLI to decode it.
+type CreateDatastoreProxyResponse struct {
+	// PodName is the name of the pod that was created
+	PodName string `json:"pod_name"`
+	// Credential is the credential used to connect to the datastore
+	Credential Credential `json:"credential"`
+	// ClusterID is the ID of the cluster that the pod was created in
+	ClusterID uint `json:"cluster_id"`
+	// Namespace is the namespace that the pod was created in
+	Namespace string `json:"namespace"`
+	// Type is the type of datastore
+	Type string `json:"type"`
+}
+
+// CreateDatastoreProxyHandler is a handler for creating a datastore proxy pod which is used to connect to the datastore.
+// It embeds the standard read/write handler plumbing plus a kubernetes agent
+// getter for reaching the target cluster.
+type CreateDatastoreProxyHandler struct {
+	handlers.PorterHandlerReadWriter
+	authz.KubernetesAgentGetter
+}
+
+// NewCreateDatastoreProxyHandler returns a CreateDatastoreProxyHandler wired
+// with the default request decoder/validator, result writer, and an
+// out-of-cluster kubernetes agent getter.
+func NewCreateDatastoreProxyHandler(
+	config *config.Config,
+	decoderValidator shared.RequestDecoderValidator,
+	writer shared.ResultWriter,
+) *CreateDatastoreProxyHandler {
+	return &CreateDatastoreProxyHandler{
+		PorterHandlerReadWriter: handlers.NewDefaultPorterHandler(config, decoderValidator, writer),
+		KubernetesAgentGetter:   authz.NewOutOfClusterAgentGetter(config),
+	}
+}
+
+// ServeHTTP creates a datastore proxy pod
+func (c *CreateDatastoreProxyHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
+	ctx, span := telemetry.NewSpan(r.Context(), "serve-create-datastore-proxy")
+	defer span.End()
+
+	project, _ := ctx.Value(types.ProjectScope).(*models.Project)
+	if project.ID == 0 {
+		err := telemetry.Error(ctx, span, nil, "project not found")
+		c.HandleAPIError(w, r, apierrors.NewErrPassThroughToClient(err, http.StatusBadRequest))
+		return
+	}
+	projectId := int64(project.ID)
+
+	var resp CreateDatastoreProxyResponse
+
+	datastoreName, reqErr := requestutils.GetURLParamString(r, types.URLParamDatastoreName)
+	if reqErr != nil {
+		err := telemetry.Error(ctx, span, nil, "error parsing datastore name")
+		c.HandleAPIError(w, r, apierrors.NewErrPassThroughToClient(err, http.StatusBadRequest))
+		return
+	}
+	telemetry.WithAttributes(span, telemetry.AttributeKV{Key: "datastore-name", Value: datastoreName})
+
+	datastoreRecord, err := c.Repo().Datastore().GetByProjectIDAndName(ctx, project.ID, datastoreName)
+	if err != nil {
+		err = telemetry.Error(ctx, span, err, "datastore record not found")
+		c.HandleAPIError(w, r, apierrors.NewErrPassThroughToClient(err, http.StatusInternalServerError))
+		return
+	}
+
+	if datastoreRecord == nil || datastoreRecord.ID == uuid.Nil {
+		err = telemetry.Error(ctx, span, nil, "datastore record does not exist")
+		c.HandleAPIError(w, r, apierrors.NewErrPassThroughToClient(err, http.StatusBadRequest))
+		return
+	}
+
+	message := porterv1.CreateDatastoreProxyRequest{
+		ProjectId:   projectId,
+		DatastoreId: datastoreRecord.ID.String(),
+	}
+	req := connect.NewRequest(&message)
+	ccpResp, err := c.Config().ClusterControlPlaneClient.CreateDatastoreProxy(ctx, req)
+	if err != nil {
+		err = telemetry.Error(ctx, span, err, "error creating datastore proxy")
+		c.HandleAPIError(w, r, apierrors.NewErrPassThroughToClient(err, http.StatusInternalServerError))
+		return
+	}
+	if ccpResp == nil || ccpResp.Msg == nil {
+		err = telemetry.Error(ctx, span, nil, "error creating datastore proxy")
+		c.HandleAPIError(w, r, apierrors.NewErrPassThroughToClient(err, http.StatusInternalServerError))
+		return
+	}
+
+	resp = CreateDatastoreProxyResponse{
+		PodName: ccpResp.Msg.PodName,
+		Credential: Credential{
+			Host:         ccpResp.Msg.Credential.Host,
+			Port:         int(ccpResp.Msg.Credential.Port),
+			Username:     ccpResp.Msg.Credential.Username,
+			Password:     ccpResp.Msg.Credential.Password,
+			DatabaseName: ccpResp.Msg.Credential.DatabaseName,
+		},
+		ClusterID: uint(ccpResp.Msg.ClusterId),
+		Namespace: ccpResp.Msg.Namespace,
+		Type:      datastoreRecord.Type,
+	}
+
+	c.WriteResult(w, r, resp)
+}

+ 17 - 12
api/server/handlers/datastore/list.go

@@ -64,6 +64,12 @@ type Datastore struct {
 
 
 	// CreatedAtUTC is the time the datastore was created in UTC
 	// CreatedAtUTC is the time the datastore was created in UTC
 	CreatedAtUTC time.Time `json:"created_at"`
 	CreatedAtUTC time.Time `json:"created_at"`
+
+	// CloudProvider is the cloud provider associated with the datastore
+	CloudProvider string `json:"cloud_provider"`
+
+	// CloudProviderCredentialIdentifier is the cloud provider credential identifier associated with the datastore
+	CloudProviderCredentialIdentifier string `json:"cloud_provider_credential_identifier"`
 }
 }
 
 
 // ListDatastoresHandler is a struct for listing all datastores for a given project
 // ListDatastoresHandler is a struct for listing all datastores for a given project
@@ -177,24 +183,23 @@ func Datastores(ctx context.Context, inp DatastoresInput) ([]Datastore, error) {
 	}
 	}
 
 
 	for _, datastore := range resp.Msg.Datastores {
 	for _, datastore := range resp.Msg.Datastores {
-		encodedDatastore := Datastore{
-			Name:     datastore.Name,
-			Metadata: datastore.Metadata,
-			Env:      datastore.Env,
-		}
-
 		datastoreRecord, err := inp.DatastoreRepository.GetByProjectIDAndName(ctx, inp.ProjectID, datastore.Name)
 		datastoreRecord, err := inp.DatastoreRepository.GetByProjectIDAndName(ctx, inp.ProjectID, datastore.Name)
 		if err != nil {
 		if err != nil {
 			telemetry.WithAttributes(span, telemetry.AttributeKV{Key: "err-datastore-name", Value: datastore.Name})
 			telemetry.WithAttributes(span, telemetry.AttributeKV{Key: "err-datastore-name", Value: datastore.Name})
 			return datastores, telemetry.Error(ctx, span, err, "datastore record not found")
 			return datastores, telemetry.Error(ctx, span, err, "datastore record not found")
 		}
 		}
 
 
-		encodedDatastore.CreatedAtUTC = datastoreRecord.CreatedAt
-		encodedDatastore.Type = datastoreRecord.Type
-		encodedDatastore.Engine = datastoreRecord.Engine
-		encodedDatastore.Status = string(datastoreRecord.Status)
-
-		datastores = append(datastores, encodedDatastore)
+		datastores = append(datastores, Datastore{
+			Name:                              datastore.Name,
+			Type:                              datastoreRecord.Type,
+			Engine:                            datastoreRecord.Engine,
+			CreatedAtUTC:                      datastoreRecord.CreatedAt,
+			Status:                            string(datastoreRecord.Status),
+			Metadata:                          datastore.Metadata,
+			Env:                               datastore.Env,
+			CloudProvider:                     datastoreRecord.CloudProvider,
+			CloudProviderCredentialIdentifier: datastoreRecord.CloudProviderCredentialIdentifier,
+		})
 	}
 	}
 
 
 	return datastores, nil
 	return datastores, nil

+ 29 - 0
api/server/router/project.go

@@ -527,6 +527,35 @@ func getProjectRoutes(
 		Router:   r,
 		Router:   r,
 	})
 	})
 
 
+	// POST /api/projects/{project_id}/datastores/{datastore_name}/create-proxy -> datastore.NewCreateDatastoreProxyHandler
+	createDatastoreProxyEndpoint := factory.NewAPIEndpoint(
+		&types.APIRequestMetadata{
+			Verb:   types.APIVerbUpdate,
+			Method: types.HTTPVerbPost,
+			Path: &types.Path{
+				Parent:       basePath,
+				RelativePath: fmt.Sprintf("%s/datastores/{%s}/create-proxy", relPath, types.URLParamDatastoreName),
+			},
+			Scopes: []types.PermissionScope{
+				types.UserScope,
+				types.ProjectScope,
+				types.ClusterScope,
+			},
+		},
+	)
+
+	createDatastoreProxyHandler := datastore.NewCreateDatastoreProxyHandler(
+		config,
+		factory.GetDecoderValidator(),
+		factory.GetResultWriter(),
+	)
+
+	routes = append(routes, &router.Route{
+		Endpoint: createDatastoreProxyEndpoint,
+		Handler:  createDatastoreProxyHandler,
+		Router:   r,
+	})
+
 	// DELETE /api/projects/{project_id}/datastores/{datastore_name} -> cloud_provider.NewDeleteDatastoreHandler
 	// DELETE /api/projects/{project_id}/datastores/{datastore_name} -> cloud_provider.NewDeleteDatastoreHandler
 	deleteDatastoreEndpoint := factory.NewAPIEndpoint(
 	deleteDatastoreEndpoint := factory.NewAPIEndpoint(
 		&types.APIRequestMetadata{
 		&types.APIRequestMetadata{

+ 37 - 0
api/types/datastore.go

@@ -0,0 +1,37 @@
+package types
+
+// DatastoreType represents the type of the datastore
+type DatastoreType string
+
+const (
+	// DatastoreType_RDS is the RDS datastore type
+	DatastoreType_RDS DatastoreType = "RDS"
+	// DatastoreType_ElastiCache is the ElastiCache datastore type
+	DatastoreType_ElastiCache DatastoreType = "ELASTICACHE"
+)
+
+// CreateDatastoreProxyRequest is the request body for the create datastore proxy endpoint.
+// It is currently empty: the endpoint only needs the project and datastore
+// name, both of which travel in the URL.
+type CreateDatastoreProxyRequest struct{}
+
+// CreateDatastoreProxyResponse is the response body for the create datastore proxy endpoint
+type CreateDatastoreProxyResponse struct {
+	// PodName is the name of the pod that was created
+	PodName string `json:"pod_name"`
+	// Credential is the credential used to connect to the datastore
+	Credential DatastoreCredential `json:"credential"`
+	// ClusterID is the ID of the cluster that the pod was created in
+	ClusterID uint `json:"cluster_id"`
+	// Namespace is the namespace that the pod was created in
+	Namespace string `json:"namespace"`
+	// Type is the type of datastore
+	Type string `json:"type"`
+}
+
+// DatastoreCredential has all information about connecting to a datastore
+type DatastoreCredential struct {
+	// Host is the hostname the datastore listens on
+	Host         string `json:"host"`
+	// Port is the port the datastore listens on
+	Port         int    `json:"port"`
+	// Username is the username used to authenticate to the datastore
+	Username     string `json:"username"`
+	// Password is the password used to authenticate to the datastore
+	Password     string `json:"password"`
+	// DatabaseName is the name of the database to connect to, if applicable
+	DatabaseName string `json:"database_name"`
+}

+ 1 - 0
cli/cmd/commands/all.go

@@ -49,6 +49,7 @@ func RegisterCommands() (*cobra.Command, error) {
 	rootCmd.AddCommand(registerCommand_Update(cliConf))
 	rootCmd.AddCommand(registerCommand_Update(cliConf))
 	rootCmd.AddCommand(registerCommand_Version(cliConf))
 	rootCmd.AddCommand(registerCommand_Version(cliConf))
 	rootCmd.AddCommand(registerCommand_Env(cliConf))
 	rootCmd.AddCommand(registerCommand_Env(cliConf))
+	rootCmd.AddCommand(registerCommand_Datastore(cliConf))
 	return rootCmd, nil
 	return rootCmd, nil
 }
 }
 
 

+ 18 - 17
cli/cmd/commands/app.go

@@ -380,7 +380,7 @@ func appRun(ctx context.Context, _ *types.GetAuthenticatedUserResponse, client a
 		selectedContainerName = selectedContainer
 		selectedContainerName = selectedContainer
 	}
 	}
 
 
-	config := &AppPorterRunSharedConfig{
+	config := &KubernetesSharedConfig{
 		Client:    client,
 		Client:    client,
 		CLIConfig: cliConfig,
 		CLIConfig: cliConfig,
 	}
 	}
@@ -422,7 +422,7 @@ func getImageNameFromPod(ctx context.Context, clientset *kubernetes.Clientset, n
 }
 }
 
 
 func appCleanup(ctx context.Context, _ *types.GetAuthenticatedUserResponse, client api.Client, cliConfig config.CLIConfig, _ config.FeatureFlags, _ *cobra.Command, _ []string) error {
 func appCleanup(ctx context.Context, _ *types.GetAuthenticatedUserResponse, client api.Client, cliConfig config.CLIConfig, _ config.FeatureFlags, _ *cobra.Command, _ []string) error {
-	config := &AppPorterRunSharedConfig{
+	config := &KubernetesSharedConfig{
 		Client:    client,
 		Client:    client,
 		CLIConfig: cliConfig,
 		CLIConfig: cliConfig,
 	}
 	}
@@ -510,7 +510,8 @@ func appGetEphemeralPods(ctx context.Context, namespace string, clientset *kuber
 	return podNames, nil
 	return podNames, nil
 }
 }
 
 
-type AppPorterRunSharedConfig struct {
+// KubernetesSharedConfig allows for interacting with a kubernetes cluster
+type KubernetesSharedConfig struct {
 	Client     api.Client
 	Client     api.Client
 	RestConf   *rest.Config
 	RestConf   *rest.Config
 	Clientset  *kubernetes.Clientset
 	Clientset  *kubernetes.Clientset
@@ -518,7 +519,7 @@ type AppPorterRunSharedConfig struct {
 	CLIConfig  config.CLIConfig
 	CLIConfig  config.CLIConfig
 }
 }
 
 
-func (p *AppPorterRunSharedConfig) setSharedConfig(ctx context.Context) error {
+func (p *KubernetesSharedConfig) setSharedConfig(ctx context.Context) error {
 	pID := p.CLIConfig.Project
 	pID := p.CLIConfig.Project
 	cID := p.CLIConfig.Cluster
 	cID := p.CLIConfig.Cluster
 
 
@@ -663,7 +664,7 @@ func appGetPodsV2PorterYaml(ctx context.Context, cliConfig config.CLIConfig, cli
 	return res, namespace, containerHasLauncherStartCommand, nil
 	return res, namespace, containerHasLauncherStartCommand, nil
 }
 }
 
 
-func appExecuteRun(config *AppPorterRunSharedConfig, namespace, name, container string, args []string) error {
+func appExecuteRun(config *KubernetesSharedConfig, namespace, name, container string, args []string) error {
 	req := config.RestClient.Post().
 	req := config.RestClient.Post().
 		Resource("pods").
 		Resource("pods").
 		Name(name).
 		Name(name).
@@ -703,7 +704,7 @@ func appExecuteRun(config *AppPorterRunSharedConfig, namespace, name, container
 	})
 	})
 }
 }
 
 
-func appExecuteRunEphemeral(ctx context.Context, config *AppPorterRunSharedConfig, namespace, name, container string, args []string) error {
+func appExecuteRunEphemeral(ctx context.Context, config *KubernetesSharedConfig, namespace, name, container string, args []string) error {
 	existing, err := appGetExistingPod(ctx, config, name, namespace)
 	existing, err := appGetExistingPod(ctx, config, name, namespace)
 	if err != nil {
 	if err != nil {
 		return err
 		return err
@@ -796,7 +797,7 @@ func appExecuteRunEphemeral(ctx context.Context, config *AppPorterRunSharedConfi
 	return err
 	return err
 }
 }
 
 
-func appCheckForPodDeletionCronJob(ctx context.Context, config *AppPorterRunSharedConfig) error {
+func appCheckForPodDeletionCronJob(ctx context.Context, config *KubernetesSharedConfig) error {
 	// try and create the cron job and all of the other required resources as necessary,
 	// try and create the cron job and all of the other required resources as necessary,
 	// starting with the service account, then role and then a role binding
 	// starting with the service account, then role and then a role binding
 
 
@@ -886,7 +887,7 @@ func appCheckForPodDeletionCronJob(ctx context.Context, config *AppPorterRunShar
 	return nil
 	return nil
 }
 }
 
 
-func appCheckForServiceAccount(ctx context.Context, config *AppPorterRunSharedConfig) error {
+func appCheckForServiceAccount(ctx context.Context, config *KubernetesSharedConfig) error {
 	namespaces, err := config.Clientset.CoreV1().Namespaces().List(ctx, metav1.ListOptions{})
 	namespaces, err := config.Clientset.CoreV1().Namespaces().List(ctx, metav1.ListOptions{})
 	if err != nil {
 	if err != nil {
 		return err
 		return err
@@ -935,7 +936,7 @@ func appCheckForServiceAccount(ctx context.Context, config *AppPorterRunSharedCo
 	return nil
 	return nil
 }
 }
 
 
-func appCheckForClusterRole(ctx context.Context, config *AppPorterRunSharedConfig) error {
+func appCheckForClusterRole(ctx context.Context, config *KubernetesSharedConfig) error {
 	roles, err := config.Clientset.RbacV1().ClusterRoles().List(
 	roles, err := config.Clientset.RbacV1().ClusterRoles().List(
 		ctx, metav1.ListOptions{},
 		ctx, metav1.ListOptions{},
 	)
 	)
@@ -976,7 +977,7 @@ func appCheckForClusterRole(ctx context.Context, config *AppPorterRunSharedConfi
 	return nil
 	return nil
 }
 }
 
 
-func appCheckForRoleBinding(ctx context.Context, config *AppPorterRunSharedConfig) error {
+func appCheckForRoleBinding(ctx context.Context, config *KubernetesSharedConfig) error {
 	bindings, err := config.Clientset.RbacV1().ClusterRoleBindings().List(
 	bindings, err := config.Clientset.RbacV1().ClusterRoleBindings().List(
 		ctx, metav1.ListOptions{},
 		ctx, metav1.ListOptions{},
 	)
 	)
@@ -1018,7 +1019,7 @@ func appCheckForRoleBinding(ctx context.Context, config *AppPorterRunSharedConfi
 	return nil
 	return nil
 }
 }
 
 
-func appWaitForPod(ctx context.Context, config *AppPorterRunSharedConfig, pod *v1.Pod) error {
+func appWaitForPod(ctx context.Context, config *KubernetesSharedConfig, pod *v1.Pod) error {
 	var (
 	var (
 		w   watch.Interface
 		w   watch.Interface
 		err error
 		err error
@@ -1082,7 +1083,7 @@ func appIsPodExited(pod *v1.Pod) bool {
 	return pod.Status.Phase == v1.PodSucceeded || pod.Status.Phase == v1.PodFailed
 	return pod.Status.Phase == v1.PodSucceeded || pod.Status.Phase == v1.PodFailed
 }
 }
 
 
-func appHandlePodAttachError(ctx context.Context, err error, config *AppPorterRunSharedConfig, namespace, podName, container string) error {
+func appHandlePodAttachError(ctx context.Context, err error, config *KubernetesSharedConfig, namespace, podName, container string) error {
 	if appVerbose {
 	if appVerbose {
 		color.New(color.FgYellow).Fprintf(os.Stderr, "Error: %s\n", err)
 		color.New(color.FgYellow).Fprintf(os.Stderr, "Error: %s\n", err)
 	}
 	}
@@ -1098,7 +1099,7 @@ func appHandlePodAttachError(ctx context.Context, err error, config *AppPorterRu
 	return err
 	return err
 }
 }
 
 
-func appPipePodLogsToStdout(ctx context.Context, config *AppPorterRunSharedConfig, namespace, name, container string, follow bool) (int64, error) {
+func appPipePodLogsToStdout(ctx context.Context, config *KubernetesSharedConfig, namespace, name, container string, follow bool) (int64, error) {
 	podLogOpts := v1.PodLogOptions{
 	podLogOpts := v1.PodLogOptions{
 		Container: container,
 		Container: container,
 		Follow:    follow,
 		Follow:    follow,
@@ -1118,7 +1119,7 @@ func appPipePodLogsToStdout(ctx context.Context, config *AppPorterRunSharedConfi
 	return io.Copy(os.Stdout, podLogs)
 	return io.Copy(os.Stdout, podLogs)
 }
 }
 
 
-func appPipeEventsToStdout(ctx context.Context, config *AppPorterRunSharedConfig, namespace, name, _ string, _ bool) error {
+func appPipeEventsToStdout(ctx context.Context, config *KubernetesSharedConfig, namespace, name, _ string, _ bool) error {
 	// update the config in case the operation has taken longer than token expiry time
 	// update the config in case the operation has taken longer than token expiry time
 	config.setSharedConfig(ctx) //nolint:errcheck,gosec // do not want to change logic of CLI. New linter error
 	config.setSharedConfig(ctx) //nolint:errcheck,gosec // do not want to change logic of CLI. New linter error
 
 
@@ -1140,7 +1141,7 @@ func appPipeEventsToStdout(ctx context.Context, config *AppPorterRunSharedConfig
 	return nil
 	return nil
 }
 }
 
 
-func appGetExistingPod(ctx context.Context, config *AppPorterRunSharedConfig, name, namespace string) (*v1.Pod, error) {
+func appGetExistingPod(ctx context.Context, config *KubernetesSharedConfig, name, namespace string) (*v1.Pod, error) {
 	return config.Clientset.CoreV1().Pods(namespace).Get(
 	return config.Clientset.CoreV1().Pods(namespace).Get(
 		ctx,
 		ctx,
 		name,
 		name,
@@ -1148,7 +1149,7 @@ func appGetExistingPod(ctx context.Context, config *AppPorterRunSharedConfig, na
 	)
 	)
 }
 }
 
 
-func appDeletePod(ctx context.Context, config *AppPorterRunSharedConfig, name, namespace string) error {
+func appDeletePod(ctx context.Context, config *KubernetesSharedConfig, name, namespace string) error {
 	// update the config in case the operation has taken longer than token expiry time
 	// update the config in case the operation has taken longer than token expiry time
 	config.setSharedConfig(ctx) //nolint:errcheck,gosec // do not want to change logic of CLI. New linter error
 	config.setSharedConfig(ctx) //nolint:errcheck,gosec // do not want to change logic of CLI. New linter error
 
 
@@ -1169,7 +1170,7 @@ func appDeletePod(ctx context.Context, config *AppPorterRunSharedConfig, name, n
 
 
 func appCreateEphemeralPodFromExisting(
 func appCreateEphemeralPodFromExisting(
 	ctx context.Context,
 	ctx context.Context,
-	config *AppPorterRunSharedConfig,
+	config *KubernetesSharedConfig,
 	existing *v1.Pod,
 	existing *v1.Pod,
 	container string,
 	container string,
 	args []string,
 	args []string,

+ 195 - 0
cli/cmd/commands/datastore.go

@@ -0,0 +1,195 @@
+package commands
+
+import (
+	"context"
+	"fmt"
+	"net/http"
+	"net/url"
+	"os"
+	"os/signal"
+	"time"
+
+	"github.com/briandowns/spinner"
+	"github.com/fatih/color"
+	api "github.com/porter-dev/porter/api/client"
+	"github.com/porter-dev/porter/api/types"
+	"github.com/porter-dev/porter/cli/cmd/config"
+	"github.com/spf13/cobra"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/client-go/rest"
+	"k8s.io/client-go/tools/portforward"
+	"k8s.io/client-go/transport/spdy"
+)
+
+// port is the local port forwarded to the datastore proxy pod; it is bound to
+// the --port/-p flag of `porter datastore connect` (default 8122).
+var port int
+
+const (
+	// Address_Localhost is the localhost address
+	Address_Localhost = "localhost"
+)
+
+// registerCommand_Datastore builds the `porter datastore` parent command and
+// registers its `connect` subcommand, which tunnels a local port to a remote
+// datastore.
+func registerCommand_Datastore(cliConf config.CLIConfig) *cobra.Command {
+	datastoreCmd := &cobra.Command{
+		Use:   "datastore",
+		Short: "Runs a command for your datastore.",
+	}
+
+	datastoreConnectCmd := &cobra.Command{
+		Use:   "connect <DATASTORE_NAME>",
+		Short: "Forward a local port to a remote datastore.",
+		Args:  cobra.MinimumNArgs(1),
+		Run: func(cmd *cobra.Command, args []string) {
+			// checkLoginAndRunWithConfig resolves auth/config before invoking
+			// datastoreConnect; any error results in a non-zero exit.
+			err := checkLoginAndRunWithConfig(cmd, cliConf, args, datastoreConnect)
+			if err != nil {
+				os.Exit(1)
+			}
+		},
+	}
+
+	datastoreConnectCmd.PersistentFlags().IntVarP(
+		&port,
+		"port",
+		"p",
+		8122,
+		"the local port to forward",
+	)
+
+	datastoreCmd.AddCommand(datastoreConnectCmd)
+
+	return datastoreCmd
+}
+
+// forwardPorts opens a SPDY-upgraded port-forward connection to the pod at
+// url using the given kube credentials and forwards each "local:remote" pair
+// in ports on localhost. It blocks until stopChan is closed or forwarding
+// fails; readyChan is closed once the tunnel is established.
+func forwardPorts(
+	method string,
+	url *url.URL,
+	kubeConfig *rest.Config,
+	ports []string,
+	stopChan <-chan struct{},
+	readyChan chan struct{},
+) error {
+	transport, upgrader, err := spdy.RoundTripperFor(kubeConfig)
+	if err != nil {
+		return err
+	}
+
+	dialer := spdy.NewDialer(upgrader, &http.Client{Transport: transport}, method, url)
+	fw, err := portforward.NewOnAddresses(
+		dialer, []string{Address_Localhost}, ports, stopChan, readyChan, os.Stdout, os.Stderr)
+	if err != nil {
+		return err
+	}
+
+	return fw.ForwardPorts()
+}
+
+// datastoreConnect asks the API to create a proxy pod for the named
+// datastore, waits for it to be ready, prints the connection credentials, and
+// port-forwards the local --port to the datastore through the pod. It blocks
+// until interrupted (CTRL-C) or the tunnel fails; the proxy pod is deleted on
+// the way out.
+func datastoreConnect(ctx context.Context, _ *types.GetAuthenticatedUserResponse, client api.Client, cliConf config.CLIConfig, _ config.FeatureFlags, _ *cobra.Command, args []string) error {
+	if cliConf.Project == 0 {
+		return fmt.Errorf("project not set; please select a project with porter config set-project and try again")
+	}
+	projectId := cliConf.Project
+	datastoreName := args[0]
+	if datastoreName == "" {
+		return fmt.Errorf("no datastore name provided")
+	}
+
+	if port == 0 {
+		return fmt.Errorf("port must be provided")
+	}
+
+	s := spinner.New(spinner.CharSets[9], 100*time.Millisecond)
+	s.Color("cyan") // nolint:errcheck,gosec
+	s.Suffix = fmt.Sprintf(" Creating secure tunnel to datastore named %s in project %d...", datastoreName, projectId)
+
+	s.Start()
+	resp, err := client.CreateDatastoreProxy(ctx, projectId, datastoreName, &types.CreateDatastoreProxyRequest{})
+	// Stop the spinner on both paths: returning with it running leaves its
+	// goroutine writing to the terminal after this function exits.
+	s.Stop()
+	if err != nil {
+		return fmt.Errorf("could not create secure tunnel: %s", err.Error())
+	}
+
+	datastoreCredential := resp.Credential
+	// The proxy pod lives in the cluster the server chose, not whichever
+	// cluster the CLI config currently points at.
+	cliConf.Cluster = resp.ClusterID
+	config := &KubernetesSharedConfig{
+		Client:    client,
+		CLIConfig: cliConf,
+	}
+
+	err = config.setSharedConfig(ctx)
+	if err != nil {
+		return fmt.Errorf("could not retrieve kube credentials: %s", err.Error())
+	}
+
+	// Register cleanup as soon as kube credentials exist so the server-created
+	// proxy pod is deleted even if the lookup or wait below fails.
+	defer appDeletePod(ctx, config, resp.PodName, resp.Namespace) //nolint:errcheck,gosec
+
+	proxyPod, err := config.Clientset.CoreV1().Pods(resp.Namespace).Get(
+		ctx,
+		resp.PodName,
+		metav1.GetOptions{},
+	)
+	if err != nil {
+		return fmt.Errorf("could not connect to secure tunnel: %s", err.Error())
+	}
+
+	s = spinner.New(spinner.CharSets[9], 100*time.Millisecond)
+	s.Color("green") // nolint:errcheck,gosec
+	s.Suffix = " Waiting for secure tunnel to datastore to be ready..."
+
+	s.Start()
+	err = appWaitForPod(ctx, config, proxyPod)
+	s.Stop()
+	if err != nil {
+		color.New(color.FgRed).Println("error occurred while waiting for secure tunnel to be ready") // nolint:errcheck,gosec
+		return err
+	}
+
+	stopChannel := make(chan struct{}, 1)
+	readyChannel := make(chan struct{})
+
+	signals := make(chan os.Signal, 1)
+	signal.Notify(signals, os.Interrupt)
+	defer signal.Stop(signals)
+
+	go func() {
+		<-signals
+		if stopChannel != nil {
+			close(stopChannel)
+		}
+	}()
+
+	// Target the namespace the proxy pod was actually created in. The
+	// package-level `namespace` variable belongs to the app commands and is
+	// not set for datastore commands.
+	req := config.RestClient.Post().
+		Resource("pods").
+		Namespace(resp.Namespace).
+		Name(proxyPod.Name).
+		SubResource("portforward")
+
+	printDatastoreConnectionInformation(resp.Type, port, datastoreCredential)
+
+	color.New(color.FgGreen).Println("Starting proxy...[CTRL-C to exit]") // nolint:errcheck,gosec
+	return forwardPorts("POST", req.URL(), config.RestConf, []string{fmt.Sprintf("%d:%d", port, datastoreCredential.Port)}, stopChannel, readyChannel)
+}
+
+// printDatastoreConnectionInformation prints the local connection details for
+// the forwarded datastore (host, port, and any non-empty credential fields),
+// followed by a type-specific example command for connecting through the
+// tunnel.
+func printDatastoreConnectionInformation(datastoreType string, port int, credential types.DatastoreCredential) {
+	green := color.New(color.FgGreen)
+	green.Println("Secure tunnel setup complete! While the tunnel is running, you can connect to your datastore using the following credentials:") //nolint:errcheck,gosec
+
+	fmt.Printf(" Host: 127.0.0.1\n")
+	fmt.Printf(" Port: %d\n", port)
+	// Only print credential fields that are actually populated.
+	fields := []struct {
+		label string
+		value string
+	}{
+		{"Database name", credential.DatabaseName},
+		{"Username", credential.Username},
+		{"Password", credential.Password},
+	}
+	for _, f := range fields {
+		if f.value != "" {
+			fmt.Printf(" %s: %s\n", f.label, f.value)
+		}
+	}
+	switch datastoreType {
+	case string(types.DatastoreType_ElastiCache):
+		fmt.Println()
+		green.Println("For example, you can connect to your datastore using the following command:") //nolint:errcheck,gosec
+		fmt.Printf(" redis-cli -p %d -a %s --tls\n", port, credential.Password)
+	case string(types.DatastoreType_RDS):
+		fmt.Println()
+		green.Println("For example, you can connect to your datastore using the following command:") //nolint:errcheck,gosec
+		fmt.Printf(" PGPASSWORD=%s psql -h 127.0.0.1 -p %d -U %s -d %s\n", credential.Password, port, credential.Username, credential.DatabaseName)
+	}
+	fmt.Println()
+}

+ 1 - 1
go.mod

@@ -83,7 +83,7 @@ require (
 	github.com/matryer/is v1.4.0
 	github.com/matryer/is v1.4.0
 	github.com/nats-io/nats.go v1.24.0
 	github.com/nats-io/nats.go v1.24.0
 	github.com/open-policy-agent/opa v0.44.0
 	github.com/open-policy-agent/opa v0.44.0
-	github.com/porter-dev/api-contracts v0.2.90
+	github.com/porter-dev/api-contracts v0.2.91
 	github.com/riandyrn/otelchi v0.5.1
 	github.com/riandyrn/otelchi v0.5.1
 	github.com/santhosh-tekuri/jsonschema/v5 v5.0.1
 	github.com/santhosh-tekuri/jsonschema/v5 v5.0.1
 	github.com/stefanmcshane/helm v0.0.0-20221213002717-88a4a2c6e77d
 	github.com/stefanmcshane/helm v0.0.0-20221213002717-88a4a2c6e77d

+ 2 - 0
go.sum

@@ -1525,6 +1525,8 @@ github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZN
 github.com/polyfloyd/go-errorlint v0.0.0-20210722154253-910bb7978349/go.mod h1:wi9BfjxjF/bwiZ701TzmfKu6UKC357IOAtNr0Td0Lvw=
 github.com/polyfloyd/go-errorlint v0.0.0-20210722154253-910bb7978349/go.mod h1:wi9BfjxjF/bwiZ701TzmfKu6UKC357IOAtNr0Td0Lvw=
 github.com/porter-dev/api-contracts v0.2.90 h1:0ceIXz0xWNQpqVqhUMt3/RDeEawccfXx3KgM/tRg638=
 github.com/porter-dev/api-contracts v0.2.90 h1:0ceIXz0xWNQpqVqhUMt3/RDeEawccfXx3KgM/tRg638=
 github.com/porter-dev/api-contracts v0.2.90/go.mod h1:fX6JmP5QuzxDLvqP3evFOTXjI4dHxsG0+VKNTjImZU8=
 github.com/porter-dev/api-contracts v0.2.90/go.mod h1:fX6JmP5QuzxDLvqP3evFOTXjI4dHxsG0+VKNTjImZU8=
+github.com/porter-dev/api-contracts v0.2.91 h1:jiFWQ+WISAtfjXalOmWJdSr1ZOm7/ov+3ozrCeYA9Ws=
+github.com/porter-dev/api-contracts v0.2.91/go.mod h1:fX6JmP5QuzxDLvqP3evFOTXjI4dHxsG0+VKNTjImZU8=
 github.com/porter-dev/switchboard v0.0.3 h1:dBuYkiVLa5Ce7059d6qTe9a1C2XEORFEanhbtV92R+M=
 github.com/porter-dev/switchboard v0.0.3 h1:dBuYkiVLa5Ce7059d6qTe9a1C2XEORFEanhbtV92R+M=
 github.com/porter-dev/switchboard v0.0.3/go.mod h1:xSPzqSFMQ6OSbp42fhCi4AbGbQbsm6nRvOkrblFeXU4=
 github.com/porter-dev/switchboard v0.0.3/go.mod h1:xSPzqSFMQ6OSbp42fhCi4AbGbQbsm6nRvOkrblFeXU4=
 github.com/posener/complete v1.1.1/go.mod h1:em0nMJCgc9GFtwrmVmEMR/ZL6WyhyjMBndrE9hABlRI=
 github.com/posener/complete v1.1.1/go.mod h1:em0nMJCgc9GFtwrmVmEMR/ZL6WyhyjMBndrE9hABlRI=