resource_stream.go 1.1 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364
  1. package provisioner
  2. import (
  3. "context"
  4. "fmt"
  5. redis "github.com/go-redis/redis/v8"
  6. "github.com/gorilla/websocket"
  7. )
  8. // ResourceStream performs an XREAD operation on the given stream and outputs it to the given websocket conn.
  9. func ResourceStream(client *redis.Client, streamName string, conn *websocket.Conn) error {
  10. errorchan := make(chan error)
  11. go func() {
  12. // listens for websocket closing handshake
  13. for {
  14. _, _, err := conn.ReadMessage()
  15. if err != nil {
  16. defer conn.Close()
  17. errorchan <- err
  18. return
  19. }
  20. }
  21. }()
  22. go func() {
  23. lastID := "0-0"
  24. for {
  25. xstream, err := client.XRead(
  26. context.Background(),
  27. &redis.XReadArgs{
  28. Streams: []string{streamName, lastID},
  29. Block: 0,
  30. },
  31. ).Result()
  32. if err != nil {
  33. fmt.Println("ERROR XREAD", err)
  34. return
  35. }
  36. messages := xstream[0].Messages
  37. lastID = messages[len(messages)-1].ID
  38. if writeErr := conn.WriteJSON(messages); writeErr != nil {
  39. errorchan <- writeErr
  40. return
  41. }
  42. }
  43. }()
  44. for {
  45. select {
  46. case err := <-errorchan:
  47. close(errorchan)
  48. client.Close()
  49. return err
  50. }
  51. }
  52. }