resource_stream.go 1.0 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455
  1. package provisioner
  2. import (
  3. "context"
  4. redis "github.com/go-redis/redis/v8"
  5. "github.com/porter-dev/porter/api/server/shared/websocket"
  6. )
  7. // ResourceStream performs an XREAD operation on the given stream and outputs it to the given websocket conn.
  8. func ResourceStream(client *redis.Client, streamName string, rw *websocket.WebsocketSafeReadWriter) error {
  9. errorchan := make(chan error)
  10. go func() {
  11. // listens for websocket closing handshake
  12. for {
  13. if _, _, err := rw.ReadMessage(); err != nil {
  14. errorchan <- nil
  15. return
  16. }
  17. }
  18. }()
  19. go func() {
  20. lastID := "0-0"
  21. for {
  22. xstream, err := client.XRead(
  23. context.Background(),
  24. &redis.XReadArgs{
  25. Streams: []string{streamName, lastID},
  26. Block: 0,
  27. },
  28. ).Result()
  29. if err != nil {
  30. return
  31. }
  32. messages := xstream[0].Messages
  33. lastID = messages[len(messages)-1].ID
  34. rw.WriteJSONWithChannel(messages, errorchan)
  35. }
  36. }()
  37. for {
  38. select {
  39. case err := <-errorchan:
  40. close(errorchan)
  41. client.Close()
  42. return err
  43. }
  44. }
  45. }