2
0
Эх сурвалжийг харах

global redis stream listener

Alexander Belanger 5 жил өмнө
parent
commit
771d66ff64

+ 18 - 3
cmd/app/main.go

@@ -15,6 +15,7 @@ import (
 	lr "github.com/porter-dev/porter/internal/logger"
 	"github.com/porter-dev/porter/server/router"
 
+	prov "github.com/porter-dev/porter/internal/kubernetes/provisioner"
 	ints "github.com/porter-dev/porter/internal/models/integrations"
 )
 
@@ -29,6 +30,14 @@ func main() {
 		return
 	}
 
+	redis, err := adapter.NewRedisClient(&appConf.Redis)
+	prov.InitGlobalStream(redis)
+
+	if err != nil {
+		logger.Fatal().Err(err).Msg("")
+		return
+	}
+
 	err = db.AutoMigrate(
 		&models.Project{},
 		&models.Role{},
@@ -67,9 +76,10 @@ func main() {
 	repo := gorm.NewRepository(db, &key)
 
 	a, _ := api.New(&api.AppConfig{
-		Logger:     logger,
-		Repository: repo,
-		ServerConf: appConf.Server,
+		Logger:      logger,
+		Repository:  repo,
+		ServerConf:  appConf.Server,
+		RedisClient: redis,
 	})
 
 	appRouter := router.New(a)
@@ -86,7 +96,12 @@ func main() {
 		IdleTimeout:  appConf.Server.TimeoutIdle,
 	}
 
+	errorChan := make(chan error)
+
+	go prov.GlobalStreamListener(redis, repo.AWSInfra, errorChan)
+
 	if err := s.ListenAndServe(); err != nil && err != http.ErrServerClosed {
 		log.Fatal("Server startup failed", err)
 	}
+
 }

+ 19 - 19
go.mod

@@ -5,35 +5,31 @@ go 1.14
 require (
 	cloud.google.com/go v0.65.0
 	github.com/Azure/go-autorest/autorest v0.11.1 // indirect
+	github.com/Azure/go-autorest/autorest/adal v0.9.5 // indirect
 	github.com/DATA-DOG/go-sqlmock v1.5.0
 	github.com/Masterminds/semver v1.5.0 // indirect
 	github.com/aws/aws-sdk-go v1.31.6
-	github.com/containerd/containerd v1.4.1
-	github.com/cosmtrek/air v1.21.2 // indirect
-	github.com/creack/pty v1.1.11 // indirect
-	github.com/danieljoos/wincred v1.1.0 // indirect
+	github.com/containerd/containerd v1.4.1 // indirect
+	github.com/coreos/rkt v1.30.0
 	github.com/docker/docker v1.4.2-0.20200203170920-46ec8731fbce
-	github.com/docker/docker-credential-helpers v0.6.3
 	github.com/docker/go-connections v0.4.0
-	github.com/docopt/docopt-go v0.0.0-20180111231733-ee0de3bc6815 // indirect
 	github.com/evanphx/json-patch v4.9.0+incompatible // indirect
 	github.com/fatih/color v1.9.0
 	github.com/go-chi/chi v4.1.2+incompatible
-	github.com/go-chi/cors v1.1.1
 	github.com/go-playground/locales v0.13.0
 	github.com/go-playground/universal-translator v0.17.0
 	github.com/go-playground/validator/v10 v10.3.0
 	github.com/go-redis/redis v6.15.9+incompatible
-	github.com/go-redis/redis/v8 v8.4.4
+	github.com/go-redis/redis/v7 v7.4.0
+	github.com/go-redis/redis/v8 v8.3.1
 	github.com/go-test/deep v1.0.7
-	github.com/google/go-cmp v0.5.4
-	github.com/google/go-containerregistry v0.1.4
 	github.com/google/go-github v17.0.0+incompatible
-	github.com/google/go-github/v32 v32.1.0
+	github.com/google/go-querystring v1.0.0 // indirect
+	github.com/googleapis/gnostic v0.2.2 // indirect
 	github.com/gorilla/securecookie v1.1.1
 	github.com/gorilla/sessions v1.2.1
 	github.com/gorilla/websocket v1.4.2
-	github.com/hashicorp/consul/api v1.3.0
+	github.com/hashicorp/golang-lru v0.5.3 // indirect
 	github.com/imdario/mergo v0.3.11 // indirect
 	github.com/itchyny/gojq v0.11.1
 	github.com/itchyny/timefmt-go v0.1.1 // indirect
@@ -41,40 +37,44 @@ require (
 	github.com/joeshaw/envdecode v0.0.0-20200121155833-099f1fc765bd
 	github.com/json-iterator/go v1.1.10 // indirect
 	github.com/kr/pretty v0.2.0 // indirect
+	github.com/kr/text v0.2.0 // indirect
 	github.com/kris-nova/logger v0.0.0-20181127235838-fd0d87064b06
 	github.com/kris-nova/lolgopher v0.0.0-20180921204813-313b3abb0d9b // indirect
-	github.com/mattn/go-colorable v0.1.7 // indirect
+	github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e // indirect
+	github.com/onsi/ginkgo v1.14.2 // indirect
+	github.com/opentracing/opentracing-go v1.2.0 // indirect
 	github.com/pelletier/go-toml v1.8.1 // indirect
 	github.com/pkg/errors v0.9.1
 	github.com/rs/zerolog v1.20.0
-	github.com/sirupsen/logrus v1.7.0
+	github.com/sirupsen/logrus v1.7.0 // indirect
 	github.com/spf13/cobra v1.0.0
 	github.com/spf13/viper v1.4.0
 	github.com/stretchr/testify v1.6.1
+	go.opentelemetry.io/otel v0.13.0 // indirect
 	golang.org/x/crypto v0.0.0-20201002170205-7f63de1d35b0
+	golang.org/x/exp v0.0.0-20200908183739-ae8ad444f925 // indirect
 	golang.org/x/oauth2 v0.0.0-20200902213428-5d25da1a8d43
-	golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f
+	golang.org/x/sys v0.0.0-20200826173525-f9321e4c35a6 // indirect
 	golang.org/x/time v0.0.0-20200630173020-3af7569d3a1e // indirect
 	google.golang.org/api v0.30.0
 	google.golang.org/genproto v0.0.0-20201014134559-03b6142f0dc9
 	google.golang.org/grpc v1.33.0 // indirect
-	gopkg.in/go-playground/validator.v9 v9.31.0
+	gopkg.in/check.v1 v1.0.0-20200227125254-8fa46927fb4f // indirect
 	gopkg.in/yaml.v2 v2.3.0
 	gorm.io/driver/postgres v1.0.2
 	gorm.io/driver/sqlite v1.1.3
 	gorm.io/gorm v1.20.2
-	helm.sh/helm v2.16.12+incompatible
+	gotest.tools/v3 v3.0.3 // indirect
 	helm.sh/helm/v3 v3.3.4
 	k8s.io/api v0.18.8
 	k8s.io/apimachinery v0.18.8
 	k8s.io/cli-runtime v0.18.8
 	k8s.io/client-go v0.18.8
-	k8s.io/gengo v0.0.0-20200413195148-3a45101e95ac // indirect
 	k8s.io/helm v2.16.12+incompatible
 	k8s.io/klog/v2 v2.2.0 // indirect
 	k8s.io/utils v0.0.0-20200912215256-4140de9c8800 // indirect
+	rsc.io/letsencrypt v0.0.3 // indirect
 	sigs.k8s.io/aws-iam-authenticator v0.5.2
-	sigs.k8s.io/structured-merge-diff/v4 v4.0.1 // indirect
 	sigs.k8s.io/yaml v1.2.0
 )
 

Файлын зөрүү хэтэрхий том тул дарагдсан байна
+ 54 - 339
go.sum


+ 22 - 0
internal/adapter/redis.go

@@ -0,0 +1,22 @@
+package adapter
+
+import (
+	"context"
+	"fmt"
+
+	redis "github.com/go-redis/redis/v8"
+	"github.com/porter-dev/porter/internal/config"
+)
+
+// NewRedisClient returns a new redis client instance
+func NewRedisClient(conf *config.RedisConf) (*redis.Client, error) {
+	client := redis.NewClient(&redis.Options{
+		Addr: fmt.Sprintf("%s:%s", conf.Host, conf.Port),
+		// Username: conf.Username,
+		// Password: conf.Password,
+		// DB:       conf.DB,
+	})
+
+	_, err := client.Ping(context.Background()).Result()
+	return client, err
+}

+ 1 - 0
internal/config/config.go

@@ -13,6 +13,7 @@ type Conf struct {
 	Server ServerConf
 	Db     DBConf
 	K8s    K8sConf
+	Redis  RedisConf
 }
 
 // ServerConf is the server configuration

+ 7 - 0
internal/config/redis.go

@@ -0,0 +1,7 @@
+package config
+
+// RedisConf is the redis config required for the provisioner container
+type RedisConf struct {
+	Host string `env:"REDIS_HOST,default=redis"`
+	Port string `env:"REDIS_PORT,default=5432"`
+}

+ 2 - 1
internal/kubernetes/agent.go

@@ -7,6 +7,7 @@ import (
 	"io"
 	"strings"
 
+	"github.com/porter-dev/porter/internal/config"
 	"github.com/porter-dev/porter/internal/kubernetes/provisioner"
 	"github.com/porter-dev/porter/internal/kubernetes/provisioner/aws"
 	"github.com/porter-dev/porter/internal/kubernetes/provisioner/aws/ecr"
@@ -261,7 +262,7 @@ func (a *Agent) provision(
 ) (*batchv1.Job, error) {
 	prov.Namespace = "default"
 
-	prov.Redis = &provisioner.RedisConf{
+	prov.Redis = &config.RedisConf{
 		Host: "redis-master.default.svc.cluster.local",
 		Port: "6379",
 	}

+ 133 - 0
internal/kubernetes/provisioner/global_stream.go

@@ -0,0 +1,133 @@
+package provisioner
+
+import (
+	"context"
+	"fmt"
+
+	"github.com/porter-dev/porter/internal/repository"
+
+	redis "github.com/go-redis/redis/v8"
+
+	"github.com/porter-dev/porter/internal/models"
+)
+
+// GlobalStreamName is the name of the Redis stream for global operations
+const GlobalStreamName = "global"
+
+// GlobalStreamGroupName is the name of the Redis consumer group that this server
+// is a part of
+const GlobalStreamGroupName = "portersvr"
+
+// InitGlobalStream initializes the global stream if it does not exist, and the
+// global consumer group if it does not exist
+func InitGlobalStream(client *redis.Client) error {
+	// determine if the stream exists
+	x, err := client.Exists(
+		context.Background(),
+		GlobalStreamName,
+	).Result()
+
+	// if it does not exist, create group and stream
+	if x == 0 {
+		_, err := client.XGroupCreateMkStream(
+			context.Background(),
+			GlobalStreamName,
+			GlobalStreamGroupName,
+			">",
+		).Result()
+
+		return err
+	}
+
+	// otherwise, check if the group exists
+	xInfoGroups, err := client.XInfoGroups(
+		context.Background(),
+		GlobalStreamName,
+	).Result()
+
+	if err != nil {
+		return err
+	}
+
+	for _, group := range xInfoGroups {
+		// if the group exists, return with no error
+		if group.Name == GlobalStreamGroupName {
+			return nil
+		}
+	}
+
+	// if the group does not exist, create it
+	_, err = client.XGroupCreate(
+		context.Background(),
+		GlobalStreamName,
+		GlobalStreamGroupName,
+		">",
+	).Result()
+
+	return err
+}
+
+// ResourceCRUDHandler is a handler for updates to an infra resource
+type ResourceCRUDHandler interface {
+	OnCreate(id uint) error
+}
+
+// GlobalStreamListener performs an XREADGROUP operation on a given stream
+// and sends a GlobalStreamMessage to the msgChan
+func GlobalStreamListener(
+	client *redis.Client,
+	infraRepo repository.AWSInfraRepository,
+	errorChan chan error,
+) {
+	for {
+		xstreams, err := client.XReadGroup(
+			context.Background(),
+			&redis.XReadGroupArgs{
+				Group:    GlobalStreamGroupName,
+				Consumer: "portersvr-0", // just static consumer for now
+				Streams:  []string{GlobalStreamName, ">"},
+				Block:    0,
+			},
+		).Result()
+
+		if err != nil {
+			errorChan <- err
+			return
+		}
+
+		// parse messages from the global stream
+		for _, msg := range xstreams[0].Messages {
+			// parse the id to identify the infra
+			infraID, err := models.GetInfraIDFromWorkspaceID(fmt.Sprintf("%v", msg.Values["id"]))
+
+			if fmt.Sprintf("%v", msg.Values["status"]) == "created" {
+				infra, err := infraRepo.ReadAWSInfra(infraID)
+
+				if err != nil {
+					continue
+				}
+
+				infra.Status = models.StatusCreated
+
+				infra, err = infraRepo.UpdateAWSInfra(infra)
+
+				if err != nil {
+					continue
+				}
+			}
+
+			// acknowledge the message as read
+			_, err = client.XAck(
+				context.Background(),
+				GlobalStreamName,
+				GlobalStreamGroupName,
+				msg.ID,
+			).Result()
+
+			// if error, continue for now
+			if err != nil {
+				continue
+			}
+		}
+	}
+}

+ 3 - 7
internal/kubernetes/provisioner/provisioner.go

@@ -8,6 +8,8 @@ import (
 	"github.com/porter-dev/porter/internal/kubernetes/provisioner/aws"
 	"github.com/porter-dev/porter/internal/kubernetes/provisioner/aws/ecr"
 	"github.com/porter-dev/porter/internal/kubernetes/provisioner/aws/eks"
+
+	"github.com/porter-dev/porter/internal/config"
 )
 
 // InfraOption is a type of infrastructure that can be provisioned
@@ -26,7 +28,7 @@ type Conf struct {
 	Name      string
 	Namespace string
 	ID        string
-	Redis     *RedisConf
+	Redis     *config.RedisConf
 	Postgres  *PostgresConf
 
 	// provider-specific configurations
@@ -35,12 +37,6 @@ type Conf struct {
 	EKS *eks.Conf
 }
 
-// RedisConf is the redis config required for the provisioner container
-type RedisConf struct {
-	Host string
-	Port string
-}
-
 // PostgresConf is the postgres config for the provisioner container
 type PostgresConf struct {
 	Host string

+ 63 - 0
internal/kubernetes/provisioner/resource_stream.go

@@ -0,0 +1,63 @@
+package provisioner
+
+import (
+	"context"
+	"fmt"
+
+	redis "github.com/go-redis/redis/v8"
+	"github.com/gorilla/websocket"
+)
+
+// ResourceStream performs an XREAD operation on the given stream and outputs it to the given websocket conn.
+func ResourceStream(client *redis.Client, streamName string, conn *websocket.Conn) error {
+	errorchan := make(chan error)
+
+	go func() {
+		// listens for websocket closing handshake
+		for {
+			_, _, err := conn.ReadMessage()
+
+			if err != nil {
+				defer conn.Close()
+				errorchan <- err
+				return
+			}
+		}
+	}()
+
+	go func() {
+		lastID := "0-0"
+
+		for {
+
+			xstream, err := client.XRead(
+				context.Background(),
+				&redis.XReadArgs{
+					Streams: []string{streamName, lastID},
+					Block:   0,
+				},
+			).Result()
+
+			if err != nil {
+				return
+			}
+
+			messages := xstream[0].Messages
+			lastID = messages[len(messages)-1].ID
+
+			if writeErr := conn.WriteMessage(websocket.TextMessage, []byte(fmt.Sprint(xstream))); writeErr != nil {
+				errorchan <- writeErr
+				return
+			}
+		}
+	}()
+
+	for {
+		select {
+		case err := <-errorchan:
+			close(errorchan)
+			client.Close()
+			return err
+		}
+	}
+}

+ 19 - 1
internal/models/infra.go

@@ -2,6 +2,8 @@ package models
 
 import (
 	"fmt"
+	"strconv"
+	"strings"
 
 	"gorm.io/gorm"
 )
@@ -13,7 +15,6 @@ type InfraStatus string
 const (
 	StatusCreating InfraStatus = "creating"
 	StatusCreated  InfraStatus = "created"
-	StatusUpdating InfraStatus = "updating"
 )
 
 // AWSInfraKind is the kind that aws infra can be
@@ -71,3 +72,20 @@ func (ai *AWSInfra) Externalize() *AWSInfraExternal {
 func (ai *AWSInfra) GetWorkspaceID() string {
 	return fmt.Sprintf("%s-%d-%d", ai.Kind, ai.ProjectID, ai.ID)
 }
+
+// GetInfraIDFromWorkspaceID returns the infra id given a workspace id
+func GetInfraIDFromWorkspaceID(workspaceID string) (uint, error) {
+	strArr := strings.Split(workspaceID, "-")
+
+	if len(strArr) != 3 {
+		return 0, fmt.Errorf("workspace id improperly formatted")
+	}
+
+	u, err := strconv.ParseUint(strArr[2], 10, 64)
+
+	if err != nil {
+		return 0, err
+	}
+
+	return uint(u), nil
+}

+ 11 - 0
internal/repository/gorm/infra.go

@@ -62,3 +62,14 @@ func (repo *AWSInfraRepository) ListAWSInfrasByProjectID(
 
 	return infras, nil
 }
+
+// UpdateAWSInfra modifies an existing AWSInfra in the database
+func (repo *AWSInfraRepository) UpdateAWSInfra(
+	ai *models.AWSInfra,
+) (*models.AWSInfra, error) {
+	if err := repo.db.Save(ai).Error; err != nil {
+		return nil, err
+	}
+
+	return ai, nil
+}

+ 1 - 0
internal/repository/infra.go

@@ -9,4 +9,5 @@ type AWSInfraRepository interface {
 	CreateAWSInfra(repo *models.AWSInfra) (*models.AWSInfra, error)
 	ReadAWSInfra(id uint) (*models.AWSInfra, error)
 	ListAWSInfrasByProjectID(projectID uint) ([]*models.AWSInfra, error)
+	UpdateAWSInfra(repo *models.AWSInfra) (*models.AWSInfra, error)
 }

+ 18 - 0
internal/repository/memory/infra.go

@@ -71,3 +71,21 @@ func (repo *AWSInfraRepository) ListAWSInfrasByProjectID(
 
 	return res, nil
 }
+
+// UpdateAWSInfra modifies an existing AWSInfra in the database
+func (repo *AWSInfraRepository) UpdateAWSInfra(
+	ai *models.AWSInfra,
+) (*models.AWSInfra, error) {
+	if !repo.canQuery {
+		return nil, errors.New("Cannot write database")
+	}
+
+	if int(ai.ID-1) >= len(repo.awsInfras) || repo.awsInfras[ai.ID-1] == nil {
+		return nil, gorm.ErrRecordNotFound
+	}
+
+	index := int(ai.ID - 1)
+	repo.awsInfras[index] = ai
+
+	return ai, nil
+}

+ 17 - 70
server/api/api.go

@@ -6,6 +6,7 @@ import (
 	"github.com/go-playground/locales/en"
 	ut "github.com/go-playground/universal-translator"
 	vr "github.com/go-playground/validator/v10"
+	"github.com/go-redis/redis/v8"
 	sessionstore "github.com/porter-dev/porter/internal/auth"
 	"github.com/porter-dev/porter/internal/oauth"
 	"golang.org/x/oauth2"
@@ -32,10 +33,11 @@ type TestAgents struct {
 
 // AppConfig is the configuration required for creating a new App
 type AppConfig struct {
-	DB         *gorm.DB
-	Logger     *lr.Logger
-	Repository *repository.Repository
-	ServerConf config.ServerConf
+	DB          *gorm.DB
+	Logger      *lr.Logger
+	Repository  *repository.Repository
+	ServerConf  config.ServerConf
+	RedisClient *redis.Client
 
 	// TestAgents if API is in testing mode
 	TestAgents *TestAgents
@@ -59,6 +61,9 @@ type App struct {
 	// agents exposed for testing
 	TestAgents *TestAgents
 
+	// redis conf for redis connection
+	RedisClient *redis.Client
+
 	// oauth-specific clients
 	GithubConf *oauth2.Config
 
@@ -81,13 +86,14 @@ func New(conf *AppConfig) (*App, error) {
 	}
 
 	app := &App{
-		Logger:     conf.Logger,
-		Repo:       conf.Repository,
-		ServerConf: conf.ServerConf,
-		TestAgents: conf.TestAgents,
-		db:         conf.DB,
-		validator:  validator,
-		translator: &translator,
+		Logger:      conf.Logger,
+		Repo:        conf.Repository,
+		ServerConf:  conf.ServerConf,
+		RedisClient: conf.RedisClient,
+		TestAgents:  conf.TestAgents,
+		db:          conf.DB,
+		validator:   validator,
+		translator:  &translator,
 	}
 
 	// if repository not specified, default to in-memory
@@ -116,62 +122,3 @@ func New(conf *AppConfig) (*App, error) {
 
 	return app, nil
 }
-
-// // New returns a new App instance
-// // TODO -- this should accept an app/server config
-// func New(
-// 	logger *lr.Logger,
-// 	db *gorm.DB,
-// 	repo *repository.Repository,
-// 	validator *validator.Validate,
-// 	store sessions.Store,
-// 	cookieName string,
-// 	testing bool,
-// 	isLocal bool,
-// 	githubConfig *oauth.Config,
-// 	serverConf config.ServerConf,
-// ) *App {
-// 	// for now, will just support the english translator from the
-// 	// validator/translations package
-// 	en := en.New()
-// 	uni := ut.New(en, en)
-// 	trans, _ := uni.GetTranslator("en")
-
-// 	var testAgents *TestAgents = nil
-
-// 	if testing {
-// 		memStorage := helm.StorageMap["memory"](nil, nil, "")
-
-// 		testAgents = &TestAgents{
-// 			HelmAgent:             helm.GetAgentTesting(&helm.Form{}, nil, logger),
-// 			HelmTestStorageDriver: memStorage,
-// 			K8sAgent:              kubernetes.GetAgentTesting(),
-// 		}
-// 	}
-
-// 	var oauthGithubConf *oauth2.Config
-
-// 	if githubConfig != nil {
-// 		oauthGithubConf = oauth.NewGithubClient(githubConfig)
-// 	}
-
-// 	return &App{
-// 		db:           db,
-// 		logger:       logger,
-// 		repo:         repo,
-// 		validator:    validator,
-// 		store:        store,
-// 		translator:   &trans,
-// 		cookieName:   cookieName,
-// 		testing:      testing,
-// 		isLocal:      isLocal,
-// 		TestAgents:   testAgents,
-// 		GithubConfig: oauthGithubConf,
-// 		ServerConf:   serverConf,
-// 	}
-// }
-
-// // Logger returns the logger instance in use by App
-// func (app *App) Logger() *lr.Logger {
-// 	return app.logger
-// }

+ 2 - 85
server/api/provision_handler.go

@@ -1,19 +1,16 @@
 package api
 
 import (
-	"context"
 	"encoding/json"
 	"fmt"
 	"net/http"
 	"strconv"
 
 	"github.com/go-chi/chi"
-	redis "github.com/go-redis/redis/v8"
-	"github.com/gorilla/websocket"
 
 	"github.com/porter-dev/porter/internal/forms"
 	"github.com/porter-dev/porter/internal/kubernetes"
-	prov "github.com/porter-dev/porter/internal/kubernetes/provisioner"
+	"github.com/porter-dev/porter/internal/kubernetes/provisioner"
 )
 
 // HandleProvisionTest will create a test resource by deploying a provisioner
@@ -141,90 +138,10 @@ func (app *App) HandleGetProvisioningLogs(w http.ResponseWriter, r *http.Request
 		app.handleErrorUpgradeWebsocket(err, w)
 	}
 
-	err = StreamRedis(streamName, conn)
+	err = provisioner.ResourceStream(app.RedisClient, streamName, conn)
 
 	if err != nil {
 		app.handleErrorWebsocketWrite(err, w)
 		return
 	}
 }
-
-// helper functions
-
-// StreamRedis performs an XREAD operation on the given stream and outputs it to the given websocket conn.
-func StreamRedis(streamName string, conn *websocket.Conn) error {
-	conf := &prov.RedisConf{
-		Host: "redis",
-		Port: "6379",
-	}
-
-	client, err := NewRedisClient(conf)
-
-	if err != nil {
-		return err
-	}
-
-	errorchan := make(chan error)
-
-	go func() {
-		// listens for websocket closing handshake
-		for {
-			_, _, err := conn.ReadMessage()
-
-			if err != nil {
-				defer conn.Close()
-				errorchan <- err
-				return
-			}
-		}
-	}()
-
-	go func() {
-		lastID := "0-0"
-
-		for {
-
-			xstream, err := client.XRead(
-				context.Background(),
-				&redis.XReadArgs{
-					Streams: []string{streamName, lastID},
-					Block:   0,
-				},
-			).Result()
-
-			if err != nil {
-				return
-			}
-
-			messages := xstream[0].Messages
-			lastID = messages[len(messages)-1].ID
-
-			if writeErr := conn.WriteMessage(websocket.TextMessage, []byte(fmt.Sprint(xstream))); writeErr != nil {
-				errorchan <- writeErr
-				return
-			}
-		}
-	}()
-
-	for {
-		select {
-		case err = <-errorchan:
-			close(errorchan)
-			client.Close()
-			return err
-		}
-	}
-}
-
-// NewRedisClient returns a new redis client instance
-func NewRedisClient(conf *prov.RedisConf) (*redis.Client, error) {
-	client := redis.NewClient(&redis.Options{
-		Addr: fmt.Sprintf("%s:%s", conf.Host, conf.Port),
-		// Username: conf.Username,
-		// Password: conf.Password,
-		// DB:       conf.DB,
-	})
-
-	_, err := client.Ping(context.Background()).Result()
-	return client, err
-}

Энэ ялгаанд хэт олон файл өөрчлөгдсөн тул зарим файлыг харуулаагүй болно