|
|
@@ -2,7 +2,6 @@ package provisioner
|
|
|
|
|
|
import (
|
|
|
"context"
|
|
|
- "encoding/json"
|
|
|
"fmt"
|
|
|
|
|
|
"github.com/porter-dev/porter/internal/repository"
|
|
|
@@ -46,6 +45,8 @@ func InitGlobalStream(client *redis.Client) error {
|
|
|
GlobalStreamName,
|
|
|
).Result()
|
|
|
|
|
|
+ fmt.Println(xInfoGroups, err)
|
|
|
+
|
|
|
if err != nil {
|
|
|
return err
|
|
|
}
|
|
|
@@ -53,6 +54,7 @@ func InitGlobalStream(client *redis.Client) error {
|
|
|
for _, group := range xInfoGroups {
|
|
|
// if the group exists, return with no error
|
|
|
if group.Name == GlobalStreamGroupName {
|
|
|
+ fmt.Println("group already exists")
|
|
|
return nil
|
|
|
}
|
|
|
}
|
|
|
@@ -62,9 +64,11 @@ func InitGlobalStream(client *redis.Client) error {
|
|
|
context.Background(),
|
|
|
GlobalStreamName,
|
|
|
GlobalStreamGroupName,
|
|
|
- ">",
|
|
|
+ "$",
|
|
|
).Result()
|
|
|
|
|
|
+ fmt.Println("xgroup created", err)
|
|
|
+
|
|
|
return err
|
|
|
}
|
|
|
|
|
|
@@ -80,6 +84,8 @@ func GlobalStreamListener(
|
|
|
repo repository.Repository,
|
|
|
errorChan chan error,
|
|
|
) {
|
|
|
+ fmt.Println("starting global stream listener")
|
|
|
+
|
|
|
for {
|
|
|
xstreams, err := client.XReadGroup(
|
|
|
context.Background(),
|
|
|
@@ -91,6 +97,8 @@ func GlobalStreamListener(
|
|
|
},
|
|
|
).Result()
|
|
|
|
|
|
+ fmt.Println(xstreams, err)
|
|
|
+
|
|
|
if err != nil {
|
|
|
errorChan <- err
|
|
|
return
|
|
|
@@ -98,6 +106,8 @@ func GlobalStreamListener(
|
|
|
|
|
|
// parse messages from the global stream
|
|
|
for _, msg := range xstreams[0].Messages {
|
|
|
+ fmt.Println(msg.Values)
|
|
|
+
|
|
|
// parse the id to identify the infra
|
|
|
kind, projID, infraID, err := models.ParseWorkspaceID(fmt.Sprintf("%v", msg.Values["id"]))
|
|
|
|
|
|
@@ -124,9 +134,12 @@ func GlobalStreamListener(
|
|
|
}
|
|
|
|
|
|
// parse raw data into ECR type
|
|
|
- bytes, _ := msg.Values["data"].([]byte)
|
|
|
+ dataMap, ok := msg.Values["data"].(map[string]interface{})
|
|
|
|
|
|
- json.Unmarshal(bytes, reg)
|
|
|
+ if ok {
|
|
|
+ name, _ := dataMap["name"].(string)
|
|
|
+ reg.Name = name
|
|
|
+ }
|
|
|
|
|
|
reg, err := repo.Registry.CreateRegistry(reg)
|
|
|
|