global.go 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453
  1. package redis_stream
  2. import (
  3. "bytes"
  4. "context"
  5. "encoding/json"
  6. "errors"
  7. "fmt"
  8. "io"
  9. "strings"
  10. "time"
  11. "github.com/porter-dev/porter/internal/analytics"
  12. redis "github.com/go-redis/redis/v8"
  13. "github.com/porter-dev/porter/internal/models"
  14. "github.com/porter-dev/porter/internal/repository"
  15. "github.com/porter-dev/porter/provisioner/integrations/storage"
  16. "github.com/porter-dev/porter/provisioner/server/config"
  17. "github.com/porter-dev/porter/provisioner/types"
  18. )
// GlobalStreamName is the name of the Redis stream for global operations.
const GlobalStreamName = "global"

// GlobalStreamGroupName is the name of the Redis consumer group that this server
// is a part of.
const GlobalStreamGroupName = "portersvr"
  24. // InitGlobalStream initializes the global stream if it does not exist, and the
  25. // global consumer group if it does not exist
  26. func InitGlobalStream(client *redis.Client) error {
  27. // determine if the stream exists
  28. x, err := client.Exists(
  29. context.Background(),
  30. GlobalStreamName,
  31. ).Result()
  32. // if it does not exist, create group and stream
  33. if x == 0 {
  34. _, err := client.XGroupCreateMkStream(
  35. context.Background(),
  36. GlobalStreamName,
  37. GlobalStreamGroupName,
  38. ">",
  39. ).Result()
  40. return err
  41. }
  42. // otherwise, check if the group exists
  43. xInfoGroups, err := client.XInfoGroups(
  44. context.Background(),
  45. GlobalStreamName,
  46. ).Result()
  47. // if error is not NOGROUP error, return
  48. if err != nil && !strings.Contains(err.Error(), "NOGROUP") {
  49. return err
  50. }
  51. for _, group := range xInfoGroups {
  52. // if the group exists, return with no error
  53. if group.Name == GlobalStreamGroupName {
  54. return nil
  55. }
  56. }
  57. // if the group does not exist, create it
  58. _, err = client.XGroupCreate(
  59. context.Background(),
  60. GlobalStreamName,
  61. GlobalStreamGroupName,
  62. "$",
  63. ).Result()
  64. return err
  65. }
  66. func PushToGlobalStream(
  67. client *redis.Client,
  68. infra *models.Infra,
  69. operation *models.Operation,
  70. status string,
  71. ) error {
  72. // pushes a new operation to the global stream
  73. _, err := client.XAdd(context.TODO(), &redis.XAddArgs{
  74. Stream: GlobalStreamName,
  75. ID: "*",
  76. Values: map[string]interface{}{
  77. "id": models.GetWorkspaceID(infra, operation),
  78. "status": status,
  79. },
  80. }).Result()
  81. return err
  82. }
