resource_stream.go 1.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869
  1. package redis_stream
  2. import (
  3. "context"
  4. redis "github.com/go-redis/redis/v8"
  5. "github.com/porter-dev/porter/api/server/shared/websocket"
  6. )
  7. // ResourceStream performs an XREAD operation on the given stream and outputs it to the given websocket conn.
  8. func ResourceStream(client *redis.Client, streamName string, rw *websocket.WebsocketSafeReadWriter) error {
  9. errorchan := make(chan error)
  10. go func() {
  11. defer func() {
  12. if r := recover(); r != nil {
  13. // TODO: add method to alert on panic
  14. return
  15. }
  16. }()
  17. // listens for websocket closing handshake
  18. for {
  19. if _, _, err := rw.ReadMessage(); err != nil {
  20. errorchan <- nil
  21. return
  22. }
  23. }
  24. }()
  25. go func() {
  26. defer func() {
  27. if r := recover(); r != nil {
  28. // TODO: add method to alert on panic
  29. return
  30. }
  31. }()
  32. lastID := "0-0"
  33. for {
  34. xstream, err := client.XRead(
  35. context.Background(),
  36. &redis.XReadArgs{
  37. Streams: []string{streamName, lastID},
  38. Block: 0,
  39. },
  40. ).Result()
  41. if err != nil {
  42. return
  43. }
  44. messages := xstream[0].Messages
  45. lastID = messages[len(messages)-1].ID
  46. rw.WriteJSON(messages)
  47. }
  48. }()
  49. for {
  50. select {
  51. case err := <-errorchan:
  52. close(errorchan)
  53. client.Close()
  54. return err
  55. }
  56. }
  57. }