mockclient.go 8.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287
  1. /*
  2. Copyright 2017 Google LLC
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package testutil
  14. import (
  15. "context"
  16. "errors"
  17. "fmt"
  18. "sync"
  19. "testing"
  20. "time"
  21. "github.com/golang/protobuf/proto"
  22. "github.com/golang/protobuf/ptypes/empty"
  23. proto3 "github.com/golang/protobuf/ptypes/struct"
  24. pbt "github.com/golang/protobuf/ptypes/timestamp"
  25. sppb "google.golang.org/genproto/googleapis/spanner/v1"
  26. "google.golang.org/grpc"
  27. "google.golang.org/grpc/codes"
  28. "google.golang.org/grpc/status"
  29. )
  30. // MockCloudSpannerClient is a mock implementation of sppb.SpannerClient.
  31. type MockCloudSpannerClient struct {
  32. sppb.SpannerClient
  33. mu sync.Mutex
  34. t *testing.T
  35. // Live sessions on the client.
  36. sessions map[string]bool
  37. // Session ping history.
  38. pings []string
  39. // Client will stall on any requests.
  40. freezed chan struct{}
  41. // Expected set of actions that have been executed by the client. These
  42. // interfaces should be type reflected against with *Request types in sppb,
  43. // such as sppb.GetSessionRequest. Buffered to a large degree.
  44. ReceivedRequests chan interface{}
  45. }
  46. // NewMockCloudSpannerClient creates new MockCloudSpannerClient instance.
  47. func NewMockCloudSpannerClient(t *testing.T) *MockCloudSpannerClient {
  48. mc := &MockCloudSpannerClient{
  49. t: t,
  50. sessions: map[string]bool{},
  51. ReceivedRequests: make(chan interface{}, 100000),
  52. }
  53. // Produce a closed channel, so the default action of ready is to not block.
  54. mc.Freeze()
  55. mc.Unfreeze()
  56. return mc
  57. }
  58. // DumpPings dumps the ping history.
  59. func (m *MockCloudSpannerClient) DumpPings() []string {
  60. m.mu.Lock()
  61. defer m.mu.Unlock()
  62. return append([]string(nil), m.pings...)
  63. }
  64. // DumpSessions dumps the internal session table.
  65. func (m *MockCloudSpannerClient) DumpSessions() map[string]bool {
  66. m.mu.Lock()
  67. defer m.mu.Unlock()
  68. st := map[string]bool{}
  69. for s, v := range m.sessions {
  70. st[s] = v
  71. }
  72. return st
  73. }
  74. // CreateSession is a placeholder for SpannerClient.CreateSession.
  75. func (m *MockCloudSpannerClient) CreateSession(c context.Context, r *sppb.CreateSessionRequest, opts ...grpc.CallOption) (*sppb.Session, error) {
  76. m.ready()
  77. m.ReceivedRequests <- r
  78. m.mu.Lock()
  79. defer m.mu.Unlock()
  80. s := &sppb.Session{}
  81. if r.Database != "mockdb" {
  82. // Reject other databases
  83. return s, status.Errorf(codes.NotFound, fmt.Sprintf("database not found: %v", r.Database))
  84. }
  85. // Generate & record session name.
  86. s.Name = fmt.Sprintf("mockdb-%v", time.Now().UnixNano())
  87. m.sessions[s.Name] = true
  88. return s, nil
  89. }
  90. // GetSession is a placeholder for SpannerClient.GetSession.
  91. func (m *MockCloudSpannerClient) GetSession(c context.Context, r *sppb.GetSessionRequest, opts ...grpc.CallOption) (*sppb.Session, error) {
  92. m.ready()
  93. m.ReceivedRequests <- r
  94. m.mu.Lock()
  95. defer m.mu.Unlock()
  96. m.pings = append(m.pings, r.Name)
  97. if _, ok := m.sessions[r.Name]; !ok {
  98. return nil, status.Errorf(codes.NotFound, fmt.Sprintf("Session not found: %v", r.Name))
  99. }
  100. return &sppb.Session{Name: r.Name}, nil
  101. }
  102. // DeleteSession is a placeholder for SpannerClient.DeleteSession.
  103. func (m *MockCloudSpannerClient) DeleteSession(c context.Context, r *sppb.DeleteSessionRequest, opts ...grpc.CallOption) (*empty.Empty, error) {
  104. m.ready()
  105. m.ReceivedRequests <- r
  106. m.mu.Lock()
  107. defer m.mu.Unlock()
  108. if _, ok := m.sessions[r.Name]; !ok {
  109. // Session not found.
  110. return &empty.Empty{}, status.Errorf(codes.NotFound, fmt.Sprintf("Session not found: %v", r.Name))
  111. }
  112. // Delete session from in-memory table.
  113. delete(m.sessions, r.Name)
  114. return &empty.Empty{}, nil
  115. }
  116. // ExecuteStreamingSql is a mock implementation of SpannerClient.ExecuteStreamingSql.
  117. func (m *MockCloudSpannerClient) ExecuteStreamingSql(c context.Context, r *sppb.ExecuteSqlRequest, opts ...grpc.CallOption) (sppb.Spanner_ExecuteStreamingSqlClient, error) {
  118. m.ready()
  119. m.ReceivedRequests <- r
  120. m.mu.Lock()
  121. defer m.mu.Unlock()
  122. wantReq := &sppb.ExecuteSqlRequest{
  123. Session: "mocksession",
  124. Transaction: &sppb.TransactionSelector{
  125. Selector: &sppb.TransactionSelector_SingleUse{
  126. SingleUse: &sppb.TransactionOptions{
  127. Mode: &sppb.TransactionOptions_ReadOnly_{
  128. ReadOnly: &sppb.TransactionOptions_ReadOnly{
  129. TimestampBound: &sppb.TransactionOptions_ReadOnly_Strong{
  130. Strong: true,
  131. },
  132. ReturnReadTimestamp: false,
  133. },
  134. },
  135. },
  136. },
  137. },
  138. Sql: "mockquery",
  139. Params: &proto3.Struct{
  140. Fields: map[string]*proto3.Value{"var1": {Kind: &proto3.Value_StringValue{StringValue: "abc"}}},
  141. },
  142. ParamTypes: map[string]*sppb.Type{"var1": {Code: sppb.TypeCode_STRING}},
  143. }
  144. if !proto.Equal(r, wantReq) {
  145. return nil, fmt.Errorf("got query request: %v, want: %v", r, wantReq)
  146. }
  147. return nil, errors.New("query never succeeds on mock client")
  148. }
  149. // StreamingRead is a placeholder for SpannerClient.StreamingRead.
  150. func (m *MockCloudSpannerClient) StreamingRead(c context.Context, r *sppb.ReadRequest, opts ...grpc.CallOption) (sppb.Spanner_StreamingReadClient, error) {
  151. m.ready()
  152. m.ReceivedRequests <- r
  153. m.mu.Lock()
  154. defer m.mu.Unlock()
  155. wantReq := &sppb.ReadRequest{
  156. Session: "mocksession",
  157. Transaction: &sppb.TransactionSelector{
  158. Selector: &sppb.TransactionSelector_SingleUse{
  159. SingleUse: &sppb.TransactionOptions{
  160. Mode: &sppb.TransactionOptions_ReadOnly_{
  161. ReadOnly: &sppb.TransactionOptions_ReadOnly{
  162. TimestampBound: &sppb.TransactionOptions_ReadOnly_Strong{
  163. Strong: true,
  164. },
  165. ReturnReadTimestamp: false,
  166. },
  167. },
  168. },
  169. },
  170. },
  171. Table: "t_mock",
  172. Columns: []string{"col1", "col2"},
  173. KeySet: &sppb.KeySet{
  174. Keys: []*proto3.ListValue{
  175. {
  176. Values: []*proto3.Value{
  177. {Kind: &proto3.Value_StringValue{StringValue: "foo"}},
  178. },
  179. },
  180. },
  181. Ranges: []*sppb.KeyRange{},
  182. All: false,
  183. },
  184. }
  185. if !proto.Equal(r, wantReq) {
  186. return nil, fmt.Errorf("got query request: %v, want: %v", r, wantReq)
  187. }
  188. return nil, errors.New("read never succeeds on mock client")
  189. }
  190. // BeginTransaction is a placeholder for SpannerClient.BeginTransaction.
  191. func (m *MockCloudSpannerClient) BeginTransaction(c context.Context, r *sppb.BeginTransactionRequest, opts ...grpc.CallOption) (*sppb.Transaction, error) {
  192. m.ready()
  193. m.ReceivedRequests <- r
  194. m.mu.Lock()
  195. defer m.mu.Unlock()
  196. resp := &sppb.Transaction{Id: []byte("transaction-1")}
  197. if _, ok := r.Options.Mode.(*sppb.TransactionOptions_ReadOnly_); ok {
  198. resp.ReadTimestamp = &pbt.Timestamp{Seconds: 3, Nanos: 4}
  199. }
  200. return resp, nil
  201. }
  202. // Commit is a placeholder for SpannerClient.Commit.
  203. func (m *MockCloudSpannerClient) Commit(c context.Context, r *sppb.CommitRequest, opts ...grpc.CallOption) (*sppb.CommitResponse, error) {
  204. m.ready()
  205. m.ReceivedRequests <- r
  206. m.mu.Lock()
  207. defer m.mu.Unlock()
  208. return &sppb.CommitResponse{CommitTimestamp: &pbt.Timestamp{Seconds: 1, Nanos: 2}}, nil
  209. }
  210. // Rollback is a placeholder for SpannerClient.Rollback.
  211. func (m *MockCloudSpannerClient) Rollback(c context.Context, r *sppb.RollbackRequest, opts ...grpc.CallOption) (*empty.Empty, error) {
  212. m.ready()
  213. m.ReceivedRequests <- r
  214. m.mu.Lock()
  215. defer m.mu.Unlock()
  216. return nil, nil
  217. }
  218. // PartitionQuery is a placeholder for SpannerServer.PartitionQuery.
  219. func (m *MockCloudSpannerClient) PartitionQuery(ctx context.Context, r *sppb.PartitionQueryRequest, opts ...grpc.CallOption) (*sppb.PartitionResponse, error) {
  220. m.ready()
  221. m.ReceivedRequests <- r
  222. return nil, errors.New("Unimplemented")
  223. }
  224. // PartitionRead is a placeholder for SpannerServer.PartitionRead.
  225. func (m *MockCloudSpannerClient) PartitionRead(ctx context.Context, r *sppb.PartitionReadRequest, opts ...grpc.CallOption) (*sppb.PartitionResponse, error) {
  226. m.ready()
  227. m.ReceivedRequests <- r
  228. return nil, errors.New("Unimplemented")
  229. }
  230. // Freeze stalls all requests.
  231. func (m *MockCloudSpannerClient) Freeze() {
  232. m.mu.Lock()
  233. defer m.mu.Unlock()
  234. m.freezed = make(chan struct{})
  235. }
  236. // Unfreeze restores processing requests.
  237. func (m *MockCloudSpannerClient) Unfreeze() {
  238. m.mu.Lock()
  239. defer m.mu.Unlock()
  240. close(m.freezed)
  241. }
  242. // ready checks conditions before executing requests
  243. // TODO: add checks for injected errors, actions
  244. func (m *MockCloudSpannerClient) ready() {
  245. m.mu.Lock()
  246. freezed := m.freezed
  247. m.mu.Unlock()
  248. // check if client should be freezed
  249. <-freezed
  250. }