future.go 1.9 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768
  1. package source
  2. // ResultDecoder[T] is a function that decodes a `QueryResult` into a `*T` type.
  3. type ResultDecoder[T any] func(*QueryResult) *T
  4. // Future[T] is a async future/promise/task that will resolve into a []*T return or
  5. // error when awaited. This type provides a type-safe way of interfacing with `QueryResultsChan`
  6. // via a `ResultDecoder[T]` function.
  7. type Future[T any] struct {
  8. decoder ResultDecoder[T]
  9. resultsChan QueryResultsChan
  10. // results is set when we use a passthrough
  11. results []*T
  12. }
  13. // NewFuture Creates a new `Future[T]` with the given `ResultDecoder[T]` and `QueryResultsChan`.
  14. func NewFuture[T any](decoder ResultDecoder[T], resultsChan QueryResultsChan) *Future[T] {
  15. return &Future[T]{
  16. decoder: decoder,
  17. resultsChan: resultsChan,
  18. }
  19. }
  20. // NewFutureFrom accepts a result set to wrap in the a Future implementation for passthrough.
  21. func NewFutureFrom[T any](results []*T) *Future[T] {
  22. return &Future[T]{
  23. results: results,
  24. }
  25. }
  26. // awaitWith allows internal callers to pass an error collector for grouping futures
  27. func (f *Future[T]) awaitWith(errorCollector *QueryErrorCollector) ([]*T, error) {
  28. if f.results != nil {
  29. return f.results, nil
  30. }
  31. defer close(f.resultsChan)
  32. result := <-f.resultsChan
  33. q := result.Query
  34. err := result.Error
  35. if err != nil {
  36. errorCollector.AppendError(&QueryError{Query: q, Error: err})
  37. return nil, err
  38. }
  39. decoded := DecodeAll(result.Results, f.decoder)
  40. return decoded, nil
  41. }
  42. // Await blocks and waits for the `Future` to resolve, and returns the results if successful, or an error
  43. // otherwise.
  44. func (f *Future[T]) Await() ([]*T, error) {
  45. // in the event that we have a resolved future, we can return the results directly
  46. if f.results != nil {
  47. return f.results, nil
  48. }
  49. results, err := f.resultsChan.Await()
  50. if err != nil {
  51. return nil, err
  52. }
  53. decoded := DecodeAll(results, f.decoder)
  54. return decoded, nil
  55. }