eventstream.go 33 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182
  1. // +build codegen
  2. package api
  3. import (
  4. "bytes"
  5. "fmt"
  6. "io"
  7. "strings"
  8. "text/template"
  9. )
  10. // EventStreamAPI provides details about the event stream async API and
  11. // associated EventStream shapes.
  12. type EventStreamAPI struct {
  13. API *API
  14. Name string
  15. Operation *Operation
  16. Shape *Shape
  17. Inbound *EventStream
  18. Outbound *EventStream
  19. }
  20. // EventStream represents a single eventstream group (input/output) and the
  21. // modeled events that are known for the stream.
  22. type EventStream struct {
  23. Name string
  24. Shape *Shape
  25. Events []*Event
  26. Exceptions []*Event
  27. }
// Event is a single EventStream event that can be sent or received in an
// EventStream.
type Event struct {
	// Name is the member name of the event within its event stream shape.
	Name string
	// Shape is the modeled shape of the event.
	Shape *Shape
	// For is the event stream this event belongs to.
	For *EventStream
}
  35. // ShapeDoc returns the docstring for the EventStream API.
  36. func (esAPI *EventStreamAPI) ShapeDoc() string {
  37. tmpl := template.Must(template.New("eventStreamShapeDoc").Parse(`
  38. {{- $.Name }} provides handling of EventStreams for
  39. the {{ $.Operation.ExportedName }} API.
  40. {{- if $.Inbound }}
  41. Use this type to receive {{ $.Inbound.Name }} events. The events
  42. can be read from the Events channel member.
  43. The events that can be received are:
  44. {{ range $_, $event := $.Inbound.Events }}
  45. * {{ $event.Shape.ShapeName }}
  46. {{- end }}
  47. {{- end }}
  48. {{- if $.Outbound }}
  49. Use this type to send {{ $.Outbound.Name }} events. The events
  50. can be sent with the Send method.
  51. The events that can be sent are:
  52. {{ range $_, $event := $.Outbound.Events -}}
  53. * {{ $event.Shape.ShapeName }}
  54. {{- end }}
  55. {{- end }}`))
  56. var w bytes.Buffer
  57. if err := tmpl.Execute(&w, esAPI); err != nil {
  58. panic(fmt.Sprintf("failed to generate eventstream shape template for %v, %v", esAPI.Name, err))
  59. }
  60. return commentify(w.String())
  61. }
  62. func hasEventStream(topShape *Shape) bool {
  63. for _, ref := range topShape.MemberRefs {
  64. if ref.Shape.IsEventStream {
  65. return true
  66. }
  67. }
  68. return false
  69. }
  70. func eventStreamAPIShapeRefDoc(refName string) string {
  71. return commentify(fmt.Sprintf("Use %s to use the API's stream.", refName))
  72. }
  73. func (a *API) setupEventStreams() {
  74. const eventStreamMemberName = "EventStream"
  75. for _, op := range a.Operations {
  76. outbound := setupEventStream(op.InputRef.Shape)
  77. inbound := setupEventStream(op.OutputRef.Shape)
  78. if outbound == nil && inbound == nil {
  79. continue
  80. }
  81. if outbound != nil {
  82. panic(fmt.Sprintf("Outbound stream support not implemented, %s, %s",
  83. outbound.Name, outbound.Shape.ShapeName))
  84. }
  85. switch a.Metadata.Protocol {
  86. case `rest-json`, `rest-xml`, `json`:
  87. default:
  88. panic(fmt.Sprintf("EventStream not supported for protocol %v",
  89. a.Metadata.Protocol))
  90. }
  91. op.EventStreamAPI = &EventStreamAPI{
  92. API: a,
  93. Name: op.ExportedName + eventStreamMemberName,
  94. Operation: op,
  95. Outbound: outbound,
  96. Inbound: inbound,
  97. }
  98. streamShape := &Shape{
  99. API: a,
  100. ShapeName: op.EventStreamAPI.Name,
  101. Documentation: op.EventStreamAPI.ShapeDoc(),
  102. Type: "structure",
  103. EventStreamAPI: op.EventStreamAPI,
  104. IsEventStream: true,
  105. MemberRefs: map[string]*ShapeRef{
  106. "Inbound": &ShapeRef{
  107. ShapeName: inbound.Shape.ShapeName,
  108. },
  109. },
  110. }
  111. inbound.Shape.refs = append(inbound.Shape.refs, streamShape.MemberRefs["Inbound"])
  112. streamShapeRef := &ShapeRef{
  113. API: a,
  114. ShapeName: streamShape.ShapeName,
  115. Shape: streamShape,
  116. Documentation: eventStreamAPIShapeRefDoc(eventStreamMemberName),
  117. }
  118. streamShape.refs = []*ShapeRef{streamShapeRef}
  119. op.EventStreamAPI.Shape = streamShape
  120. if _, ok := op.OutputRef.Shape.MemberRefs[eventStreamMemberName]; ok {
  121. panic(fmt.Sprintf("shape ref already exists, %s.%s",
  122. op.OutputRef.Shape.ShapeName, eventStreamMemberName))
  123. }
  124. op.OutputRef.Shape.MemberRefs[eventStreamMemberName] = streamShapeRef
  125. op.OutputRef.Shape.EventStreamsMemberName = eventStreamMemberName
  126. if _, ok := a.Shapes[streamShape.ShapeName]; ok {
  127. panic("shape already exists, " + streamShape.ShapeName)
  128. }
  129. a.Shapes[streamShape.ShapeName] = streamShape
  130. a.HasEventStream = true
  131. }
  132. }
  133. func setupEventStream(topShape *Shape) *EventStream {
  134. var eventStream *EventStream
  135. for refName, ref := range topShape.MemberRefs {
  136. if !ref.Shape.IsEventStream {
  137. continue
  138. }
  139. if eventStream != nil {
  140. panic(fmt.Sprintf("multiple shape ref eventstreams, %s, prev: %s",
  141. refName, eventStream.Name))
  142. }
  143. eventStream = &EventStream{
  144. Name: ref.Shape.ShapeName,
  145. Shape: ref.Shape,
  146. }
  147. if topShape.API.Metadata.Protocol == "json" {
  148. topShape.EventFor = append(topShape.EventFor, eventStream)
  149. }
  150. for _, eventRefName := range ref.Shape.MemberNames() {
  151. eventRef := ref.Shape.MemberRefs[eventRefName]
  152. if !(eventRef.Shape.IsEvent || eventRef.Shape.Exception) {
  153. panic(fmt.Sprintf("unexpected non-event member reference %s.%s",
  154. ref.Shape.ShapeName, eventRefName))
  155. }
  156. updateEventPayloadRef(eventRef.Shape)
  157. eventRef.Shape.EventFor = append(eventRef.Shape.EventFor, eventStream)
  158. // Exceptions and events are two different lists to allow the SDK
  159. // to easly generate code with the two handled differently.
  160. event := &Event{
  161. Name: eventRefName,
  162. Shape: eventRef.Shape,
  163. For: eventStream,
  164. }
  165. if eventRef.Shape.Exception {
  166. eventStream.Exceptions = append(eventStream.Exceptions, event)
  167. } else {
  168. eventStream.Events = append(eventStream.Events, event)
  169. }
  170. }
  171. // Remove the eventstream references as they will be added elsewhere.
  172. ref.Shape.removeRef(ref)
  173. delete(topShape.MemberRefs, refName)
  174. delete(topShape.API.Shapes, ref.Shape.ShapeName)
  175. }
  176. return eventStream
  177. }
  178. func updateEventPayloadRef(parent *Shape) {
  179. refName := parent.PayloadRefName()
  180. if len(refName) == 0 {
  181. return
  182. }
  183. payloadRef := parent.MemberRefs[refName]
  184. if payloadRef.Shape.Type == "blob" {
  185. return
  186. }
  187. if len(payloadRef.LocationName) != 0 {
  188. return
  189. }
  190. payloadRef.LocationName = refName
  191. }
  192. func renderEventStreamAPIShape(w io.Writer, s *Shape) error {
  193. // Imports needed by the EventStream APIs.
  194. s.API.AddImport("fmt")
  195. s.API.AddImport("bytes")
  196. s.API.AddImport("io")
  197. s.API.AddImport("sync")
  198. s.API.AddImport("sync/atomic")
  199. s.API.AddSDKImport("aws")
  200. s.API.AddSDKImport("aws/awserr")
  201. s.API.AddSDKImport("private/protocol/eventstream")
  202. s.API.AddSDKImport("private/protocol/eventstream/eventstreamapi")
  203. return eventStreamAPIShapeTmpl.Execute(w, s)
  204. }
  205. // Template for an EventStream API Shape that will provide read/writing events
  206. // across the EventStream. This is a special shape that's only public members
  207. // are the Events channel and a Close and Err method.
  208. //
  209. // Executed in the context of a Shape.
  210. var eventStreamAPIShapeTmpl = func() *template.Template {
  211. t := template.Must(
  212. template.New("eventStreamAPIShapeTmpl").
  213. Funcs(template.FuncMap{}).
  214. Parse(eventStreamAPITmplDef),
  215. )
  216. template.Must(
  217. t.AddParseTree(
  218. "eventStreamAPIReaderTmpl", eventStreamAPIReaderTmpl.Tree),
  219. )
  220. return t
  221. }()
  222. const eventStreamAPITmplDef = `
  223. {{ $.Documentation }}
  224. type {{ $.ShapeName }} struct {
  225. {{- if $.EventStreamAPI.Inbound }}
  226. // Reader is the EventStream reader for the {{ $.EventStreamAPI.Inbound.Name }}
  227. // events. This value is automatically set by the SDK when the API call is made
  228. // Use this member when unit testing your code with the SDK to mock out the
  229. // EventStream Reader.
  230. //
  231. // Must not be nil.
  232. Reader {{ $.ShapeName }}Reader
  233. {{ end -}}
  234. {{- if $.EventStreamAPI.Outbound }}
  235. // Writer is the EventStream reader for the {{ $.EventStreamAPI.Inbound.Name }}
  236. // events. This value is automatically set by the SDK when the API call is made
  237. // Use this member when unit testing your code with the SDK to mock out the
  238. // EventStream Writer.
  239. //
  240. // Must not be nil.
  241. Writer *{{ $.ShapeName }}Writer
  242. {{ end -}}
  243. // StreamCloser is the io.Closer for the EventStream connection. For HTTP
  244. // EventStream this is the response Body. The stream will be closed when
  245. // the Close method of the EventStream is called.
  246. StreamCloser io.Closer
  247. }
  248. // Close closes the EventStream. This will also cause the Events channel to be
  249. // closed. You can use the closing of the Events channel to terminate your
  250. // application's read from the API's EventStream.
  251. {{- if $.EventStreamAPI.Inbound }}
  252. //
  253. // Will close the underlying EventStream reader. For EventStream over HTTP
  254. // connection this will also close the HTTP connection.
  255. {{ end -}}
  256. //
  257. // Close must be called when done using the EventStream API. Not calling Close
  258. // may result in resource leaks.
  259. func (es *{{ $.ShapeName }}) Close() (err error) {
  260. {{- if $.EventStreamAPI.Inbound }}
  261. es.Reader.Close()
  262. {{ end -}}
  263. {{- if $.EventStreamAPI.Outbound }}
  264. es.Writer.Close()
  265. {{ end -}}
  266. return es.Err()
  267. }
  268. // Err returns any error that occurred while reading EventStream Events from
  269. // the service API's response. Returns nil if there were no errors.
  270. func (es *{{ $.ShapeName }}) Err() error {
  271. {{- if $.EventStreamAPI.Outbound }}
  272. if err := es.Writer.Err(); err != nil {
  273. return err
  274. }
  275. {{ end -}}
  276. {{- if $.EventStreamAPI.Inbound }}
  277. if err := es.Reader.Err(); err != nil {
  278. return err
  279. }
  280. {{ end -}}
  281. es.StreamCloser.Close()
  282. return nil
  283. }
  284. {{ if $.EventStreamAPI.Inbound }}
  285. // Events returns a channel to read EventStream Events from the
  286. // {{ $.EventStreamAPI.Operation.ExportedName }} API.
  287. //
  288. // These events are:
  289. // {{ range $_, $event := $.EventStreamAPI.Inbound.Events }}
  290. // * {{ $event.Shape.ShapeName }}
  291. {{- end }}
  292. func (es *{{ $.ShapeName }}) Events() <-chan {{ $.EventStreamAPI.Inbound.Name }}Event {
  293. return es.Reader.Events()
  294. }
  295. {{ template "eventStreamAPIReaderTmpl" $ }}
  296. {{ end }}
  297. {{ if $.EventStreamAPI.Outbound }}
  298. // TODO writer helper method.
  299. {{ end }}
  300. `
  301. var eventStreamAPIReaderTmpl = template.Must(template.New("eventStreamAPIReaderTmpl").
  302. Funcs(template.FuncMap{}).
  303. Parse(`
  304. // {{ $.EventStreamAPI.Inbound.Name }}Event groups together all EventStream
  305. // events read from the {{ $.EventStreamAPI.Operation.ExportedName }} API.
  306. //
  307. // These events are:
  308. // {{ range $_, $event := $.EventStreamAPI.Inbound.Events }}
  309. // * {{ $event.Shape.ShapeName }}
  310. {{- end }}
  311. type {{ $.EventStreamAPI.Inbound.Name }}Event interface {
  312. event{{ $.EventStreamAPI.Inbound.Name }}()
  313. }
  314. // {{ $.ShapeName }}Reader provides the interface for reading EventStream
  315. // Events from the {{ $.EventStreamAPI.Operation.ExportedName }} API. The
  316. // default implementation for this interface will be {{ $.ShapeName }}.
  317. //
  318. // The reader's Close method must allow multiple concurrent calls.
  319. //
  320. // These events are:
  321. // {{ range $_, $event := $.EventStreamAPI.Inbound.Events }}
  322. // * {{ $event.Shape.ShapeName }}
  323. {{- end }}
  324. type {{ $.ShapeName }}Reader interface {
  325. // Returns a channel of events as they are read from the event stream.
  326. Events() <-chan {{ $.EventStreamAPI.Inbound.Name }}Event
  327. // Close will close the underlying event stream reader. For event stream over
  328. // HTTP this will also close the HTTP connection.
  329. Close() error
  330. // Returns any error that has occurred while reading from the event stream.
  331. Err() error
  332. }
  333. type read{{ $.ShapeName }} struct {
  334. eventReader *eventstreamapi.EventReader
  335. stream chan {{ $.EventStreamAPI.Inbound.Name }}Event
  336. errVal atomic.Value
  337. done chan struct{}
  338. closeOnce sync.Once
  339. {{ if eq $.API.Metadata.Protocol "json" -}}
  340. initResp eventstreamapi.Unmarshaler
  341. {{ end -}}
  342. }
  343. func newRead{{ $.ShapeName }}(
  344. reader io.ReadCloser,
  345. unmarshalers request.HandlerList,
  346. logger aws.Logger,
  347. logLevel aws.LogLevelType,
  348. {{ if eq $.API.Metadata.Protocol "json" -}}
  349. initResp eventstreamapi.Unmarshaler,
  350. {{ end -}}
  351. ) *read{{ $.ShapeName }} {
  352. r := &read{{ $.ShapeName }}{
  353. stream: make(chan {{ $.EventStreamAPI.Inbound.Name }}Event),
  354. done: make(chan struct{}),
  355. {{ if eq $.API.Metadata.Protocol "json" -}}
  356. initResp: initResp,
  357. {{ end -}}
  358. }
  359. r.eventReader = eventstreamapi.NewEventReader(
  360. reader,
  361. protocol.HandlerPayloadUnmarshal{
  362. Unmarshalers: unmarshalers,
  363. },
  364. r.unmarshalerForEventType,
  365. )
  366. r.eventReader.UseLogger(logger, logLevel)
  367. return r
  368. }
  369. // Close will close the underlying event stream reader. For EventStream over
  370. // HTTP this will also close the HTTP connection.
  371. func (r *read{{ $.ShapeName }}) Close() error {
  372. r.closeOnce.Do(r.safeClose)
  373. return r.Err()
  374. }
  375. func (r *read{{ $.ShapeName }}) safeClose() {
  376. close(r.done)
  377. err := r.eventReader.Close()
  378. if err != nil {
  379. r.errVal.Store(err)
  380. }
  381. }
  382. func (r *read{{ $.ShapeName }}) Err() error {
  383. if v := r.errVal.Load(); v != nil {
  384. return v.(error)
  385. }
  386. return nil
  387. }
  388. func (r *read{{ $.ShapeName }}) Events() <-chan {{ $.EventStreamAPI.Inbound.Name }}Event {
  389. return r.stream
  390. }
  391. func (r *read{{ $.ShapeName }}) readEventStream() {
  392. defer close(r.stream)
  393. for {
  394. event, err := r.eventReader.ReadEvent()
  395. if err != nil {
  396. if err == io.EOF {
  397. return
  398. }
  399. select {
  400. case <-r.done:
  401. // If closed already ignore the error
  402. return
  403. default:
  404. }
  405. r.errVal.Store(err)
  406. return
  407. }
  408. select {
  409. case r.stream <- event.({{ $.EventStreamAPI.Inbound.Name }}Event):
  410. case <-r.done:
  411. return
  412. }
  413. }
  414. }
  415. func (r *read{{ $.ShapeName }}) unmarshalerForEventType(
  416. eventType string,
  417. ) (eventstreamapi.Unmarshaler, error) {
  418. switch eventType {
  419. {{- if eq $.API.Metadata.Protocol "json" }}
  420. case "initial-response":
  421. return r.initResp, nil
  422. {{ end -}}
  423. {{- range $_, $event := $.EventStreamAPI.Inbound.Events }}
  424. case {{ printf "%q" $event.Name }}:
  425. return &{{ $event.Shape.ShapeName }}{}, nil
  426. {{ end -}}
  427. {{- range $_, $event := $.EventStreamAPI.Inbound.Exceptions }}
  428. case {{ printf "%q" $event.Name }}:
  429. return &{{ $event.Shape.ShapeName }}{}, nil
  430. {{ end -}}
  431. default:
  432. return nil, awserr.New(
  433. request.ErrCodeSerialization,
  434. fmt.Sprintf("unknown event type name, %s, for {{ $.ShapeName }}", eventType),
  435. nil,
  436. )
  437. }
  438. }
  439. `))
  440. // Template for the EventStream API Output shape that contains the EventStream
  441. // member.
  442. //
  443. // Executed in the context of a Shape.
  444. var eventStreamAPILoopMethodTmpl = template.Must(
  445. template.New("eventStreamAPILoopMethodTmpl").Parse(`
  446. func (s *{{ $.ShapeName }}) runEventStreamLoop(r *request.Request) {
  447. if r.Error != nil {
  448. return
  449. }
  450. {{- $esMemberRef := index $.MemberRefs $.EventStreamsMemberName }}
  451. {{- if $esMemberRef.Shape.EventStreamAPI.Inbound }}
  452. reader := newRead{{ $esMemberRef.ShapeName }}(
  453. r.HTTPResponse.Body,
  454. r.Handlers.UnmarshalStream,
  455. r.Config.Logger,
  456. r.Config.LogLevel.Value(),
  457. {{ if eq $.API.Metadata.Protocol "json" -}}
  458. s,
  459. {{ end -}}
  460. )
  461. go reader.readEventStream()
  462. eventStream := &{{ $esMemberRef.ShapeName }} {
  463. StreamCloser: r.HTTPResponse.Body,
  464. Reader: reader,
  465. }
  466. {{ end -}}
  467. s.{{ $.EventStreamsMemberName }} = eventStream
  468. }
  469. {{ if eq $.API.Metadata.Protocol "json" -}}
  470. func (s *{{ $.ShapeName }}) unmarshalInitialResponse(r *request.Request) {
  471. // Wait for the initial response event, which must be the first event to be
  472. // received from the API.
  473. select {
  474. case event, ok := <-s.EventStream.Events():
  475. if !ok {
  476. return
  477. }
  478. es := s.EventStream
  479. v, ok := event.(*{{ $.ShapeName }})
  480. if !ok || v == nil {
  481. r.Error = awserr.New(
  482. request.ErrCodeSerialization,
  483. fmt.Sprintf("invalid event, %T, expect *SubscribeToShardOutput, %v", event, v),
  484. nil,
  485. )
  486. return
  487. }
  488. *s = *v
  489. s.EventStream = es
  490. }
  491. }
  492. {{ end -}}
  493. `))
  494. // EventStreamHeaderTypeMap provides the mapping of a EventStream Header's
  495. // Value type to the shape reference's member type.
  496. type EventStreamHeaderTypeMap struct {
  497. Header string
  498. Member string
  499. }
  500. var eventStreamEventShapeTmplFuncs = template.FuncMap{
  501. "EventStreamHeaderTypeMap": func(ref *ShapeRef) EventStreamHeaderTypeMap {
  502. switch ref.Shape.Type {
  503. case "boolean":
  504. return EventStreamHeaderTypeMap{Header: "bool", Member: "bool"}
  505. case "byte":
  506. return EventStreamHeaderTypeMap{Header: "int8", Member: "int64"}
  507. case "short":
  508. return EventStreamHeaderTypeMap{Header: "int16", Member: "int64"}
  509. case "integer":
  510. return EventStreamHeaderTypeMap{Header: "int32", Member: "int64"}
  511. case "long":
  512. return EventStreamHeaderTypeMap{Header: "int64", Member: "int64"}
  513. case "timestamp":
  514. return EventStreamHeaderTypeMap{Header: "time.Time", Member: "time.Time"}
  515. case "blob":
  516. return EventStreamHeaderTypeMap{Header: "[]byte", Member: "[]byte"}
  517. case "string":
  518. return EventStreamHeaderTypeMap{Header: "string", Member: "string"}
  519. // TODO case "uuid" what is modeled type
  520. default:
  521. panic("unsupported EventStream header type, " + ref.Shape.Type)
  522. }
  523. },
  524. "HasNonBlobPayloadMembers": eventHasNonBlobPayloadMembers,
  525. }
  526. // Returns if the event has any members which are not the event's blob payload,
  527. // nor a header.
  528. func eventHasNonBlobPayloadMembers(s *Shape) bool {
  529. num := len(s.MemberRefs)
  530. for _, ref := range s.MemberRefs {
  531. if ref.IsEventHeader || (ref.IsEventPayload && (ref.Shape.Type == "blob" || ref.Shape.Type == "string")) {
  532. num--
  533. }
  534. }
  535. return num > 0
  536. }
  537. // Template for an EventStream Event shape. This is a normal API shape that is
  538. // decorated as an EventStream Event.
  539. //
  540. // Executed in the context of a Shape.
  541. var eventStreamEventShapeTmpl = template.Must(template.New("eventStreamEventShapeTmpl").
  542. Funcs(eventStreamEventShapeTmplFuncs).Parse(`
  543. {{ range $_, $eventstream := $.EventFor }}
  544. // The {{ $.ShapeName }} is and event in the {{ $eventstream.Name }} group of events.
  545. func (s *{{ $.ShapeName }}) event{{ $eventstream.Name }}() {}
  546. {{ end }}
  547. // UnmarshalEvent unmarshals the EventStream Message into the {{ $.ShapeName }} value.
  548. // This method is only used internally within the SDK's EventStream handling.
  549. func (s *{{ $.ShapeName }}) UnmarshalEvent(
  550. payloadUnmarshaler protocol.PayloadUnmarshaler,
  551. msg eventstream.Message,
  552. ) error {
  553. {{- range $memName, $memRef := $.MemberRefs }}
  554. {{- if $memRef.IsEventHeader }}
  555. if hv := msg.Headers.Get("{{ $memName }}"); hv != nil {
  556. {{ $types := EventStreamHeaderTypeMap $memRef -}}
  557. v := hv.Get().({{ $types.Header }})
  558. {{- if ne $types.Header $types.Member }}
  559. m := {{ $types.Member }}(v)
  560. s.{{ $memName }} = {{ if $memRef.UseIndirection }}&{{ end }}m
  561. {{- else }}
  562. s.{{ $memName }} = {{ if $memRef.UseIndirection }}&{{ end }}v
  563. {{- end }}
  564. }
  565. {{- else if (and ($memRef.IsEventPayload) (eq $memRef.Shape.Type "blob")) }}
  566. s.{{ $memName }} = make([]byte, len(msg.Payload))
  567. copy(s.{{ $memName }}, msg.Payload)
  568. {{- else if (and ($memRef.IsEventPayload) (eq $memRef.Shape.Type "string")) }}
  569. s.{{ $memName }} = aws.String(string(msg.Payload))
  570. {{- end }}
  571. {{- end }}
  572. {{- if HasNonBlobPayloadMembers $ }}
  573. if err := payloadUnmarshaler.UnmarshalPayload(
  574. bytes.NewReader(msg.Payload), s,
  575. ); err != nil {
  576. return err
  577. }
  578. {{- end }}
  579. return nil
  580. }
  581. `))
  582. var eventStreamExceptionEventShapeTmpl = template.Must(
  583. template.New("eventStreamExceptionEventShapeTmpl").Parse(`
  584. // Code returns the exception type name.
  585. func (s {{ $.ShapeName }}) Code() string {
  586. {{- if $.ErrorInfo.Code }}
  587. return "{{ $.ErrorInfo.Code }}"
  588. {{- else }}
  589. return "{{ $.ShapeName }}"
  590. {{ end -}}
  591. }
  592. // Message returns the exception's message.
  593. func (s {{ $.ShapeName }}) Message() string {
  594. {{- if index $.MemberRefs "Message_" }}
  595. return *s.Message_
  596. {{- else }}
  597. return ""
  598. {{ end -}}
  599. }
  600. // OrigErr always returns nil, satisfies awserr.Error interface.
  601. func (s {{ $.ShapeName }}) OrigErr() error {
  602. return nil
  603. }
  604. func (s {{ $.ShapeName }}) Error() string {
  605. return fmt.Sprintf("%s: %s", s.Code(), s.Message())
  606. }
  607. `))
  608. // APIEventStreamTestGoCode generates Go code for EventStream operation tests.
  609. func (a *API) APIEventStreamTestGoCode() string {
  610. var buf bytes.Buffer
  611. a.resetImports()
  612. a.AddImport("bytes")
  613. a.AddImport("io/ioutil")
  614. a.AddImport("net/http")
  615. a.AddImport("reflect")
  616. a.AddImport("testing")
  617. a.AddImport("time")
  618. a.AddSDKImport("aws")
  619. a.AddSDKImport("aws/corehandlers")
  620. a.AddSDKImport("aws/request")
  621. a.AddSDKImport("aws/awserr")
  622. a.AddSDKImport("awstesting/unit")
  623. a.AddSDKImport("private/protocol")
  624. a.AddSDKImport("private/protocol/", a.ProtocolPackage())
  625. a.AddSDKImport("private/protocol/eventstream")
  626. a.AddSDKImport("private/protocol/eventstream/eventstreamapi")
  627. a.AddSDKImport("private/protocol/eventstream/eventstreamtest")
  628. unused := `
  629. var _ time.Time
  630. var _ awserr.Error
  631. `
  632. if err := eventStreamTestTmpl.Execute(&buf, a); err != nil {
  633. panic(err)
  634. }
  635. return a.importsGoCode() + unused + strings.TrimSpace(buf.String())
  636. }
  637. func valueForType(s *Shape, visited []string) string {
  638. for _, v := range visited {
  639. if v == s.ShapeName {
  640. return "nil"
  641. }
  642. }
  643. visited = append(visited, s.ShapeName)
  644. switch s.Type {
  645. case "blob":
  646. return `[]byte("blob value goes here")`
  647. case "string":
  648. return `aws.String("string value goes here")`
  649. case "boolean":
  650. return `aws.Bool(true)`
  651. case "byte":
  652. return `aws.Int64(1)`
  653. case "short":
  654. return `aws.Int64(12)`
  655. case "integer":
  656. return `aws.Int64(123)`
  657. case "long":
  658. return `aws.Int64(1234)`
  659. case "float":
  660. return `aws.Float64(123.4)`
  661. case "double":
  662. return `aws.Float64(123.45)`
  663. case "timestamp":
  664. return `aws.Time(time.Unix(1396594860, 0).UTC())`
  665. case "structure":
  666. w := bytes.NewBuffer(nil)
  667. fmt.Fprintf(w, "&%s{\n", s.ShapeName)
  668. for _, refName := range s.MemberNames() {
  669. fmt.Fprintf(w, "%s: %s,\n", refName, valueForType(s.MemberRefs[refName].Shape, visited))
  670. }
  671. fmt.Fprintf(w, "}")
  672. return w.String()
  673. case "list":
  674. w := bytes.NewBuffer(nil)
  675. fmt.Fprintf(w, "%s{\n", s.GoType())
  676. for i := 0; i < 3; i++ {
  677. fmt.Fprintf(w, "%s,\n", valueForType(s.MemberRef.Shape, visited))
  678. }
  679. fmt.Fprintf(w, "}")
  680. return w.String()
  681. case "map":
  682. w := bytes.NewBuffer(nil)
  683. fmt.Fprintf(w, "%s{\n", s.GoType())
  684. for _, k := range []string{"a", "b", "c"} {
  685. fmt.Fprintf(w, "%q: %s,\n", k, valueForType(s.ValueRef.Shape, visited))
  686. }
  687. fmt.Fprintf(w, "}")
  688. return w.String()
  689. default:
  690. panic(fmt.Sprintf("valueForType does not support %s, %s", s.ShapeName, s.Type))
  691. }
  692. }
  693. func setEventHeaderValueForType(s *Shape, memVar string) string {
  694. switch s.Type {
  695. case "blob":
  696. return fmt.Sprintf("eventstream.BytesValue(%s)", memVar)
  697. case "string":
  698. return fmt.Sprintf("eventstream.StringValue(*%s)", memVar)
  699. case "boolean":
  700. return fmt.Sprintf("eventstream.BoolValue(*%s)", memVar)
  701. case "byte":
  702. return fmt.Sprintf("eventstream.Int8Value(int8(*%s))", memVar)
  703. case "short":
  704. return fmt.Sprintf("eventstream.Int16Value(int16(*%s))", memVar)
  705. case "integer":
  706. return fmt.Sprintf("eventstream.Int32Value(int32(*%s))", memVar)
  707. case "long":
  708. return fmt.Sprintf("eventstream.Int64Value(*%s)", memVar)
  709. case "float":
  710. return fmt.Sprintf("eventstream.Float32Value(float32(*%s))", memVar)
  711. case "double":
  712. return fmt.Sprintf("eventstream.Float64Value(*%s)", memVar)
  713. case "timestamp":
  714. return fmt.Sprintf("eventstream.TimestampValue(*%s)", memVar)
  715. default:
  716. panic(fmt.Sprintf("value type %s not supported for event headers, %s", s.Type, s.ShapeName))
  717. }
  718. }
  719. func templateMap(args ...interface{}) map[string]interface{} {
  720. if len(args)%2 != 0 {
  721. panic(fmt.Sprintf("invalid map call, non-even args %v", args))
  722. }
  723. m := map[string]interface{}{}
  724. for i := 0; i < len(args); i += 2 {
  725. k, ok := args[i].(string)
  726. if !ok {
  727. panic(fmt.Sprintf("invalid map call, arg is not string, %T, %v", args[i], args[i]))
  728. }
  729. m[k] = args[i+1]
  730. }
  731. return m
  732. }
  733. var eventStreamTestTmpl = template.Must(
  734. template.New("eventStreamTestTmpl").Funcs(template.FuncMap{
  735. "ValueForType": valueForType,
  736. "HasNonBlobPayloadMembers": eventHasNonBlobPayloadMembers,
  737. "SetEventHeaderValueForType": setEventHeaderValueForType,
  738. "Map": templateMap,
  739. "OptionalAddInt": func(do bool, a, b int) int {
  740. if !do {
  741. return a
  742. }
  743. return a + b
  744. },
  745. "HasNonEventStreamMember": func(s *Shape) bool {
  746. for _, ref := range s.MemberRefs {
  747. if !ref.Shape.IsEventStream {
  748. return true
  749. }
  750. }
  751. return false
  752. },
  753. }).Parse(`
  754. {{ range $opName, $op := $.Operations }}
  755. {{ if $op.EventStreamAPI }}
  756. {{ if $op.EventStreamAPI.Inbound }}
  757. {{ template "event stream inbound tests" $op.EventStreamAPI }}
  758. {{ end }}
  759. {{ end }}
  760. {{ end }}
  761. type loopReader struct {
  762. source *bytes.Reader
  763. }
  764. func (c *loopReader) Read(p []byte) (int, error) {
  765. if c.source.Len() == 0 {
  766. c.source.Seek(0, 0)
  767. }
  768. return c.source.Read(p)
  769. }
  770. {{ define "event stream inbound tests" }}
  771. func Test{{ $.Operation.ExportedName }}_Read(t *testing.T) {
  772. expectEvents, eventMsgs := mock{{ $.Operation.ExportedName }}ReadEvents()
  773. sess, cleanupFn, err := eventstreamtest.SetupEventStreamSession(t,
  774. eventstreamtest.ServeEventStream{
  775. T: t,
  776. Events: eventMsgs,
  777. },
  778. true,
  779. )
  780. if err != nil {
  781. t.Fatalf("expect no error, %v", err)
  782. }
  783. defer cleanupFn()
  784. svc := New(sess)
  785. resp, err := svc.{{ $.Operation.ExportedName }}(nil)
  786. if err != nil {
  787. t.Fatalf("expect no error got, %v", err)
  788. }
  789. defer resp.EventStream.Close()
  790. {{- if and (eq $.Operation.API.Metadata.Protocol "json") (HasNonEventStreamMember $.Operation.OutputRef.Shape) }}
  791. expectResp := expectEvents[0].(*{{ $.Operation.OutputRef.Shape.ShapeName }})
  792. {{- range $name, $ref := $.Operation.OutputRef.Shape.MemberRefs }}
  793. {{- if not $ref.Shape.IsEventStream }}
  794. if e, a := expectResp.{{ $name }}, resp.{{ $name }}; !reflect.DeepEqual(e,a) {
  795. t.Errorf("expect %v, got %v", e, a)
  796. }
  797. {{- end }}
  798. {{- end }}
  799. // Trim off response output type pseudo event so only event messages remain.
  800. expectEvents = expectEvents[1:]
  801. {{ end }}
  802. var i int
  803. for event := range resp.EventStream.Events() {
  804. if event == nil {
  805. t.Errorf("%d, expect event, got nil", i)
  806. }
  807. if e, a := expectEvents[i], event; !reflect.DeepEqual(e, a) {
  808. t.Errorf("%d, expect %T %v, got %T %v", i, e, e, a, a)
  809. }
  810. i++
  811. }
  812. if err := resp.EventStream.Err(); err != nil {
  813. t.Errorf("expect no error, %v", err)
  814. }
  815. }
  816. func Test{{ $.Operation.ExportedName }}_ReadClose(t *testing.T) {
  817. _, eventMsgs := mock{{ $.Operation.ExportedName }}ReadEvents()
  818. sess, cleanupFn, err := eventstreamtest.SetupEventStreamSession(t,
  819. eventstreamtest.ServeEventStream{
  820. T: t,
  821. Events: eventMsgs,
  822. },
  823. true,
  824. )
  825. if err != nil {
  826. t.Fatalf("expect no error, %v", err)
  827. }
  828. defer cleanupFn()
  829. svc := New(sess)
  830. resp, err := svc.{{ $.Operation.ExportedName }}(nil)
  831. if err != nil {
  832. t.Fatalf("expect no error got, %v", err)
  833. }
  834. resp.EventStream.Close()
  835. <-resp.EventStream.Events()
  836. if err := resp.EventStream.Err(); err != nil {
  837. t.Errorf("expect no error, %v", err)
  838. }
  839. }
  840. func Benchmark{{ $.Operation.ExportedName }}_Read(b *testing.B) {
  841. _, eventMsgs := mock{{ $.Operation.ExportedName }}ReadEvents()
  842. var buf bytes.Buffer
  843. encoder := eventstream.NewEncoder(&buf)
  844. for _, msg := range eventMsgs {
  845. if err := encoder.Encode(msg); err != nil {
  846. b.Fatalf("failed to encode message, %v", err)
  847. }
  848. }
  849. stream := &loopReader{source: bytes.NewReader(buf.Bytes())}
  850. sess := unit.Session
  851. svc := New(sess, &aws.Config{
  852. Endpoint: aws.String("https://example.com"),
  853. DisableParamValidation: aws.Bool(true),
  854. })
  855. svc.Handlers.Send.Swap(corehandlers.SendHandler.Name,
  856. request.NamedHandler{Name: "mockSend",
  857. Fn: func(r *request.Request) {
  858. r.HTTPResponse = &http.Response{
  859. Status: "200 OK",
  860. StatusCode: 200,
  861. Header: http.Header{},
  862. Body: ioutil.NopCloser(stream),
  863. }
  864. },
  865. },
  866. )
  867. resp, err := svc.{{ $.Operation.ExportedName }}(nil)
  868. if err != nil {
  869. b.Fatalf("failed to create request, %v", err)
  870. }
  871. defer resp.EventStream.Close()
  872. b.ResetTimer()
  873. for i := 0; i < b.N; i++ {
  874. if err = resp.EventStream.Err(); err != nil {
  875. b.Fatalf("expect no error, got %v", err)
  876. }
  877. event := <-resp.EventStream.Events()
  878. if event == nil {
  879. b.Fatalf("expect event, got nil, %v, %d", resp.EventStream.Err(), i)
  880. }
  881. }
  882. }
  883. func mock{{ $.Operation.ExportedName }}ReadEvents() (
  884. []{{ $.Inbound.Name }}Event,
  885. []eventstream.Message,
  886. ) {
  887. expectEvents := []{{ $.Inbound.Name }}Event {
  888. {{- if eq $.Operation.API.Metadata.Protocol "json" }}
  889. {{- template "set event type" $.Operation.OutputRef.Shape }}
  890. {{- end }}
  891. {{- range $_, $event := $.Inbound.Events }}
  892. {{- template "set event type" $event.Shape }}
  893. {{- end }}
  894. }
  895. var marshalers request.HandlerList
  896. marshalers.PushBackNamed({{ $.API.ProtocolPackage }}.BuildHandler)
  897. payloadMarshaler := protocol.HandlerPayloadMarshal{
  898. Marshalers: marshalers,
  899. }
  900. _ = payloadMarshaler
  901. eventMsgs := []eventstream.Message{
  902. {{- if eq $.Operation.API.Metadata.Protocol "json" }}
  903. {{- template "set event message" Map "idx" 0 "parentShape" $.Operation.OutputRef.Shape "eventName" "initial-response" }}
  904. {{- end }}
  905. {{- range $idx, $event := $.Inbound.Events }}
  906. {{- $offsetIdx := OptionalAddInt (eq $.Operation.API.Metadata.Protocol "json") $idx 1 }}
  907. {{- template "set event message" Map "idx" $offsetIdx "parentShape" $event.Shape "eventName" $event.Name }}
  908. {{- end }}
  909. }
  910. return expectEvents, eventMsgs
  911. }
  912. {{- if $.Inbound.Exceptions }}
  913. func Test{{ $.Operation.ExportedName }}_ReadException(t *testing.T) {
  914. expectEvents := []{{ $.Inbound.Name }}Event {
  915. {{- if eq $.Operation.API.Metadata.Protocol "json" }}
  916. {{- template "set event type" $.Operation.OutputRef.Shape }}
  917. {{- end }}
  918. {{- $exception := index $.Inbound.Exceptions 0 }}
  919. {{- template "set event type" $exception.Shape }}
  920. }
  921. var marshalers request.HandlerList
  922. marshalers.PushBackNamed({{ $.API.ProtocolPackage }}.BuildHandler)
  923. payloadMarshaler := protocol.HandlerPayloadMarshal{
  924. Marshalers: marshalers,
  925. }
  926. eventMsgs := []eventstream.Message{
  927. {{- if eq $.Operation.API.Metadata.Protocol "json" }}
  928. {{- template "set event message" Map "idx" 0 "parentShape" $.Operation.OutputRef.Shape "eventName" "initial-response" }}
  929. {{- end }}
  930. {{- $offsetIdx := OptionalAddInt (eq $.Operation.API.Metadata.Protocol "json") 0 1 }}
  931. {{- $exception := index $.Inbound.Exceptions 0 }}
  932. {{- template "set event message" Map "idx" $offsetIdx "parentShape" $exception.Shape "eventName" $exception.Name }}
  933. }
  934. sess, cleanupFn, err := eventstreamtest.SetupEventStreamSession(t,
  935. eventstreamtest.ServeEventStream{
  936. T: t,
  937. Events: eventMsgs,
  938. },
  939. true,
  940. )
  941. if err != nil {
  942. t.Fatalf("expect no error, %v", err)
  943. }
  944. defer cleanupFn()
  945. svc := New(sess)
  946. resp, err := svc.{{ $.Operation.ExportedName }}(nil)
  947. if err != nil {
  948. t.Fatalf("expect no error got, %v", err)
  949. }
  950. defer resp.EventStream.Close()
  951. <-resp.EventStream.Events()
  952. err = resp.EventStream.Err()
  953. if err == nil {
  954. t.Fatalf("expect err, got none")
  955. }
  956. expectErr := {{ ValueForType $exception.Shape nil }}
  957. aerr, ok := err.(awserr.Error)
  958. if !ok {
  959. t.Errorf("expect exception, got %T, %#v", err, err)
  960. }
  961. if e, a := expectErr.Code(), aerr.Code(); e != a {
  962. t.Errorf("expect %v, got %v", e, a)
  963. }
  964. if e, a := expectErr.Message(), aerr.Message(); e != a {
  965. t.Errorf("expect %v, got %v", e, a)
  966. }
  967. if e, a := expectErr, aerr; !reflect.DeepEqual(e, a) {
  968. t.Errorf("expect %#v, got %#v", e, a)
  969. }
  970. }
  971. {{- range $_, $exception := $.Inbound.Exceptions }}
  972. var _ awserr.Error = (*{{ $exception.Shape.ShapeName }})(nil)
  973. {{- end }}
  974. {{ end }}
  975. {{ end }}
  976. {{/* Params: *Shape */}}
  977. {{ define "set event type" }}
  978. &{{ $.ShapeName }}{
  979. {{- range $memName, $memRef := $.MemberRefs }}
  980. {{- if not $memRef.Shape.IsEventStream }}
  981. {{ $memName }}: {{ ValueForType $memRef.Shape nil }},
  982. {{- end }}
  983. {{- end }}
  984. },
  985. {{- end }}
  986. {{/* Params: idx:int, parentShape:*Shape, eventName:string */}}
  987. {{ define "set event message" }}
  988. {
  989. Headers: eventstream.Headers{
  990. {{- if $.parentShape.Exception }}
  991. eventstreamtest.EventExceptionTypeHeader,
  992. {
  993. Name: eventstreamapi.ExceptionTypeHeader,
  994. Value: eventstream.StringValue("{{ $.eventName }}"),
  995. },
  996. {{- else }}
  997. eventstreamtest.EventMessageTypeHeader,
  998. {
  999. Name: eventstreamapi.EventTypeHeader,
  1000. Value: eventstream.StringValue("{{ $.eventName }}"),
  1001. },
  1002. {{- end }}
  1003. {{- range $memName, $memRef := $.parentShape.MemberRefs }}
  1004. {{- template "set event message header" Map "idx" $.idx "parentShape" $.parentShape "memName" $memName "memRef" $memRef }}
  1005. {{- end }}
  1006. },
  1007. {{- template "set event message payload" Map "idx" $.idx "parentShape" $.parentShape }}
  1008. },
  1009. {{- end }}
  1010. {{/* Params: idx:int, parentShape:*Shape, memName:string, memRef:*ShapeRef */}}
  1011. {{ define "set event message header" }}
  1012. {{- if $.memRef.IsEventHeader }}
  1013. {
  1014. Name: "{{ $.memName }}",
  1015. {{- $shapeValueVar := printf "expectEvents[%d].(%s).%s" $.idx $.parentShape.GoType $.memName }}
  1016. Value: {{ SetEventHeaderValueForType $.memRef.Shape $shapeValueVar }},
  1017. },
  1018. {{- end }}
  1019. {{- end }}
  1020. {{/* Params: idx:int, parentShape:*Shape, memName:string, memRef:*ShapeRef */}}
  1021. {{ define "set event message payload" }}
  1022. {{- $payloadMemName := $.parentShape.PayloadRefName }}
  1023. {{- if HasNonBlobPayloadMembers $.parentShape }}
  1024. Payload: eventstreamtest.MarshalEventPayload(payloadMarshaler, expectEvents[{{ $.idx }}]),
  1025. {{- else if $payloadMemName }}
  1026. {{- $shapeType := (index $.parentShape.MemberRefs $payloadMemName).Shape.Type }}
  1027. {{- if eq $shapeType "blob" }}
  1028. Payload: expectEvents[{{ $.idx }}].({{ $.parentShape.GoType }}).{{ $payloadMemName }},
  1029. {{- else if eq $shapeType "string" }}
  1030. Payload: []byte(*expectEvents[{{ $.idx }}].({{ $.parentShape.GoType }}).{{ $payloadMemName }}),
  1031. {{- end }}
  1032. {{- end }}
  1033. {{- end }}
  1034. `))