// future.go

  1. package source
  2. // ResultDecoder[T] is a function that decodes a `QueryResult` into a `*T` type.
  3. type ResultDecoder[T any] func(*QueryResult) *T
  4. // Future[T] is a async future/promise/task that will resolve into a []*T return or
  5. // error when awaited. This type provides a type-safe way of interfacing with `QueryResultsChan`
  6. // via a `ResultDecoder[T]` function.
  7. type Future[T any] struct {
  8. decoder ResultDecoder[T]
  9. resultsChan QueryResultsChan
  10. }
  11. // NewFuture Creates a new `Future[T]` with the given `ResultDecoder[T]` and `QueryResultsChan`.
  12. func NewFuture[T any](decoder ResultDecoder[T], resultsChan QueryResultsChan) *Future[T] {
  13. return &Future[T]{
  14. decoder: decoder,
  15. resultsChan: resultsChan,
  16. }
  17. }
  18. // awaitWith allows internal callers to pass an error collector for grouping futures
  19. func (f *Future[T]) awaitWith(errorCollector *QueryErrorCollector) ([]*T, error) {
  20. defer close(f.resultsChan)
  21. result := <-f.resultsChan
  22. q := result.Query
  23. err := result.Error
  24. if err != nil {
  25. errorCollector.AppendError(&QueryError{Query: q, Error: err})
  26. return nil, err
  27. }
  28. decoded := DecodeAll(result.Results, f.decoder)
  29. return decoded, nil
  30. }
  31. // Await blocks and waits for the `Future` to resolve, and returns the results if successful, or an error
  32. // otherwise.
  33. func (f *Future[T]) Await() ([]*T, error) {
  34. results, err := f.resultsChan.Await()
  35. if err != nil {
  36. return nil, err
  37. }
  38. decoded := DecodeAll(results, f.decoder)
  39. return decoded, nil
  40. }