|
|
@@ -2,6 +2,7 @@ package provisioner
|
|
|
|
|
|
import (
|
|
|
"context"
|
|
|
+ "encoding/json"
|
|
|
"fmt"
|
|
|
|
|
|
"github.com/porter-dev/porter/internal/repository"
|
|
|
@@ -72,11 +73,11 @@ type ResourceCRUDHandler interface {
|
|
|
OnCreate(id uint) error
|
|
|
}
|
|
|
|
|
|
-// GlobalStreamListener performs an XREADGROUP operation on a given stream
|
|
|
-// and sends a GlobalStreamMessage to the msgChan
|
|
|
+// GlobalStreamListener performs an XREADGROUP operation on a given stream and
|
|
|
+// updates models in the database as necessary
|
|
|
func GlobalStreamListener(
|
|
|
client *redis.Client,
|
|
|
- infraRepo repository.AWSInfraRepository,
|
|
|
+ repo repository.Repository,
|
|
|
errorChan chan error,
|
|
|
) {
|
|
|
for {
|
|
|
@@ -98,10 +99,10 @@ func GlobalStreamListener(
|
|
|
// parse messages from the global stream
|
|
|
for _, msg := range xstreams[0].Messages {
|
|
|
// parse the id to identify the infra
|
|
|
- infraID, err := models.GetInfraIDFromWorkspaceID(fmt.Sprintf("%v", msg.Values["id"]))
|
|
|
+ kind, projID, infraID, err := models.ParseWorkspaceID(fmt.Sprintf("%v", msg.Values["id"]))
|
|
|
|
|
|
if fmt.Sprintf("%v", msg.Values["status"]) == "created" {
|
|
|
- infra, err := infraRepo.ReadAWSInfra(infraID)
|
|
|
+ infra, err := repo.AWSInfra.ReadAWSInfra(infraID)
|
|
|
|
|
|
if err != nil {
|
|
|
continue
|
|
|
@@ -109,11 +110,30 @@ func GlobalStreamListener(
|
|
|
|
|
|
infra.Status = models.StatusCreated
|
|
|
|
|
|
- infra, err = infraRepo.UpdateAWSInfra(infra)
|
|
|
+ infra, err = repo.AWSInfra.UpdateAWSInfra(infra)
|
|
|
|
|
|
if err != nil {
|
|
|
continue
|
|
|
}
|
|
|
+
|
|
|
+ // create ECR/EKS
|
|
|
+ if kind == string(models.AWSInfraECR) {
|
|
|
+ reg := &models.Registry{
|
|
|
+ ProjectID: projID,
|
|
|
+ AWSIntegrationID: infra.AWSIntegrationID,
|
|
|
+ }
|
|
|
+
|
|
|
+ // parse raw data into ECR type
|
|
|
+ bytes, _ := msg.Values["data"].([]byte)
|
|
|
+
|
|
|
+ json.Unmarshal(bytes, reg)
|
|
|
+
|
|
|
+ reg, err := repo.Registry.CreateRegistry(reg)
|
|
|
+
|
|
|
+ if err != nil {
|
|
|
+ continue
|
|
|
+ }
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
// acknowledge the message as read
|