testing.go 2.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101
  1. package eventstreamtest
  2. import (
  3. "bytes"
  4. "fmt"
  5. "io"
  6. "net/http"
  7. "net/http/httptest"
  8. "testing"
  9. "github.com/aws/aws-sdk-go/aws"
  10. "github.com/aws/aws-sdk-go/aws/session"
  11. "github.com/aws/aws-sdk-go/awstesting/unit"
  12. "github.com/aws/aws-sdk-go/private/protocol"
  13. "github.com/aws/aws-sdk-go/private/protocol/eventstream"
  14. "github.com/aws/aws-sdk-go/private/protocol/eventstream/eventstreamapi"
  15. )
  16. // ServeEventStream provides serving EventStream messages from a HTTP server to
  17. // the client. The events are sent sequentially to the client without delay.
  18. type ServeEventStream struct {
  19. T *testing.T
  20. Events []eventstream.Message
  21. }
  22. func (s ServeEventStream) ServeHTTP(w http.ResponseWriter, r *http.Request) {
  23. w.WriteHeader(http.StatusOK)
  24. encoder := eventstream.NewEncoder(flushWriter{w})
  25. for _, event := range s.Events {
  26. encoder.Encode(event)
  27. }
  28. }
  29. // SetupEventStreamSession creates a HTTP server SDK session for communicating
  30. // with that server to be used for EventStream APIs. If HTTP/2 is enabled the
  31. // server/client will only attempt to use HTTP/2.
  32. func SetupEventStreamSession(
  33. t *testing.T, handler http.Handler, h2 bool,
  34. ) (sess *session.Session, cleanupFn func(), err error) {
  35. server := httptest.NewUnstartedServer(handler)
  36. client := setupServer(server, h2)
  37. cleanupFn = func() {
  38. server.Close()
  39. }
  40. sess, err = session.NewSession(unit.Session.Config, &aws.Config{
  41. Endpoint: &server.URL,
  42. DisableParamValidation: aws.Bool(true),
  43. HTTPClient: client,
  44. // LogLevel: aws.LogLevel(aws.LogDebugWithEventStreamBody),
  45. })
  46. if err != nil {
  47. return nil, nil, err
  48. }
  49. return sess, cleanupFn, nil
  50. }
  51. type flushWriter struct {
  52. w io.Writer
  53. }
  54. func (fw flushWriter) Write(p []byte) (n int, err error) {
  55. n, err = fw.w.Write(p)
  56. if f, ok := fw.w.(http.Flusher); ok {
  57. f.Flush()
  58. }
  59. return
  60. }
  61. // MarshalEventPayload marshals a SDK API shape into its associated wire
  62. // protocol payload.
  63. func MarshalEventPayload(
  64. payloadMarshaler protocol.PayloadMarshaler,
  65. v interface{},
  66. ) []byte {
  67. var w bytes.Buffer
  68. err := payloadMarshaler.MarshalPayload(&w, v)
  69. if err != nil {
  70. panic(fmt.Sprintf("failed to marshal event %T, %v", v, v))
  71. }
  72. return w.Bytes()
  73. }
  74. // EventMessageTypeHeader is an event message type header for specifying an
  75. // event is an message type.
  76. var EventMessageTypeHeader = eventstream.Header{
  77. Name: eventstreamapi.MessageTypeHeader,
  78. Value: eventstream.StringValue(eventstreamapi.EventMessageType),
  79. }
  80. // EventExceptionTypeHeader is an event exception type header for specifying an
  81. // event is an exeption type.
  82. var EventExceptionTypeHeader = eventstream.Header{
  83. Name: eventstreamapi.MessageTypeHeader,
  84. Value: eventstream.StringValue(eventstreamapi.ExceptionMessageType),
  85. }