future.go 953 B

123456789101112131415161718192021222324252627282930313233343536373839404142
  1. package source
// ResultDecoder is a function that converts a single *QueryResult into a *T.
// Future passes its decoder to DecodeAll together with the received results.
type ResultDecoder[T any] func(*QueryResult) *T
// Future pairs a results channel with the decoder used to turn what arrives
// on that channel into []*T. Use Await (or the internal awaitWith) to obtain
// the decoded values.
type Future[T any] struct {
	// decoder is handed to DecodeAll to convert the received results.
	decoder ResultDecoder[T]
	// resultsChan is the source the query outcome is read from.
	resultsChan QueryResultsChan
}
  7. func NewFuture[T any](decoder ResultDecoder[T], resultsChan QueryResultsChan) *Future[T] {
  8. return &Future[T]{
  9. decoder: decoder,
  10. resultsChan: resultsChan,
  11. }
  12. }
  13. // awaitWith allows internal callers to pass an error collector for grouping futures
  14. func (f *Future[T]) awaitWith(errorCollector *QueryErrorCollector) ([]*T, error) {
  15. defer close(f.resultsChan)
  16. result := <-f.resultsChan
  17. q := result.Query
  18. err := result.Error
  19. if err != nil {
  20. errorCollector.AppendError(&QueryError{Query: q, Error: err})
  21. return nil, err
  22. }
  23. decoded := DecodeAll(result.Results, f.decoder)
  24. return decoded, nil
  25. }
  26. func (f *Future[T]) Await() ([]*T, error) {
  27. results, err := f.resultsChan.Await()
  28. if err != nil {
  29. return nil, err
  30. }
  31. decoded := DecodeAll(results, f.decoder)
  32. return decoded, nil
  33. }