|
|
@@ -2,7 +2,10 @@ package provisioner
|
|
|
|
|
|
import (
|
|
|
"context"
|
|
|
+ "encoding/base64"
|
|
|
+ "encoding/json"
|
|
|
"fmt"
|
|
|
+ "regexp"
|
|
|
|
|
|
"github.com/porter-dev/porter/internal/repository"
|
|
|
|
|
|
@@ -45,6 +48,8 @@ func InitGlobalStream(client *redis.Client) error {
|
|
|
GlobalStreamName,
|
|
|
).Result()
|
|
|
|
|
|
+ fmt.Println(xInfoGroups, err)
|
|
|
+
|
|
|
if err != nil {
|
|
|
return err
|
|
|
}
|
|
|
@@ -52,6 +57,7 @@ func InitGlobalStream(client *redis.Client) error {
|
|
|
for _, group := range xInfoGroups {
|
|
|
// if the group exists, return with no error
|
|
|
if group.Name == GlobalStreamGroupName {
|
|
|
+ fmt.Println("group already exists")
|
|
|
return nil
|
|
|
}
|
|
|
}
|
|
|
@@ -61,9 +67,11 @@ func InitGlobalStream(client *redis.Client) error {
|
|
|
context.Background(),
|
|
|
GlobalStreamName,
|
|
|
GlobalStreamGroupName,
|
|
|
- ">",
|
|
|
+ "$",
|
|
|
).Result()
|
|
|
|
|
|
+ fmt.Println("xgroup created", err)
|
|
|
+
|
|
|
return err
|
|
|
}
|
|
|
|
|
|
@@ -72,13 +80,15 @@ type ResourceCRUDHandler interface {
|
|
|
OnCreate(id uint) error
|
|
|
}
|
|
|
|
|
|
-// GlobalStreamListener performs an XREADGROUP operation on a given stream
|
|
|
-// and sends a GlobalStreamMessage to the msgChan
|
|
|
+// GlobalStreamListener performs an XREADGROUP operation on a given stream and
|
|
|
+// updates models in the database as necessary
|
|
|
func GlobalStreamListener(
|
|
|
client *redis.Client,
|
|
|
- infraRepo repository.AWSInfraRepository,
|
|
|
+ repo repository.Repository,
|
|
|
errorChan chan error,
|
|
|
) {
|
|
|
+ fmt.Println("starting global stream listener")
|
|
|
+
|
|
|
for {
|
|
|
xstreams, err := client.XReadGroup(
|
|
|
context.Background(),
|
|
|
@@ -90,6 +100,8 @@ func GlobalStreamListener(
|
|
|
},
|
|
|
).Result()
|
|
|
|
|
|
+ fmt.Println(xstreams, err)
|
|
|
+
|
|
|
if err != nil {
|
|
|
errorChan <- err
|
|
|
return
|
|
|
@@ -98,10 +110,10 @@ func GlobalStreamListener(
|
|
|
// parse messages from the global stream
|
|
|
for _, msg := range xstreams[0].Messages {
|
|
|
// parse the id to identify the infra
|
|
|
- infraID, err := models.GetInfraIDFromWorkspaceID(fmt.Sprintf("%v", msg.Values["id"]))
|
|
|
+ kind, projID, infraID, err := models.ParseWorkspaceID(fmt.Sprintf("%v", msg.Values["id"]))
|
|
|
|
|
|
if fmt.Sprintf("%v", msg.Values["status"]) == "created" {
|
|
|
- infra, err := infraRepo.ReadAWSInfra(infraID)
|
|
|
+ infra, err := repo.AWSInfra.ReadAWSInfra(infraID)
|
|
|
|
|
|
if err != nil {
|
|
|
continue
|
|
|
@@ -109,7 +121,75 @@ func GlobalStreamListener(
|
|
|
|
|
|
infra.Status = models.StatusCreated
|
|
|
|
|
|
- infra, err = infraRepo.UpdateAWSInfra(infra)
|
|
|
+ infra, err = repo.AWSInfra.UpdateAWSInfra(infra)
|
|
|
+
|
|
|
+ if err != nil {
|
|
|
+ continue
|
|
|
+ }
|
|
|
+
|
|
|
+ // create ECR/EKS
|
|
|
+ if kind == string(models.AWSInfraECR) {
|
|
|
+ reg := &models.Registry{
|
|
|
+ ProjectID: projID,
|
|
|
+ AWSIntegrationID: infra.AWSIntegrationID,
|
|
|
+ }
|
|
|
+
|
|
|
+ // parse raw data into ECR type
|
|
|
+ dataString, ok := msg.Values["data"].(string)
|
|
|
+
|
|
|
+ if ok {
|
|
|
+ json.Unmarshal([]byte(dataString), reg)
|
|
|
+ }
|
|
|
+
|
|
|
+ reg, err := repo.Registry.CreateRegistry(reg)
|
|
|
+
|
|
|
+ if err != nil {
|
|
|
+ continue
|
|
|
+ }
|
|
|
+ } else if kind == string(models.AWSInfraEKS) {
|
|
|
+ cluster := &models.Cluster{
|
|
|
+ AuthMechanism: models.AWS,
|
|
|
+ ProjectID: projID,
|
|
|
+ AWSIntegrationID: infra.AWSIntegrationID,
|
|
|
+ }
|
|
|
+
|
|
|
+ // parse raw data into ECR type
|
|
|
+ dataString, ok := msg.Values["data"].(string)
|
|
|
+
|
|
|
+ if ok {
|
|
|
+ json.Unmarshal([]byte(dataString), cluster)
|
|
|
+ }
|
|
|
+
|
|
|
+ re := regexp.MustCompile(`^([A-Za-z0-9+/]{4})*([A-Za-z0-9+/]{3}=|[A-Za-z0-9+/]{2}==)?$`)
|
|
|
+
|
|
|
+ // if it matches the base64 regex, decode it
|
|
|
+ caData := string(cluster.CertificateAuthorityData)
|
|
|
+ if re.MatchString(caData) {
|
|
|
+ decoded, err := base64.StdEncoding.DecodeString(caData)
|
|
|
+
|
|
|
+ if err != nil {
|
|
|
+ continue
|
|
|
+ }
|
|
|
+
|
|
|
+ cluster.CertificateAuthorityData = []byte(decoded)
|
|
|
+ }
|
|
|
+
|
|
|
+ cluster, err := repo.Cluster.CreateCluster(cluster)
|
|
|
+
|
|
|
+ if err != nil {
|
|
|
+ continue
|
|
|
+ }
|
|
|
+ }
|
|
|
+ } else if fmt.Sprintf("%v", msg.Values["status"]) == "error" {
|
|
|
+ infra, err := repo.AWSInfra.ReadAWSInfra(infraID)
|
|
|
+
|
|
|
+ if err != nil {
|
|
|
+ continue
|
|
|
+ }
|
|
|
+
|
|
|
+ infra.Status = models.StatusError
|
|
|
+
|
|
|
+ infra, err = repo.AWSInfra.UpdateAWSInfra(infra)
|
|
|
|
|
|
if err != nil {
|
|
|
continue
|