querygroup.go 3.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110
  1. package source
  2. // QueryGroupAsyncResult is a representation of a single async query in a group.
  3. type QueryGroupAsyncResult struct {
  4. errorCollector *QueryErrorCollector
  5. resultsChan QueryResultsChan
  6. }
  7. // newQueryGroupAsyncResult creates a new QueryGroupAsyncResult with the given error collector and results channel.
  8. func newQueryGroupAsyncResult(collector *QueryErrorCollector, resultsChan QueryResultsChan) *QueryGroupAsyncResult {
  9. return &QueryGroupAsyncResult{
  10. errorCollector: collector,
  11. resultsChan: resultsChan,
  12. }
  13. }
  14. // Await blocks and waits for the `QueryGroupAsyncResult` to resolve, and returns a slice of generic `QueryResult`
  15. // instances if successful, or an error otherwise.
  16. func (qgar *QueryGroupAsyncResult) Await() ([]*QueryResult, error) {
  17. defer close(qgar.resultsChan)
  18. result := <-qgar.resultsChan
  19. q := result.Query
  20. err := result.Error
  21. if err != nil {
  22. qgar.errorCollector.AppendError(&QueryError{Query: q, Error: err})
  23. return nil, err
  24. }
  25. return result.Results, nil
  26. }
  27. // QueryGroupFuture[T] is a representation of a single async query in a group with a typed result.
  28. type QueryGroupFuture[T any] struct {
  29. errorCollector *QueryErrorCollector
  30. future *Future[T]
  31. }
  32. // WithGroup creates a new QueryGroupFuture[T] instance with the given QueryGroup and Future instances.
  33. // This is the specific way to add a typed `Future[T]` to a `QueryGroup`.
  34. func WithGroup[T any](g *QueryGroup, f *Future[T]) *QueryGroupFuture[T] {
  35. return &QueryGroupFuture[T]{
  36. errorCollector: g.errorCollector,
  37. future: f,
  38. }
  39. }
  40. // Await blocks and waits for the `QueryGroupFuture[T]` to resolve, and returns a slice of `*T` instances if successful,
  41. // or an error otherwise.
  42. func (qgf *QueryGroupFuture[T]) Await() ([]*T, error) {
  43. return qgf.future.awaitWith(qgf.errorCollector)
  44. }
  45. // QueryGroup is a representation of multiple async queries. It provides a shared error collector
  46. // for all queries in the group.
  47. //
  48. // Example:
  49. //
  50. // grp := NewQueryGroup()
  51. // q1 := WithGroup(grp, QueryFoo())
  52. // q2 := WithGroup(grp, QueryBar())
  53. //
  54. // results1, _ := q1.Await()
  55. // results2, _ := q2.Await()
  56. //
  57. // if grp.HasErrors() {
  58. // return grp.Error() // <-- error return type
  59. // }
  60. type QueryGroup struct {
  61. errorCollector *QueryErrorCollector
  62. }
  63. // NewQueryGroup creates a new QueryGroup instance which can be used to group non-typed async queries with
  64. // the `With(QueryResultsChan)` instance method, or with the package function `WithGroup[T](*QueryGroup, *Future[T])`
  65. func NewQueryGroup() *QueryGroup {
  66. var errorCollector QueryErrorCollector
  67. return &QueryGroup{
  68. errorCollector: &errorCollector,
  69. }
  70. }
  71. // With adds the given `QueryResultsChan` to the QueryGroup instance and returns a `QueryGroupAsyncResult` instance to be
  72. // awaited
  73. func (qg *QueryGroup) With(resultsChan QueryResultsChan) *QueryGroupAsyncResult {
  74. return newQueryGroupAsyncResult(qg.errorCollector, resultsChan)
  75. }
  76. // HasErrors returns true if any of the async queries in the group have errored. Note that all results must be awaited
  77. // in order to be checked for errors.
  78. func (qg *QueryGroup) HasErrors() bool {
  79. return qg.errorCollector.IsError()
  80. }
  81. // Error returns nil if there were no errors in the group. Otherwise, it returns all of the errors that occurred grouped
  82. // into a single error.
  83. func (qg *QueryGroup) Error() error {
  84. if !qg.errorCollector.IsError() {
  85. var err error
  86. return err
  87. }
  88. return qg.errorCollector
  89. }
  90. // Errors returns all of individual errors that occurred in the group.
  91. func (qg *QueryGroup) Errors() []*QueryError {
  92. return qg.errorCollector.Errors()
  93. }