// GlobalStreamListener is a blocking loop that consumes messages from the
// global stream as part of the portersvr consumer group. For each message it
// resolves the infra and operation referenced by the workspace ID, and when
// the status is terminal ("created", "error", "destroyed") it archives the
// workspace's state and logs via cleanupOperation. A read error from Redis is
// sent on errorChan and terminates the listener.
//
// NOTE(review): analyticsClient is accepted but not used in this body —
// possibly consumed by code outside this view; confirm. Messages are also
// never XAck'd, so they remain in the group's pending list — verify intended.
func GlobalStreamListener(
	client *redis.Client,
	config *config.Config,
	repo repository.Repository,
	analyticsClient analytics.AnalyticsSegmentClient,
	errorChan chan error,
) {
	for {
		// Block: 0 waits indefinitely for new entries; ">" requests entries
		// never delivered to this consumer group
		xstreams, err := client.XReadGroup(
			context.Background(),
			&redis.XReadGroupArgs{
				Group:    GlobalStreamGroupName,
				Consumer: "portersvr-0", // just static consumer for now
				Streams:  []string{GlobalStreamName, ">"},
				Block:    0,
			},
		).Result()

		if err != nil {
			// a read error is fatal to the listener: report and exit
			errorChan <- err
			return
		}

		// parse messages from the global stream; only one stream was requested,
		// so all messages are in xstreams[0]
		for _, msg := range xstreams[0].Messages {
			// ensure that the msg contains the value id
			id, exists := msg.Values["id"]

			if !exists {
				config.Logger.Debug().Msg("skipping message parsing as id does not exist")
				continue
			}

			workspaceID, ok := id.(string)

			if !ok {
				config.Logger.Debug().Msg("skipping message parsing as workspace id does not exist")
				continue
			}

			// parse the id to identify the infra
			name, err := models.ParseWorkspaceID(workspaceID)

			if err != nil {
				config.Logger.Debug().Msg(fmt.Sprintf("could not parse workspace ID: %s %s", workspaceID, err.Error()))
				continue
			}

			config.Logger.Debug().Msg(fmt.Sprintf("reading infra %d and operation %s for project %d", name.InfraID, name.OperationUID, name.ProjectID))

			infra, err := repo.Infra().ReadInfra(name.ProjectID, name.InfraID)

			if err != nil {
				config.Logger.Debug().Msg(fmt.Sprintf("could not read infra %d in project %d: %s", name.InfraID, name.ProjectID, err.Error()))
				continue
			}

			operation, err := repo.Infra().ReadOperation(name.InfraID, name.OperationUID)

			if err != nil {
				config.Logger.Debug().Msg(fmt.Sprintf("could not read operation %s, infra %d in project %d: %s", name.OperationUID, name.InfraID, name.ProjectID, err.Error()))
				continue
			}

			statusVal, exists := msg.Values["status"]

			if !exists {
				config.Logger.Debug().Msg("skipping message parsing as status does not exist")
				continue
			}

			config.Logger.Debug().Msg(fmt.Sprintf("pushing state and log file for %s with status %v", workspaceID, statusVal))

			// only terminal statuses trigger archival of state and logs
			switch fmt.Sprintf("%v", statusVal) {
			case "created", "error", "destroyed":
				err := cleanupOperation(config, client, infra, operation, workspaceID)

				if err != nil {
					// archival failure is alerted but does not stop the listener
					config.Alerter.SendAlert(context.Background(), err, map[string]interface{}{
						"workspace_id": workspaceID,
					})
				}
			}
		}
	}
}
  152. func cleanupOperation(config *config.Config, client *redis.Client, infra *models.Infra, operation *models.Operation, workspaceID string) error {
  153. l := config.Logger
  154. l.Debug().Msg(fmt.Sprintf("pushing state for %s", workspaceID))
  155. err := pushNewStateToStorage(config, client, infra, operation, workspaceID)
  156. if err != nil {
  157. return err
  158. }
  159. l.Debug().Msg(fmt.Sprintf("cleaning state stream for %s", workspaceID))
  160. err = cleanupStateStream(config, client, workspaceID)
  161. if err != nil {
  162. return nil
  163. }
  164. l.Debug().Msg(fmt.Sprintf("pushing logs for %s", workspaceID))
  165. err = pushLogsToStorage(config, client, infra, workspaceID)
  166. if err != nil {
  167. return err
  168. }
  169. l.Debug().Msg(fmt.Sprintf("cleaning logs for %s", workspaceID))
  170. err = cleanupLogStream(config, client, infra, workspaceID)
  171. if err != nil {
  172. return err
  173. }
  174. return nil
  175. }
