resource_stream.go 1.1 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162
  1. package provisioner
  2. import (
  3. "context"
  4. redis "github.com/go-redis/redis/v8"
  5. "github.com/gorilla/websocket"
  6. )
  7. // ResourceStream performs an XREAD operation on the given stream and outputs it to the given websocket conn.
  8. func ResourceStream(client *redis.Client, streamName string, conn *websocket.Conn) error {
  9. errorchan := make(chan error)
  10. go func() {
  11. // listens for websocket closing handshake
  12. for {
  13. _, _, err := conn.ReadMessage()
  14. if err != nil {
  15. defer conn.Close()
  16. errorchan <- err
  17. return
  18. }
  19. }
  20. }()
  21. go func() {
  22. lastID := "0-0"
  23. for {
  24. xstream, err := client.XRead(
  25. context.Background(),
  26. &redis.XReadArgs{
  27. Streams: []string{streamName, lastID},
  28. Block: 0,
  29. },
  30. ).Result()
  31. if err != nil {
  32. return
  33. }
  34. messages := xstream[0].Messages
  35. lastID = messages[len(messages)-1].ID
  36. if writeErr := conn.WriteJSON(messages); writeErr != nil {
  37. errorchan <- writeErr
  38. return
  39. }
  40. }
  41. }()
  42. for {
  43. select {
  44. case err := <-errorchan:
  45. close(errorchan)
  46. client.Close()
  47. return err
  48. }
  49. }
  50. }