querygroup.go 1.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172
  1. package source
  2. type QueryGroup struct {
  3. errorCollector *QueryErrorCollector
  4. }
  5. type QueryGroupAsyncResult struct {
  6. errorCollector *QueryErrorCollector
  7. resultsChan QueryResultsChan
  8. }
  9. type QueryGroupFuture[T any] struct {
  10. errorCollector *QueryErrorCollector
  11. future *Future[T]
  12. }
  13. func WithGroup[T any](g *QueryGroup, f *Future[T]) *QueryGroupFuture[T] {
  14. return &QueryGroupFuture[T]{
  15. errorCollector: g.errorCollector,
  16. future: f,
  17. }
  18. }
  19. func (qgf *QueryGroupFuture[T]) Await() ([]*T, error) {
  20. return qgf.future.awaitWith(qgf.errorCollector)
  21. }
  22. func NewQueryGroup() *QueryGroup {
  23. var errorCollector QueryErrorCollector
  24. return &QueryGroup{
  25. errorCollector: &errorCollector,
  26. }
  27. }
  28. func (qg *QueryGroup) With(resultsChan QueryResultsChan) *QueryGroupAsyncResult {
  29. return newQueryGroupAsyncResult(qg.errorCollector, resultsChan)
  30. }
  31. func (qg *QueryGroup) HasErrors() bool {
  32. return qg.errorCollector.IsError()
  33. }
  34. func (qg *QueryGroup) Error() error {
  35. return qg.errorCollector
  36. }
  37. func (qg *QueryGroup) Errors() []*QueryError {
  38. return qg.errorCollector.Errors()
  39. }
  40. func newQueryGroupAsyncResult(collector *QueryErrorCollector, resultsChan QueryResultsChan) *QueryGroupAsyncResult {
  41. return &QueryGroupAsyncResult{
  42. errorCollector: collector,
  43. resultsChan: resultsChan,
  44. }
  45. }
  46. func (qgar *QueryGroupAsyncResult) Await() ([]*QueryResult, error) {
  47. defer close(qgar.resultsChan)
  48. result := <-qgar.resultsChan
  49. q := result.Query
  50. err := result.Error
  51. if err != nil {
  52. qgar.errorCollector.AppendError(&QueryError{Query: q, Error: err})
  53. return nil, err
  54. }
  55. return result.Results, nil
  56. }