nats.go 1.2 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556
  1. package nats
  2. import (
  3. "context"
  4. "errors"
  5. "fmt"
  6. "github.com/nats-io/nats.go"
  7. )
  8. // NATS holds a connection to a NATS cluster
  9. type NATS struct {
  10. NatsConnection *nats.Conn
  11. JetStream nats.JetStreamContext
  12. }
  13. // Config contains the config required to setup a connection to a NATS cluster
  14. type Config struct {
  15. URL string
  16. }
  17. // NewConnection creates a new nats and JetStream connection
  18. func NewConnection(ctx context.Context, conf Config) (NATS, error) {
  19. var n NATS
  20. url := conf.URL
  21. if url == "" {
  22. url = nats.DefaultURL
  23. }
  24. nc, err := nats.Connect(conf.URL)
  25. if err != nil {
  26. return n, err
  27. }
  28. if nc == nil {
  29. return n, errors.New("nats connection was not obtained")
  30. }
  31. if len(nc.Servers()) == 0 {
  32. return n, errors.New("nats connection was not obtained, no servers added")
  33. }
  34. n.NatsConnection = nc
  35. js, err := nc.JetStream()
  36. if err != nil {
  37. return n, fmt.Errorf("jetstream connection was not obtained - %w", err)
  38. }
  39. ai, err := js.AccountInfo()
  40. if err != nil {
  41. return n, fmt.Errorf("jetstream connection was not obtained, no account info returned - %w", err)
  42. }
  43. if ai == nil {
  44. return n, fmt.Errorf("unable to get jetsteam")
  45. }
  46. n.JetStream = js
  47. return n, nil
  48. }