// pushNewStateToStorage rebuilds the workspace's Terraform state by replaying
// every entry from the "<workspaceID>-state" Redis stream onto the state file
// stored in object storage, then writes the merged result back. A missing
// current_state.json is treated as an empty state rather than an error.
func pushNewStateToStorage(config *config.Config, client *redis.Client, infra *models.Infra, operation *models.Operation, workspaceID string) error {
	// read the current state from storage (comment says S3; actual backend is
	// whatever StorageManager is configured with)
	currState := &types.TFState{}

	currStateBytes, err := config.StorageManager.ReadFile(infra, "current_state.json", true)

	if err != nil && errors.Is(err, storage.FileDoesNotExist) {
		// no prior state: start from an empty resource map
		currState.Resources = make(map[string]*types.TFResourceState)
	} else if err != nil {
		return err
	} else {
		err := json.Unmarshal(currStateBytes, currState)

		if err != nil {
			return err
		}
	}

	currState.OperationID = operation.UID
	currState.LastUpdated = time.Now()

	// read the corresponding stream and push all updates to create the new state;
	// lastID "0-0" starts reading from the beginning of the stream
	lastID := "0-0"
	var processed int64 = 0
	streamName := fmt.Sprintf("%s-state", workspaceID)

	// get the length of the stream being read; the loop below drains exactly
	// this many entries in batches of 50
	length, err := client.XLen(context.Background(), streamName).Result()

	if err != nil {
		return err
	}

	for processed != length {
		xstream, err := client.XRead(
			context.Background(),
			&redis.XReadArgs{
				Streams: []string{streamName, lastID},
				Block:   0,
				Count:   50,
			},
		).Result()

		if err != nil {
			return err
		}

		// NOTE(review): if the stream is trimmed concurrently, Messages could be
		// empty here and the index below would panic — confirm trimming cannot
		// happen before cleanupStateStream runs
		messages := xstream[0].Messages
		lastID = messages[len(messages)-1].ID

		// compute the new state by folding each entry into currState
		for _, msg := range messages {
			processed++

			stateData := &types.TFResourceStateEntry{}

			dataInter, ok := msg.Values["data"]

			if !ok {
				continue
			}

			dataString, ok := dataInter.(string)

			if !ok {
				continue
			}

			// malformed entries are skipped rather than failing the whole merge
			err := json.Unmarshal([]byte(dataString), stateData)

			if err != nil {
				continue
			}

			// the state data requires at least a name and status to be valid
			if stateData.ID != "" && stateData.Status != "" {
				// if the resource is deleted, remove it from the current state
				if stateData.Status == types.TFResourceDeleted {
					delete(currState.Resources, stateData.ID)
				} else {
					// if the state data already exists, update the updated_at and status fields
					if _, exists := currState.Resources[stateData.ID]; exists {
						currState.Resources[stateData.ID].UpdatedAt = stateData.PushedAt
						currState.Resources[stateData.ID].Status = stateData.Status
						currState.Resources[stateData.ID].Error = stateData.Error
					} else {
						// first sighting: insert and stamp both timestamps
						currState.Resources[stateData.ID] = stateData.TFResourceState
						currState.Resources[stateData.ID].CreatedAt = stateData.PushedAt
						currState.Resources[stateData.ID].UpdatedAt = stateData.PushedAt
					}
				}
			}
		}
	}

	// determine the status of the operation based on the resources
	currState.Status = getOperationStatus(currState.Status, currState.Resources)

	// push the new state to storage
	newStateBytes, err := json.Marshal(currState)

	if err != nil {
		return err
	}

	return config.StorageManager.WriteFile(infra, "current_state.json", newStateBytes, true)
}
  261. func getOperationStatus(oldState types.TFStateStatus, resources map[string]*types.TFResourceState) types.TFStateStatus {
  262. created := len(resources) >= 1
  263. deleted := oldState != types.TFStateStatusDeleted
  264. errored := false
  265. for _, resource := range resources {
  266. created = created && resource.Status == types.TFResourceCreated && resource.Error == nil
  267. deleted = deleted && resource.Status == types.TFResourceDeleted && resource.Error == nil
  268. errored = errored || resource.Error != nil
  269. }
  270. if created {
  271. return types.TFStateStatusCreated
  272. } else if deleted {
  273. return types.TFStateStatusDeleted
  274. } else if errored {
  275. return types.TFStateStatusErrored
  276. }
  277. // if unknown, return previous state status
  278. return oldState
  279. }
  280. func cleanupStateStream(config *config.Config, client *redis.Client, workspaceID string) error {
  281. streamName := fmt.Sprintf("%s-state", workspaceID)
  282. count, err := client.Del(
  283. context.Background(),
  284. streamName,
  285. ).Result()
  286. if err != nil {
  287. return err
  288. }
  289. if count != 1 {
  290. return fmt.Errorf("count of deleted stream keys was not 1")
  291. }
  292. return nil
  293. }
  294. func pushLogsToStorage(config *config.Config, client *redis.Client, infra *models.Infra, workspaceID string) error {
  295. // read all logs from the corresponding stream
  296. lastID := "0-0"
  297. var processed int64 = 0
  298. streamName := fmt.Sprintf("%s-logs", workspaceID)
  299. bytesBuffer := &bytes.Buffer{}
  300. // get the length of the stream being read
  301. length, err := client.XLen(context.Background(), streamName).Result()
  302. if err != nil {
  303. return err
  304. }
  305. for processed != length {
  306. xstream, err := client.XRead(
  307. context.Background(),
  308. &redis.XReadArgs{
  309. Streams: []string{streamName, lastID},
  310. Block: 0,
  311. Count: 50,
  312. },
  313. ).Result()
  314. if err != nil {
  315. return err
  316. }
  317. messages := xstream[0].Messages
  318. lastID = messages[len(messages)-1].ID
  319. // compute the new state
  320. for _, msg := range messages {
  321. processed++
  322. logInter, ok := msg.Values["log"]
  323. if !ok {
  324. continue
  325. }
  326. logBytes, ok := logInter.(string)
  327. if !ok {
  328. continue
  329. }
  330. bytesBuffer.Write([]byte(logBytes))
  331. }
  332. }
  333. // push the logs for that operation to S3
  334. fileBytes, err := io.ReadAll(bytesBuffer)
  335. if err != nil {
  336. return err
  337. }
  338. return config.StorageManager.WriteFile(infra, fmt.Sprintf("%s-logs.txt", workspaceID), fileBytes, false)
  339. }
  340. func cleanupLogStream(config *config.Config, client *redis.Client, infra *models.Infra, workspaceID string) error {
  341. streamName := fmt.Sprintf("%s-logs", workspaceID)
  342. count, err := client.Del(
  343. context.Background(),
  344. streamName,
  345. ).Result()
  346. if err != nil {
  347. return err
  348. }
  349. if count != 1 {
  350. return fmt.Errorf("count of deleted stream keys was not 1")
  351. }
  352. return nil
  353. }