2
0

global_stream.go 8.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359
  1. package provisioner
  2. import (
  3. "context"
  4. "encoding/base64"
  5. "encoding/json"
  6. "fmt"
  7. "regexp"
  8. "github.com/aws/aws-sdk-go/service/ecr"
  9. "github.com/porter-dev/porter/internal/repository"
  10. redis "github.com/go-redis/redis/v8"
  11. "github.com/porter-dev/porter/internal/models"
  12. )
  13. // GlobalStreamName is the name of the Redis stream for global operations
  14. const GlobalStreamName = "global"
  15. // GlobalStreamGroupName is the name of the Redis consumer group that this server
  16. // is a part of
  17. const GlobalStreamGroupName = "portersvr"
  18. // InitGlobalStream initializes the global stream if it does not exist, and the
  19. // global consumer group if it does not exist
  20. func InitGlobalStream(client *redis.Client) error {
  21. // determine if the stream exists
  22. x, err := client.Exists(
  23. context.Background(),
  24. GlobalStreamName,
  25. ).Result()
  26. // if it does not exist, create group and stream
  27. if x == 0 {
  28. _, err := client.XGroupCreateMkStream(
  29. context.Background(),
  30. GlobalStreamName,
  31. GlobalStreamGroupName,
  32. ">",
  33. ).Result()
  34. return err
  35. }
  36. // otherwise, check if the group exists
  37. xInfoGroups, err := client.XInfoGroups(
  38. context.Background(),
  39. GlobalStreamName,
  40. ).Result()
  41. fmt.Println(xInfoGroups, err)
  42. if err != nil {
  43. return err
  44. }
  45. for _, group := range xInfoGroups {
  46. // if the group exists, return with no error
  47. if group.Name == GlobalStreamGroupName {
  48. fmt.Println("group already exists")
  49. return nil
  50. }
  51. }
  52. // if the group does not exist, create it
  53. _, err = client.XGroupCreate(
  54. context.Background(),
  55. GlobalStreamName,
  56. GlobalStreamGroupName,
  57. "$",
  58. ).Result()
  59. fmt.Println("xgroup created", err)
  60. return err
  61. }
  62. // ResourceCRUDHandler is a handler for updates to an infra resource
  63. type ResourceCRUDHandler interface {
  64. OnCreate(id uint) error
  65. }
  66. // GlobalStreamListener performs an XREADGROUP operation on a given stream and
  67. // updates models in the database as necessary
  68. func GlobalStreamListener(
  69. client *redis.Client,
  70. repo repository.Repository,
  71. errorChan chan error,
  72. ) {
  73. fmt.Println("starting global stream listener")
  74. for {
  75. xstreams, err := client.XReadGroup(
  76. context.Background(),
  77. &redis.XReadGroupArgs{
  78. Group: GlobalStreamGroupName,
  79. Consumer: "portersvr-0", // just static consumer for now
  80. Streams: []string{GlobalStreamName, ">"},
  81. Block: 0,
  82. },
  83. ).Result()
  84. fmt.Println(xstreams, err)
  85. if err != nil {
  86. errorChan <- err
  87. return
  88. }
  89. // parse messages from the global stream
  90. for _, msg := range xstreams[0].Messages {
  91. // parse the id to identify the infra
  92. kind, projID, infraID, err := models.ParseUniqueName(fmt.Sprintf("%v", msg.Values["id"]))
  93. if fmt.Sprintf("%v", msg.Values["status"]) == "created" {
  94. infra, err := repo.Infra.ReadInfra(infraID)
  95. if err != nil {
  96. continue
  97. }
  98. infra.Status = models.StatusCreated
  99. infra, err = repo.Infra.UpdateInfra(infra)
  100. if err != nil {
  101. continue
  102. }
  103. // create ECR/EKS
  104. if kind == string(models.InfraECR) {
  105. reg := &models.Registry{
  106. ProjectID: projID,
  107. AWSIntegrationID: infra.AWSIntegrationID,
  108. InfraID: infra.ID,
  109. }
  110. // parse raw data into ECR type
  111. dataString, ok := msg.Values["data"].(string)
  112. if ok {
  113. json.Unmarshal([]byte(dataString), reg)
  114. }
  115. awsInt, err := repo.AWSIntegration.ReadAWSIntegration(reg.AWSIntegrationID)
  116. if err != nil {
  117. continue
  118. }
  119. sess, err := awsInt.GetSession()
  120. if err != nil {
  121. continue
  122. }
  123. ecrSvc := ecr.New(sess)
  124. output, err := ecrSvc.GetAuthorizationToken(&ecr.GetAuthorizationTokenInput{})
  125. if err != nil {
  126. continue
  127. }
  128. reg.URL = *output.AuthorizationData[0].ProxyEndpoint
  129. reg, err = repo.Registry.CreateRegistry(reg)
  130. if err != nil {
  131. continue
  132. }
  133. } else if kind == string(models.InfraEKS) {
  134. cluster := &models.Cluster{
  135. AuthMechanism: models.AWS,
  136. ProjectID: projID,
  137. AWSIntegrationID: infra.AWSIntegrationID,
  138. InfraID: infra.ID,
  139. }
  140. // parse raw data into ECR type
  141. dataString, ok := msg.Values["data"].(string)
  142. if ok {
  143. json.Unmarshal([]byte(dataString), cluster)
  144. }
  145. re := regexp.MustCompile(`^([A-Za-z0-9+/]{4})*([A-Za-z0-9+/]{3}=|[A-Za-z0-9+/]{2}==)?$`)
  146. // if it matches the base64 regex, decode it
  147. caData := string(cluster.CertificateAuthorityData)
  148. if re.MatchString(caData) {
  149. decoded, err := base64.StdEncoding.DecodeString(caData)
  150. if err != nil {
  151. continue
  152. }
  153. cluster.CertificateAuthorityData = []byte(decoded)
  154. }
  155. cluster, err := repo.Cluster.CreateCluster(cluster)
  156. if err != nil {
  157. continue
  158. }
  159. } else if kind == string(models.InfraGCR) {
  160. reg := &models.Registry{
  161. ProjectID: projID,
  162. GCPIntegrationID: infra.GCPIntegrationID,
  163. InfraID: infra.ID,
  164. Name: "gcr-registry",
  165. }
  166. // parse raw data into ECR type
  167. dataString, ok := msg.Values["data"].(string)
  168. if ok {
  169. json.Unmarshal([]byte(dataString), reg)
  170. }
  171. reg, err = repo.Registry.CreateRegistry(reg)
  172. if err != nil {
  173. continue
  174. }
  175. } else if kind == string(models.InfraGKE) {
  176. cluster := &models.Cluster{
  177. AuthMechanism: models.GCP,
  178. ProjectID: projID,
  179. GCPIntegrationID: infra.GCPIntegrationID,
  180. InfraID: infra.ID,
  181. }
  182. // parse raw data into GKE type
  183. dataString, ok := msg.Values["data"].(string)
  184. if ok {
  185. json.Unmarshal([]byte(dataString), cluster)
  186. }
  187. re := regexp.MustCompile(`^([A-Za-z0-9+/]{4})*([A-Za-z0-9+/]{3}=|[A-Za-z0-9+/]{2}==)?$`)
  188. // if it matches the base64 regex, decode it
  189. caData := string(cluster.CertificateAuthorityData)
  190. if re.MatchString(caData) {
  191. decoded, err := base64.StdEncoding.DecodeString(caData)
  192. if err != nil {
  193. continue
  194. }
  195. cluster.CertificateAuthorityData = []byte(decoded)
  196. }
  197. cluster, err := repo.Cluster.CreateCluster(cluster)
  198. if err != nil {
  199. continue
  200. }
  201. } else if kind == string(models.InfraDOCR) {
  202. reg := &models.Registry{
  203. ProjectID: projID,
  204. DOIntegrationID: infra.DOIntegrationID,
  205. InfraID: infra.ID,
  206. }
  207. // parse raw data into DOCR type
  208. dataString, ok := msg.Values["data"].(string)
  209. if ok {
  210. json.Unmarshal([]byte(dataString), reg)
  211. }
  212. reg, err = repo.Registry.CreateRegistry(reg)
  213. if err != nil {
  214. continue
  215. }
  216. } else if kind == string(models.InfraDOKS) {
  217. cluster := &models.Cluster{
  218. AuthMechanism: models.DO,
  219. ProjectID: projID,
  220. DOIntegrationID: infra.DOIntegrationID,
  221. InfraID: infra.ID,
  222. }
  223. // parse raw data into GKE type
  224. dataString, ok := msg.Values["data"].(string)
  225. if ok {
  226. json.Unmarshal([]byte(dataString), cluster)
  227. }
  228. re := regexp.MustCompile(`^([A-Za-z0-9+/]{4})*([A-Za-z0-9+/]{3}=|[A-Za-z0-9+/]{2}==)?$`)
  229. // if it matches the base64 regex, decode it
  230. caData := string(cluster.CertificateAuthorityData)
  231. if re.MatchString(caData) {
  232. decoded, err := base64.StdEncoding.DecodeString(caData)
  233. if err != nil {
  234. continue
  235. }
  236. cluster.CertificateAuthorityData = []byte(decoded)
  237. }
  238. cluster, err := repo.Cluster.CreateCluster(cluster)
  239. if err != nil {
  240. continue
  241. }
  242. }
  243. } else if fmt.Sprintf("%v", msg.Values["status"]) == "error" {
  244. infra, err := repo.Infra.ReadInfra(infraID)
  245. if err != nil {
  246. continue
  247. }
  248. infra.Status = models.StatusError
  249. infra, err = repo.Infra.UpdateInfra(infra)
  250. if err != nil {
  251. continue
  252. }
  253. } else if fmt.Sprintf("%v", msg.Values["status"]) == "destroyed" {
  254. infra, err := repo.Infra.ReadInfra(infraID)
  255. if err != nil {
  256. continue
  257. }
  258. infra.Status = models.StatusDestroyed
  259. infra, err = repo.Infra.UpdateInfra(infra)
  260. if err != nil {
  261. continue
  262. }
  263. }
  264. // acknowledge the message as read
  265. _, err = client.XAck(
  266. context.Background(),
  267. GlobalStreamName,
  268. GlobalStreamGroupName,
  269. msg.ID,
  270. ).Result()
  271. // if error, continue for now
  272. if err != nil {
  273. continue
  274. }
  275. }
  276. }
  277. }