integration_test.go 58 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177
71778177917801781178217831784178517861787178817891790179117921793179417951796179717981799180018011802180318041805180618071808180918101811181218131814181518161817181818191820182118221823182418251826182718281829183018311832183318341835183618371838183918401841184218431844184518461847184818491850185118521853185418551856185718581859186018611862186318641865186618671868186918701871187218731874187518761877187818791880188118821883188418851886188718881889189018911892189318941895189618971898189919001901190219031904190519061907190819091910191119121913191419151916191719181919192019211922192319241925192619271928192919301931193219331934193519361937193819391940194119421943194419451946194719481949195019511952195319541955195619571958195919601961196219631964196519661967196819691970197119721973197419751976197719781979198019811982198319841985198619871988198919901991199219931994199519961997199819992000200120022003200420052006200720082009201020112012201320142015201620172018201920202021202220232024202520262027202820292030203120322033203420352036203720382039204020412042204320442045204620472048204920502051205220532054205520562057205820592060206120622063206420652066206720682069207020712072207320742075207620772078207920802081208220832084208520862087208820892090209120922093209420952096209720982099210021012102210321042105210621072108210921102111211221132114211521162117211821192120212121222123212421252126212721282129213021312132213321342135213621372138213921402141214221432144214521462147214821492150215121522153215421552156215721582159216021612162216321642165216621672168216921702171217221732174217521762177217821792180218121822183218421852186
  1. // Copyright 2015 Google LLC
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package bigquery
  15. import (
  16. "context"
  17. "encoding/json"
  18. "errors"
  19. "flag"
  20. "fmt"
  21. "log"
  22. "math/big"
  23. "net/http"
  24. "os"
  25. "sort"
  26. "strings"
  27. "testing"
  28. "time"
  29. "cloud.google.com/go/civil"
  30. "cloud.google.com/go/httpreplay"
  31. "cloud.google.com/go/internal"
  32. "cloud.google.com/go/internal/pretty"
  33. "cloud.google.com/go/internal/testutil"
  34. "cloud.google.com/go/internal/uid"
  35. "cloud.google.com/go/storage"
  36. "github.com/google/go-cmp/cmp"
  37. "github.com/google/go-cmp/cmp/cmpopts"
  38. gax "github.com/googleapis/gax-go/v2"
  39. "google.golang.org/api/googleapi"
  40. "google.golang.org/api/iterator"
  41. "google.golang.org/api/option"
  42. )
  43. const replayFilename = "bigquery.replay"
  44. var record = flag.Bool("record", false, "record RPCs")
  45. var (
  46. client *Client
  47. storageClient *storage.Client
  48. dataset *Dataset
  49. schema = Schema{
  50. {Name: "name", Type: StringFieldType},
  51. {Name: "nums", Type: IntegerFieldType, Repeated: true},
  52. {Name: "rec", Type: RecordFieldType, Schema: Schema{
  53. {Name: "bool", Type: BooleanFieldType},
  54. }},
  55. }
  56. testTableExpiration time.Time
  57. datasetIDs, tableIDs *uid.Space
  58. )
  59. // Note: integration tests cannot be run in parallel, because TestIntegration_Location
  60. // modifies the client.
  61. func TestMain(m *testing.M) {
  62. cleanup := initIntegrationTest()
  63. r := m.Run()
  64. cleanup()
  65. os.Exit(r)
  66. }
  67. func getClient(t *testing.T) *Client {
  68. if client == nil {
  69. t.Skip("Integration tests skipped")
  70. }
  71. return client
  72. }
  73. // If integration tests will be run, create a unique dataset for them.
  74. // Return a cleanup function.
  75. func initIntegrationTest() func() {
  76. ctx := context.Background()
  77. flag.Parse() // needed for testing.Short()
  78. projID := testutil.ProjID()
  79. switch {
  80. case testing.Short() && *record:
  81. log.Fatal("cannot combine -short and -record")
  82. return func() {}
  83. case testing.Short() && httpreplay.Supported() && testutil.CanReplay(replayFilename) && projID != "":
  84. // go test -short with a replay file will replay the integration tests if the
  85. // environment variables are set.
  86. log.Printf("replaying from %s", replayFilename)
  87. httpreplay.DebugHeaders()
  88. replayer, err := httpreplay.NewReplayer(replayFilename)
  89. if err != nil {
  90. log.Fatal(err)
  91. }
  92. var t time.Time
  93. if err := json.Unmarshal(replayer.Initial(), &t); err != nil {
  94. log.Fatal(err)
  95. }
  96. hc, err := replayer.Client(ctx) // no creds needed
  97. if err != nil {
  98. log.Fatal(err)
  99. }
  100. client, err = NewClient(ctx, projID, option.WithHTTPClient(hc))
  101. if err != nil {
  102. log.Fatal(err)
  103. }
  104. storageClient, err = storage.NewClient(ctx, option.WithHTTPClient(hc))
  105. if err != nil {
  106. log.Fatal(err)
  107. }
  108. cleanup := initTestState(client, t)
  109. return func() {
  110. cleanup()
  111. _ = replayer.Close() // No actionable error returned.
  112. }
  113. case testing.Short():
  114. // go test -short without a replay file skips the integration tests.
  115. if testutil.CanReplay(replayFilename) && projID != "" {
  116. log.Print("replay not supported for Go versions before 1.8")
  117. }
  118. client = nil
  119. storageClient = nil
  120. return func() {}
  121. default: // Run integration tests against a real backend.
  122. ts := testutil.TokenSource(ctx, Scope)
  123. if ts == nil {
  124. log.Println("Integration tests skipped. See CONTRIBUTING.md for details")
  125. return func() {}
  126. }
  127. bqOpt := option.WithTokenSource(ts)
  128. sOpt := option.WithTokenSource(testutil.TokenSource(ctx, storage.ScopeFullControl))
  129. cleanup := func() {}
  130. now := time.Now().UTC()
  131. if *record {
  132. if !httpreplay.Supported() {
  133. log.Print("record not supported for Go versions before 1.8")
  134. } else {
  135. nowBytes, err := json.Marshal(now)
  136. if err != nil {
  137. log.Fatal(err)
  138. }
  139. recorder, err := httpreplay.NewRecorder(replayFilename, nowBytes)
  140. if err != nil {
  141. log.Fatalf("could not record: %v", err)
  142. }
  143. log.Printf("recording to %s", replayFilename)
  144. hc, err := recorder.Client(ctx, bqOpt)
  145. if err != nil {
  146. log.Fatal(err)
  147. }
  148. bqOpt = option.WithHTTPClient(hc)
  149. hc, err = recorder.Client(ctx, sOpt)
  150. if err != nil {
  151. log.Fatal(err)
  152. }
  153. sOpt = option.WithHTTPClient(hc)
  154. cleanup = func() {
  155. if err := recorder.Close(); err != nil {
  156. log.Printf("saving recording: %v", err)
  157. }
  158. }
  159. }
  160. }
  161. var err error
  162. client, err = NewClient(ctx, projID, bqOpt)
  163. if err != nil {
  164. log.Fatalf("NewClient: %v", err)
  165. }
  166. storageClient, err = storage.NewClient(ctx, sOpt)
  167. if err != nil {
  168. log.Fatalf("storage.NewClient: %v", err)
  169. }
  170. c := initTestState(client, now)
  171. return func() { c(); cleanup() }
  172. }
  173. }
  174. func initTestState(client *Client, t time.Time) func() {
  175. // BigQuery does not accept hyphens in dataset or table IDs, so we create IDs
  176. // with underscores.
  177. ctx := context.Background()
  178. opts := &uid.Options{Sep: '_', Time: t}
  179. datasetIDs = uid.NewSpace("dataset", opts)
  180. tableIDs = uid.NewSpace("table", opts)
  181. testTableExpiration = t.Add(10 * time.Minute).Round(time.Second)
  182. // For replayability, seed the random source with t.
  183. Seed(t.UnixNano())
  184. dataset = client.Dataset(datasetIDs.New())
  185. if err := dataset.Create(ctx, nil); err != nil {
  186. log.Fatalf("creating dataset %s: %v", dataset.DatasetID, err)
  187. }
  188. return func() {
  189. if err := dataset.DeleteWithContents(ctx); err != nil {
  190. log.Printf("could not delete %s", dataset.DatasetID)
  191. }
  192. }
  193. }
  194. func TestIntegration_TableCreate(t *testing.T) {
  195. // Check that creating a record field with an empty schema is an error.
  196. if client == nil {
  197. t.Skip("Integration tests skipped")
  198. }
  199. table := dataset.Table("t_bad")
  200. schema := Schema{
  201. {Name: "rec", Type: RecordFieldType, Schema: Schema{}},
  202. }
  203. err := table.Create(context.Background(), &TableMetadata{
  204. Schema: schema,
  205. ExpirationTime: testTableExpiration.Add(5 * time.Minute),
  206. })
  207. if err == nil {
  208. t.Fatal("want error, got nil")
  209. }
  210. if !hasStatusCode(err, http.StatusBadRequest) {
  211. t.Fatalf("want a 400 error, got %v", err)
  212. }
  213. }
  214. func TestIntegration_TableCreateView(t *testing.T) {
  215. if client == nil {
  216. t.Skip("Integration tests skipped")
  217. }
  218. ctx := context.Background()
  219. table := newTable(t, schema)
  220. defer table.Delete(ctx)
  221. // Test that standard SQL views work.
  222. view := dataset.Table("t_view_standardsql")
  223. query := fmt.Sprintf("SELECT APPROX_COUNT_DISTINCT(name) FROM `%s.%s.%s`",
  224. dataset.ProjectID, dataset.DatasetID, table.TableID)
  225. err := view.Create(context.Background(), &TableMetadata{
  226. ViewQuery: query,
  227. UseStandardSQL: true,
  228. })
  229. if err != nil {
  230. t.Fatalf("table.create: Did not expect an error, got: %v", err)
  231. }
  232. if err := view.Delete(ctx); err != nil {
  233. t.Fatal(err)
  234. }
  235. }
  236. func TestIntegration_TableMetadata(t *testing.T) {
  237. if client == nil {
  238. t.Skip("Integration tests skipped")
  239. }
  240. ctx := context.Background()
  241. table := newTable(t, schema)
  242. defer table.Delete(ctx)
  243. // Check table metadata.
  244. md, err := table.Metadata(ctx)
  245. if err != nil {
  246. t.Fatal(err)
  247. }
  248. // TODO(jba): check md more thorougly.
  249. if got, want := md.FullID, fmt.Sprintf("%s:%s.%s", dataset.ProjectID, dataset.DatasetID, table.TableID); got != want {
  250. t.Errorf("metadata.FullID: got %q, want %q", got, want)
  251. }
  252. if got, want := md.Type, RegularTable; got != want {
  253. t.Errorf("metadata.Type: got %v, want %v", got, want)
  254. }
  255. if got, want := md.ExpirationTime, testTableExpiration; !got.Equal(want) {
  256. t.Errorf("metadata.Type: got %v, want %v", got, want)
  257. }
  258. // Check that timePartitioning is nil by default
  259. if md.TimePartitioning != nil {
  260. t.Errorf("metadata.TimePartitioning: got %v, want %v", md.TimePartitioning, nil)
  261. }
  262. // Create tables that have time partitioning
  263. partitionCases := []struct {
  264. timePartitioning TimePartitioning
  265. wantExpiration time.Duration
  266. wantField string
  267. wantPruneFilter bool
  268. }{
  269. {TimePartitioning{}, time.Duration(0), "", false},
  270. {TimePartitioning{Expiration: time.Second}, time.Second, "", false},
  271. {TimePartitioning{RequirePartitionFilter: true}, time.Duration(0), "", true},
  272. {
  273. TimePartitioning{
  274. Expiration: time.Second,
  275. Field: "date",
  276. RequirePartitionFilter: true,
  277. }, time.Second, "date", true},
  278. }
  279. schema2 := Schema{
  280. {Name: "name", Type: StringFieldType},
  281. {Name: "date", Type: DateFieldType},
  282. }
  283. clustering := &Clustering{
  284. Fields: []string{"name"},
  285. }
  286. // Currently, clustering depends on partitioning. Interleave testing of the two features.
  287. for i, c := range partitionCases {
  288. table := dataset.Table(fmt.Sprintf("t_metadata_partition_nocluster_%v", i))
  289. clusterTable := dataset.Table(fmt.Sprintf("t_metadata_partition_cluster_%v", i))
  290. // Create unclustered, partitioned variant and get metadata.
  291. err = table.Create(context.Background(), &TableMetadata{
  292. Schema: schema2,
  293. TimePartitioning: &c.timePartitioning,
  294. ExpirationTime: testTableExpiration,
  295. })
  296. if err != nil {
  297. t.Fatal(err)
  298. }
  299. defer table.Delete(ctx)
  300. md, err := table.Metadata(ctx)
  301. if err != nil {
  302. t.Fatal(err)
  303. }
  304. // Created clustered table and get metadata.
  305. err = clusterTable.Create(context.Background(), &TableMetadata{
  306. Schema: schema2,
  307. TimePartitioning: &c.timePartitioning,
  308. ExpirationTime: testTableExpiration,
  309. Clustering: clustering,
  310. })
  311. if err != nil {
  312. t.Fatal(err)
  313. }
  314. clusterMD, err := clusterTable.Metadata(ctx)
  315. if err != nil {
  316. t.Fatal(err)
  317. }
  318. for _, v := range []*TableMetadata{md, clusterMD} {
  319. got := v.TimePartitioning
  320. want := &TimePartitioning{
  321. Expiration: c.wantExpiration,
  322. Field: c.wantField,
  323. RequirePartitionFilter: c.wantPruneFilter,
  324. }
  325. if !testutil.Equal(got, want) {
  326. t.Errorf("metadata.TimePartitioning: got %v, want %v", got, want)
  327. }
  328. // check that RequirePartitionFilter can be inverted.
  329. mdUpdate := TableMetadataToUpdate{
  330. TimePartitioning: &TimePartitioning{
  331. Expiration: v.TimePartitioning.Expiration,
  332. RequirePartitionFilter: !want.RequirePartitionFilter,
  333. },
  334. }
  335. newmd, err := table.Update(ctx, mdUpdate, "")
  336. if err != nil {
  337. t.Errorf("failed to invert RequirePartitionFilter on %s: %v", table.FullyQualifiedName(), err)
  338. }
  339. if newmd.TimePartitioning.RequirePartitionFilter == want.RequirePartitionFilter {
  340. t.Errorf("inverting RequirePartitionFilter on %s failed, want %t got %t", table.FullyQualifiedName(), !want.RequirePartitionFilter, newmd.TimePartitioning.RequirePartitionFilter)
  341. }
  342. }
  343. if md.Clustering != nil {
  344. t.Errorf("metadata.Clustering was not nil on unclustered table %s", table.TableID)
  345. }
  346. got := clusterMD.Clustering
  347. want := clustering
  348. if clusterMD.Clustering != clustering {
  349. if !testutil.Equal(got, want) {
  350. t.Errorf("metadata.Clustering: got %v, want %v", got, want)
  351. }
  352. }
  353. }
  354. }
  355. func TestIntegration_RemoveTimePartitioning(t *testing.T) {
  356. if client == nil {
  357. t.Skip("Integration tests skipped")
  358. }
  359. ctx := context.Background()
  360. table := dataset.Table(tableIDs.New())
  361. want := 24 * time.Hour
  362. err := table.Create(ctx, &TableMetadata{
  363. ExpirationTime: testTableExpiration,
  364. TimePartitioning: &TimePartitioning{
  365. Expiration: want,
  366. },
  367. })
  368. if err != nil {
  369. t.Fatal(err)
  370. }
  371. defer table.Delete(ctx)
  372. md, err := table.Metadata(ctx)
  373. if err != nil {
  374. t.Fatal(err)
  375. }
  376. if got := md.TimePartitioning.Expiration; got != want {
  377. t.Fatalf("TimeParitioning expiration want = %v, got = %v", want, got)
  378. }
  379. // Remove time partitioning expiration
  380. md, err = table.Update(context.Background(), TableMetadataToUpdate{
  381. TimePartitioning: &TimePartitioning{Expiration: 0},
  382. }, md.ETag)
  383. if err != nil {
  384. t.Fatal(err)
  385. }
  386. want = time.Duration(0)
  387. if got := md.TimePartitioning.Expiration; got != want {
  388. t.Fatalf("TimeParitioning expiration want = %v, got = %v", want, got)
  389. }
  390. }
  391. func TestIntegration_DatasetCreate(t *testing.T) {
  392. if client == nil {
  393. t.Skip("Integration tests skipped")
  394. }
  395. ctx := context.Background()
  396. ds := client.Dataset(datasetIDs.New())
  397. wmd := &DatasetMetadata{Name: "name", Location: "EU"}
  398. err := ds.Create(ctx, wmd)
  399. if err != nil {
  400. t.Fatal(err)
  401. }
  402. gmd, err := ds.Metadata(ctx)
  403. if err != nil {
  404. t.Fatal(err)
  405. }
  406. if got, want := gmd.Name, wmd.Name; got != want {
  407. t.Errorf("name: got %q, want %q", got, want)
  408. }
  409. if got, want := gmd.Location, wmd.Location; got != want {
  410. t.Errorf("location: got %q, want %q", got, want)
  411. }
  412. if err := ds.Delete(ctx); err != nil {
  413. t.Fatalf("deleting dataset %v: %v", ds, err)
  414. }
  415. }
  416. func TestIntegration_DatasetMetadata(t *testing.T) {
  417. if client == nil {
  418. t.Skip("Integration tests skipped")
  419. }
  420. ctx := context.Background()
  421. md, err := dataset.Metadata(ctx)
  422. if err != nil {
  423. t.Fatal(err)
  424. }
  425. if got, want := md.FullID, fmt.Sprintf("%s:%s", dataset.ProjectID, dataset.DatasetID); got != want {
  426. t.Errorf("FullID: got %q, want %q", got, want)
  427. }
  428. jan2016 := time.Date(2016, 1, 1, 0, 0, 0, 0, time.UTC)
  429. if md.CreationTime.Before(jan2016) {
  430. t.Errorf("CreationTime: got %s, want > 2016-1-1", md.CreationTime)
  431. }
  432. if md.LastModifiedTime.Before(jan2016) {
  433. t.Errorf("LastModifiedTime: got %s, want > 2016-1-1", md.LastModifiedTime)
  434. }
  435. // Verify that we get a NotFound for a nonexistent dataset.
  436. _, err = client.Dataset("does_not_exist").Metadata(ctx)
  437. if err == nil || !hasStatusCode(err, http.StatusNotFound) {
  438. t.Errorf("got %v, want NotFound error", err)
  439. }
  440. }
  441. func TestIntegration_DatasetDelete(t *testing.T) {
  442. if client == nil {
  443. t.Skip("Integration tests skipped")
  444. }
  445. ctx := context.Background()
  446. ds := client.Dataset(datasetIDs.New())
  447. if err := ds.Create(ctx, nil); err != nil {
  448. t.Fatalf("creating dataset %s: %v", ds.DatasetID, err)
  449. }
  450. if err := ds.Delete(ctx); err != nil {
  451. t.Fatalf("deleting dataset %s: %v", ds.DatasetID, err)
  452. }
  453. }
  454. func TestIntegration_DatasetDeleteWithContents(t *testing.T) {
  455. if client == nil {
  456. t.Skip("Integration tests skipped")
  457. }
  458. ctx := context.Background()
  459. ds := client.Dataset(datasetIDs.New())
  460. if err := ds.Create(ctx, nil); err != nil {
  461. t.Fatalf("creating dataset %s: %v", ds.DatasetID, err)
  462. }
  463. table := ds.Table(tableIDs.New())
  464. if err := table.Create(ctx, nil); err != nil {
  465. t.Fatalf("creating table %s in dataset %s: %v", table.TableID, table.DatasetID, err)
  466. }
  467. // We expect failure here
  468. if err := ds.Delete(ctx); err == nil {
  469. t.Fatalf("non-recursive delete of dataset %s succeeded unexpectedly.", ds.DatasetID)
  470. }
  471. if err := ds.DeleteWithContents(ctx); err != nil {
  472. t.Fatalf("deleting recursively dataset %s: %v", ds.DatasetID, err)
  473. }
  474. }
  475. func TestIntegration_DatasetUpdateETags(t *testing.T) {
  476. if client == nil {
  477. t.Skip("Integration tests skipped")
  478. }
  479. check := func(md *DatasetMetadata, wantDesc, wantName string) {
  480. if md.Description != wantDesc {
  481. t.Errorf("description: got %q, want %q", md.Description, wantDesc)
  482. }
  483. if md.Name != wantName {
  484. t.Errorf("name: got %q, want %q", md.Name, wantName)
  485. }
  486. }
  487. ctx := context.Background()
  488. md, err := dataset.Metadata(ctx)
  489. if err != nil {
  490. t.Fatal(err)
  491. }
  492. if md.ETag == "" {
  493. t.Fatal("empty ETag")
  494. }
  495. // Write without ETag succeeds.
  496. desc := md.Description + "d2"
  497. name := md.Name + "n2"
  498. md2, err := dataset.Update(ctx, DatasetMetadataToUpdate{Description: desc, Name: name}, "")
  499. if err != nil {
  500. t.Fatal(err)
  501. }
  502. check(md2, desc, name)
  503. // Write with original ETag fails because of intervening write.
  504. _, err = dataset.Update(ctx, DatasetMetadataToUpdate{Description: "d", Name: "n"}, md.ETag)
  505. if err == nil {
  506. t.Fatal("got nil, want error")
  507. }
  508. // Write with most recent ETag succeeds.
  509. md3, err := dataset.Update(ctx, DatasetMetadataToUpdate{Description: "", Name: ""}, md2.ETag)
  510. if err != nil {
  511. t.Fatal(err)
  512. }
  513. check(md3, "", "")
  514. }
  515. func TestIntegration_DatasetUpdateDefaultExpiration(t *testing.T) {
  516. if client == nil {
  517. t.Skip("Integration tests skipped")
  518. }
  519. ctx := context.Background()
  520. _, err := dataset.Metadata(ctx)
  521. if err != nil {
  522. t.Fatal(err)
  523. }
  524. // Set the default expiration time.
  525. md, err := dataset.Update(ctx, DatasetMetadataToUpdate{DefaultTableExpiration: time.Hour}, "")
  526. if err != nil {
  527. t.Fatal(err)
  528. }
  529. if md.DefaultTableExpiration != time.Hour {
  530. t.Fatalf("got %s, want 1h", md.DefaultTableExpiration)
  531. }
  532. // Omitting DefaultTableExpiration doesn't change it.
  533. md, err = dataset.Update(ctx, DatasetMetadataToUpdate{Name: "xyz"}, "")
  534. if err != nil {
  535. t.Fatal(err)
  536. }
  537. if md.DefaultTableExpiration != time.Hour {
  538. t.Fatalf("got %s, want 1h", md.DefaultTableExpiration)
  539. }
  540. // Setting it to 0 deletes it (which looks like a 0 duration).
  541. md, err = dataset.Update(ctx, DatasetMetadataToUpdate{DefaultTableExpiration: time.Duration(0)}, "")
  542. if err != nil {
  543. t.Fatal(err)
  544. }
  545. if md.DefaultTableExpiration != 0 {
  546. t.Fatalf("got %s, want 0", md.DefaultTableExpiration)
  547. }
  548. }
  549. func TestIntegration_DatasetUpdateAccess(t *testing.T) {
  550. if client == nil {
  551. t.Skip("Integration tests skipped")
  552. }
  553. ctx := context.Background()
  554. md, err := dataset.Metadata(ctx)
  555. if err != nil {
  556. t.Fatal(err)
  557. }
  558. origAccess := append([]*AccessEntry(nil), md.Access...)
  559. newEntry := &AccessEntry{
  560. Role: ReaderRole,
  561. Entity: "Joe@example.com",
  562. EntityType: UserEmailEntity,
  563. }
  564. newAccess := append(md.Access, newEntry)
  565. dm := DatasetMetadataToUpdate{Access: newAccess}
  566. md, err = dataset.Update(ctx, dm, md.ETag)
  567. if err != nil {
  568. t.Fatal(err)
  569. }
  570. defer func() {
  571. _, err := dataset.Update(ctx, DatasetMetadataToUpdate{Access: origAccess}, md.ETag)
  572. if err != nil {
  573. t.Log("could not restore dataset access list")
  574. }
  575. }()
  576. if diff := testutil.Diff(md.Access, newAccess); diff != "" {
  577. t.Fatalf("got=-, want=+:\n%s", diff)
  578. }
  579. }
  580. func TestIntegration_DatasetUpdateLabels(t *testing.T) {
  581. if client == nil {
  582. t.Skip("Integration tests skipped")
  583. }
  584. ctx := context.Background()
  585. _, err := dataset.Metadata(ctx)
  586. if err != nil {
  587. t.Fatal(err)
  588. }
  589. var dm DatasetMetadataToUpdate
  590. dm.SetLabel("label", "value")
  591. md, err := dataset.Update(ctx, dm, "")
  592. if err != nil {
  593. t.Fatal(err)
  594. }
  595. if got, want := md.Labels["label"], "value"; got != want {
  596. t.Errorf("got %q, want %q", got, want)
  597. }
  598. dm = DatasetMetadataToUpdate{}
  599. dm.DeleteLabel("label")
  600. md, err = dataset.Update(ctx, dm, "")
  601. if err != nil {
  602. t.Fatal(err)
  603. }
  604. if _, ok := md.Labels["label"]; ok {
  605. t.Error("label still present after deletion")
  606. }
  607. }
  608. func TestIntegration_TableUpdateLabels(t *testing.T) {
  609. if client == nil {
  610. t.Skip("Integration tests skipped")
  611. }
  612. ctx := context.Background()
  613. table := newTable(t, schema)
  614. defer table.Delete(ctx)
  615. var tm TableMetadataToUpdate
  616. tm.SetLabel("label", "value")
  617. md, err := table.Update(ctx, tm, "")
  618. if err != nil {
  619. t.Fatal(err)
  620. }
  621. if got, want := md.Labels["label"], "value"; got != want {
  622. t.Errorf("got %q, want %q", got, want)
  623. }
  624. tm = TableMetadataToUpdate{}
  625. tm.DeleteLabel("label")
  626. md, err = table.Update(ctx, tm, "")
  627. if err != nil {
  628. t.Fatal(err)
  629. }
  630. if _, ok := md.Labels["label"]; ok {
  631. t.Error("label still present after deletion")
  632. }
  633. }
  634. func TestIntegration_Tables(t *testing.T) {
  635. if client == nil {
  636. t.Skip("Integration tests skipped")
  637. }
  638. ctx := context.Background()
  639. table := newTable(t, schema)
  640. defer table.Delete(ctx)
  641. wantName := table.FullyQualifiedName()
  642. // This test is flaky due to eventual consistency.
  643. ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
  644. defer cancel()
  645. err := internal.Retry(ctx, gax.Backoff{}, func() (stop bool, err error) {
  646. // Iterate over tables in the dataset.
  647. it := dataset.Tables(ctx)
  648. var tableNames []string
  649. for {
  650. tbl, err := it.Next()
  651. if err == iterator.Done {
  652. break
  653. }
  654. if err != nil {
  655. return false, err
  656. }
  657. tableNames = append(tableNames, tbl.FullyQualifiedName())
  658. }
  659. // Other tests may be running with this dataset, so there might be more
  660. // than just our table in the list. So don't try for an exact match; just
  661. // make sure that our table is there somewhere.
  662. for _, tn := range tableNames {
  663. if tn == wantName {
  664. return true, nil
  665. }
  666. }
  667. return false, fmt.Errorf("got %v\nwant %s in the list", tableNames, wantName)
  668. })
  669. if err != nil {
  670. t.Fatal(err)
  671. }
  672. }
  673. func TestIntegration_InsertAndRead(t *testing.T) {
  674. if client == nil {
  675. t.Skip("Integration tests skipped")
  676. }
  677. ctx := context.Background()
  678. table := newTable(t, schema)
  679. defer table.Delete(ctx)
  680. // Populate the table.
  681. ins := table.Inserter()
  682. var (
  683. wantRows [][]Value
  684. saverRows []*ValuesSaver
  685. )
  686. for i, name := range []string{"a", "b", "c"} {
  687. row := []Value{name, []Value{int64(i)}, []Value{true}}
  688. wantRows = append(wantRows, row)
  689. saverRows = append(saverRows, &ValuesSaver{
  690. Schema: schema,
  691. InsertID: name,
  692. Row: row,
  693. })
  694. }
  695. if err := ins.Put(ctx, saverRows); err != nil {
  696. t.Fatal(putError(err))
  697. }
  698. // Wait until the data has been uploaded. This can take a few seconds, according
  699. // to https://cloud.google.com/bigquery/streaming-data-into-bigquery.
  700. if err := waitForRow(ctx, table); err != nil {
  701. t.Fatal(err)
  702. }
  703. // Read the table.
  704. checkRead(t, "upload", table.Read(ctx), wantRows)
  705. // Query the table.
  706. q := client.Query(fmt.Sprintf("select name, nums, rec from %s", table.TableID))
  707. q.DefaultProjectID = dataset.ProjectID
  708. q.DefaultDatasetID = dataset.DatasetID
  709. rit, err := q.Read(ctx)
  710. if err != nil {
  711. t.Fatal(err)
  712. }
  713. checkRead(t, "query", rit, wantRows)
  714. // Query the long way.
  715. job1, err := q.Run(ctx)
  716. if err != nil {
  717. t.Fatal(err)
  718. }
  719. if job1.LastStatus() == nil {
  720. t.Error("no LastStatus")
  721. }
  722. job2, err := client.JobFromID(ctx, job1.ID())
  723. if err != nil {
  724. t.Fatal(err)
  725. }
  726. if job2.LastStatus() == nil {
  727. t.Error("no LastStatus")
  728. }
  729. rit, err = job2.Read(ctx)
  730. if err != nil {
  731. t.Fatal(err)
  732. }
  733. checkRead(t, "job.Read", rit, wantRows)
  734. // Get statistics.
  735. jobStatus, err := job2.Status(ctx)
  736. if err != nil {
  737. t.Fatal(err)
  738. }
  739. if jobStatus.Statistics == nil {
  740. t.Fatal("jobStatus missing statistics")
  741. }
  742. if _, ok := jobStatus.Statistics.Details.(*QueryStatistics); !ok {
  743. t.Errorf("expected QueryStatistics, got %T", jobStatus.Statistics.Details)
  744. }
  745. // Test reading directly into a []Value.
  746. valueLists, schema, _, err := readAll(table.Read(ctx))
  747. if err != nil {
  748. t.Fatal(err)
  749. }
  750. it := table.Read(ctx)
  751. for i, vl := range valueLists {
  752. var got []Value
  753. if err := it.Next(&got); err != nil {
  754. t.Fatal(err)
  755. }
  756. if !testutil.Equal(it.Schema, schema) {
  757. t.Fatalf("got schema %v, want %v", it.Schema, schema)
  758. }
  759. want := []Value(vl)
  760. if !testutil.Equal(got, want) {
  761. t.Errorf("%d: got %v, want %v", i, got, want)
  762. }
  763. }
  764. // Test reading into a map.
  765. it = table.Read(ctx)
  766. for _, vl := range valueLists {
  767. var vm map[string]Value
  768. if err := it.Next(&vm); err != nil {
  769. t.Fatal(err)
  770. }
  771. if got, want := len(vm), len(vl); got != want {
  772. t.Fatalf("valueMap len: got %d, want %d", got, want)
  773. }
  774. // With maps, structs become nested maps.
  775. vl[2] = map[string]Value{"bool": vl[2].([]Value)[0]}
  776. for i, v := range vl {
  777. if got, want := vm[schema[i].Name], v; !testutil.Equal(got, want) {
  778. t.Errorf("%d, name=%s: got %#v, want %#v",
  779. i, schema[i].Name, got, want)
  780. }
  781. }
  782. }
  783. }
// SubSubTestStruct is the innermost record type used by the struct
// round-trip test; SubTestStruct embeds it both as a single field and as a
// slice element.
type SubSubTestStruct struct {
	Integer int64
}
// SubTestStruct is the record type nested inside TestStruct. It holds a
// scalar, a nested record, and a repeated nested record.
type SubTestStruct struct {
	String      string
	Record      SubSubTestStruct
	RecordArray []SubSubTestStruct
}
// TestStruct is the row type for TestIntegration_InsertAndReadStructs, which
// passes it to InferSchema. It has one field per scalar kind, a slice
// counterpart for each of those (except Bytes), and nested records.
//
// Field order matters: that test builds one value with a positional
// (unkeyed) composite literal.
type TestStruct struct {
	// Scalar fields.
	Name      string
	Bytes     []byte
	Integer   int64
	Float     float64
	Boolean   bool
	Timestamp time.Time
	Date      civil.Date
	Time      civil.Time
	DateTime  civil.DateTime
	Numeric   *big.Rat

	// Slice fields.
	StringArray    []string
	IntegerArray   []int64
	FloatArray     []float64
	BooleanArray   []bool
	TimestampArray []time.Time
	DateArray      []civil.Date
	TimeArray      []civil.Time
	DateTimeArray  []civil.DateTime
	NumericArray   []*big.Rat

	// Nested record fields.
	Record      SubTestStruct
	RecordArray []SubTestStruct
}
// roundToMicros is a cmp option that rounds every time.Time to the
// microsecond before comparison, so values that differ only below
// microsecond precision compare equal. The tests in this file pass it to
// testutil.Diff.
var roundToMicros = cmp.Transformer("RoundToMicros",
	func(t time.Time) time.Time { return t.Round(time.Microsecond) })
  818. func TestIntegration_InsertAndReadStructs(t *testing.T) {
  819. if client == nil {
  820. t.Skip("Integration tests skipped")
  821. }
  822. schema, err := InferSchema(TestStruct{})
  823. if err != nil {
  824. t.Fatal(err)
  825. }
  826. ctx := context.Background()
  827. table := newTable(t, schema)
  828. defer table.Delete(ctx)
  829. d := civil.Date{Year: 2016, Month: 3, Day: 20}
  830. tm := civil.Time{Hour: 15, Minute: 4, Second: 5, Nanosecond: 6000}
  831. ts := time.Date(2016, 3, 20, 15, 4, 5, 6000, time.UTC)
  832. dtm := civil.DateTime{Date: d, Time: tm}
  833. d2 := civil.Date{Year: 1994, Month: 5, Day: 15}
  834. tm2 := civil.Time{Hour: 1, Minute: 2, Second: 4, Nanosecond: 0}
  835. ts2 := time.Date(1994, 5, 15, 1, 2, 4, 0, time.UTC)
  836. dtm2 := civil.DateTime{Date: d2, Time: tm2}
  837. // Populate the table.
  838. ins := table.Inserter()
  839. want := []*TestStruct{
  840. {
  841. "a",
  842. []byte("byte"),
  843. 42,
  844. 3.14,
  845. true,
  846. ts,
  847. d,
  848. tm,
  849. dtm,
  850. big.NewRat(57, 100),
  851. []string{"a", "b"},
  852. []int64{1, 2},
  853. []float64{1, 1.41},
  854. []bool{true, false},
  855. []time.Time{ts, ts2},
  856. []civil.Date{d, d2},
  857. []civil.Time{tm, tm2},
  858. []civil.DateTime{dtm, dtm2},
  859. []*big.Rat{big.NewRat(1, 2), big.NewRat(3, 5)},
  860. SubTestStruct{
  861. "string",
  862. SubSubTestStruct{24},
  863. []SubSubTestStruct{{1}, {2}},
  864. },
  865. []SubTestStruct{
  866. {String: "empty"},
  867. {
  868. "full",
  869. SubSubTestStruct{1},
  870. []SubSubTestStruct{{1}, {2}},
  871. },
  872. },
  873. },
  874. {
  875. Name: "b",
  876. Bytes: []byte("byte2"),
  877. Integer: 24,
  878. Float: 4.13,
  879. Boolean: false,
  880. Timestamp: ts,
  881. Date: d,
  882. Time: tm,
  883. DateTime: dtm,
  884. Numeric: big.NewRat(4499, 10000),
  885. },
  886. }
  887. var savers []*StructSaver
  888. for _, s := range want {
  889. savers = append(savers, &StructSaver{Schema: schema, Struct: s})
  890. }
  891. if err := ins.Put(ctx, savers); err != nil {
  892. t.Fatal(putError(err))
  893. }
  894. // Wait until the data has been uploaded. This can take a few seconds, according
  895. // to https://cloud.google.com/bigquery/streaming-data-into-bigquery.
  896. if err := waitForRow(ctx, table); err != nil {
  897. t.Fatal(err)
  898. }
  899. // Test iteration with structs.
  900. it := table.Read(ctx)
  901. var got []*TestStruct
  902. for {
  903. var g TestStruct
  904. err := it.Next(&g)
  905. if err == iterator.Done {
  906. break
  907. }
  908. if err != nil {
  909. t.Fatal(err)
  910. }
  911. got = append(got, &g)
  912. }
  913. sort.Sort(byName(got))
  914. // BigQuery does not elide nils. It reports an error for nil fields.
  915. for i, g := range got {
  916. if i >= len(want) {
  917. t.Errorf("%d: got %v, past end of want", i, pretty.Value(g))
  918. } else if diff := testutil.Diff(g, want[i], roundToMicros); diff != "" {
  919. t.Errorf("%d: got=-, want=+:\n%s", i, diff)
  920. }
  921. }
  922. }
  923. type byName []*TestStruct
  924. func (b byName) Len() int { return len(b) }
  925. func (b byName) Swap(i, j int) { b[i], b[j] = b[j], b[i] }
  926. func (b byName) Less(i, j int) bool { return b[i].Name < b[j].Name }
  927. func TestIntegration_InsertAndReadNullable(t *testing.T) {
  928. if client == nil {
  929. t.Skip("Integration tests skipped")
  930. }
  931. ctm := civil.Time{Hour: 15, Minute: 4, Second: 5, Nanosecond: 6000}
  932. cdt := civil.DateTime{Date: testDate, Time: ctm}
  933. rat := big.NewRat(33, 100)
  934. testInsertAndReadNullable(t, testStructNullable{}, make([]Value, len(testStructNullableSchema)))
  935. testInsertAndReadNullable(t, testStructNullable{
  936. String: NullString{"x", true},
  937. Bytes: []byte{1, 2, 3},
  938. Integer: NullInt64{1, true},
  939. Float: NullFloat64{2.3, true},
  940. Boolean: NullBool{true, true},
  941. Timestamp: NullTimestamp{testTimestamp, true},
  942. Date: NullDate{testDate, true},
  943. Time: NullTime{ctm, true},
  944. DateTime: NullDateTime{cdt, true},
  945. Numeric: rat,
  946. Record: &subNullable{X: NullInt64{4, true}},
  947. },
  948. []Value{"x", []byte{1, 2, 3}, int64(1), 2.3, true, testTimestamp, testDate, ctm, cdt, rat, []Value{int64(4)}})
  949. }
  950. func testInsertAndReadNullable(t *testing.T, ts testStructNullable, wantRow []Value) {
  951. ctx := context.Background()
  952. table := newTable(t, testStructNullableSchema)
  953. defer table.Delete(ctx)
  954. // Populate the table.
  955. ins := table.Inserter()
  956. if err := ins.Put(ctx, []*StructSaver{{Schema: testStructNullableSchema, Struct: ts}}); err != nil {
  957. t.Fatal(putError(err))
  958. }
  959. // Wait until the data has been uploaded. This can take a few seconds, according
  960. // to https://cloud.google.com/bigquery/streaming-data-into-bigquery.
  961. if err := waitForRow(ctx, table); err != nil {
  962. t.Fatal(err)
  963. }
  964. // Read into a []Value.
  965. iter := table.Read(ctx)
  966. gotRows, _, _, err := readAll(iter)
  967. if err != nil {
  968. t.Fatal(err)
  969. }
  970. if len(gotRows) != 1 {
  971. t.Fatalf("got %d rows, want 1", len(gotRows))
  972. }
  973. if diff := testutil.Diff(gotRows[0], wantRow, roundToMicros); diff != "" {
  974. t.Error(diff)
  975. }
  976. // Read into a struct.
  977. want := ts
  978. var sn testStructNullable
  979. it := table.Read(ctx)
  980. if err := it.Next(&sn); err != nil {
  981. t.Fatal(err)
  982. }
  983. if diff := testutil.Diff(sn, want, roundToMicros); diff != "" {
  984. t.Error(diff)
  985. }
  986. }
  987. func TestIntegration_TableUpdate(t *testing.T) {
  988. if client == nil {
  989. t.Skip("Integration tests skipped")
  990. }
  991. ctx := context.Background()
  992. table := newTable(t, schema)
  993. defer table.Delete(ctx)
  994. // Test Update of non-schema fields.
  995. tm, err := table.Metadata(ctx)
  996. if err != nil {
  997. t.Fatal(err)
  998. }
  999. wantDescription := tm.Description + "more"
  1000. wantName := tm.Name + "more"
  1001. wantExpiration := tm.ExpirationTime.Add(time.Hour * 24)
  1002. got, err := table.Update(ctx, TableMetadataToUpdate{
  1003. Description: wantDescription,
  1004. Name: wantName,
  1005. ExpirationTime: wantExpiration,
  1006. }, tm.ETag)
  1007. if err != nil {
  1008. t.Fatal(err)
  1009. }
  1010. if got.Description != wantDescription {
  1011. t.Errorf("Description: got %q, want %q", got.Description, wantDescription)
  1012. }
  1013. if got.Name != wantName {
  1014. t.Errorf("Name: got %q, want %q", got.Name, wantName)
  1015. }
  1016. if got.ExpirationTime != wantExpiration {
  1017. t.Errorf("ExpirationTime: got %q, want %q", got.ExpirationTime, wantExpiration)
  1018. }
  1019. if !testutil.Equal(got.Schema, schema) {
  1020. t.Errorf("Schema: got %v, want %v", pretty.Value(got.Schema), pretty.Value(schema))
  1021. }
  1022. // Blind write succeeds.
  1023. _, err = table.Update(ctx, TableMetadataToUpdate{Name: "x"}, "")
  1024. if err != nil {
  1025. t.Fatal(err)
  1026. }
  1027. // Write with old etag fails.
  1028. _, err = table.Update(ctx, TableMetadataToUpdate{Name: "y"}, got.ETag)
  1029. if err == nil {
  1030. t.Fatal("Update with old ETag succeeded, wanted failure")
  1031. }
  1032. // Test schema update.
  1033. // Columns can be added. schema2 is the same as schema, except for the
  1034. // added column in the middle.
  1035. nested := Schema{
  1036. {Name: "nested", Type: BooleanFieldType},
  1037. {Name: "other", Type: StringFieldType},
  1038. }
  1039. schema2 := Schema{
  1040. schema[0],
  1041. {Name: "rec2", Type: RecordFieldType, Schema: nested},
  1042. schema[1],
  1043. schema[2],
  1044. }
  1045. got, err = table.Update(ctx, TableMetadataToUpdate{Schema: schema2}, "")
  1046. if err != nil {
  1047. t.Fatal(err)
  1048. }
  1049. // Wherever you add the column, it appears at the end.
  1050. schema3 := Schema{schema2[0], schema2[2], schema2[3], schema2[1]}
  1051. if !testutil.Equal(got.Schema, schema3) {
  1052. t.Errorf("add field:\ngot %v\nwant %v",
  1053. pretty.Value(got.Schema), pretty.Value(schema3))
  1054. }
  1055. // Updating with the empty schema succeeds, but is a no-op.
  1056. got, err = table.Update(ctx, TableMetadataToUpdate{Schema: Schema{}}, "")
  1057. if err != nil {
  1058. t.Fatal(err)
  1059. }
  1060. if !testutil.Equal(got.Schema, schema3) {
  1061. t.Errorf("empty schema:\ngot %v\nwant %v",
  1062. pretty.Value(got.Schema), pretty.Value(schema3))
  1063. }
  1064. // Error cases when updating schema.
  1065. for _, test := range []struct {
  1066. desc string
  1067. fields Schema
  1068. }{
  1069. {"change from optional to required", Schema{
  1070. {Name: "name", Type: StringFieldType, Required: true},
  1071. schema3[1],
  1072. schema3[2],
  1073. schema3[3],
  1074. }},
  1075. {"add a required field", Schema{
  1076. schema3[0], schema3[1], schema3[2], schema3[3],
  1077. {Name: "req", Type: StringFieldType, Required: true},
  1078. }},
  1079. {"remove a field", Schema{schema3[0], schema3[1], schema3[2]}},
  1080. {"remove a nested field", Schema{
  1081. schema3[0], schema3[1], schema3[2],
  1082. {Name: "rec2", Type: RecordFieldType, Schema: Schema{nested[0]}}}},
  1083. {"remove all nested fields", Schema{
  1084. schema3[0], schema3[1], schema3[2],
  1085. {Name: "rec2", Type: RecordFieldType, Schema: Schema{}}}},
  1086. } {
  1087. _, err = table.Update(ctx, TableMetadataToUpdate{Schema: Schema(test.fields)}, "")
  1088. if err == nil {
  1089. t.Errorf("%s: want error, got nil", test.desc)
  1090. } else if !hasStatusCode(err, 400) {
  1091. t.Errorf("%s: want 400, got %v", test.desc, err)
  1092. }
  1093. }
  1094. }
  1095. func TestIntegration_Load(t *testing.T) {
  1096. if client == nil {
  1097. t.Skip("Integration tests skipped")
  1098. }
  1099. ctx := context.Background()
  1100. // CSV data can't be loaded into a repeated field, so we use a different schema.
  1101. table := newTable(t, Schema{
  1102. {Name: "name", Type: StringFieldType},
  1103. {Name: "nums", Type: IntegerFieldType},
  1104. })
  1105. defer table.Delete(ctx)
  1106. // Load the table from a reader.
  1107. r := strings.NewReader("a,0\nb,1\nc,2\n")
  1108. wantRows := [][]Value{
  1109. {"a", int64(0)},
  1110. {"b", int64(1)},
  1111. {"c", int64(2)},
  1112. }
  1113. rs := NewReaderSource(r)
  1114. loader := table.LoaderFrom(rs)
  1115. loader.WriteDisposition = WriteTruncate
  1116. loader.Labels = map[string]string{"test": "go"}
  1117. job, err := loader.Run(ctx)
  1118. if err != nil {
  1119. t.Fatal(err)
  1120. }
  1121. if job.LastStatus() == nil {
  1122. t.Error("no LastStatus")
  1123. }
  1124. conf, err := job.Config()
  1125. if err != nil {
  1126. t.Fatal(err)
  1127. }
  1128. config, ok := conf.(*LoadConfig)
  1129. if !ok {
  1130. t.Fatalf("got %T, want LoadConfig", conf)
  1131. }
  1132. diff := testutil.Diff(config, &loader.LoadConfig,
  1133. cmp.AllowUnexported(Table{}),
  1134. cmpopts.IgnoreUnexported(Client{}, ReaderSource{}),
  1135. // returned schema is at top level, not in the config
  1136. cmpopts.IgnoreFields(FileConfig{}, "Schema"))
  1137. if diff != "" {
  1138. t.Errorf("got=-, want=+:\n%s", diff)
  1139. }
  1140. if err := wait(ctx, job); err != nil {
  1141. t.Fatal(err)
  1142. }
  1143. checkReadAndTotalRows(t, "reader load", table.Read(ctx), wantRows)
  1144. }
  1145. func TestIntegration_DML(t *testing.T) {
  1146. if client == nil {
  1147. t.Skip("Integration tests skipped")
  1148. }
  1149. ctx := context.Background()
  1150. table := newTable(t, schema)
  1151. defer table.Delete(ctx)
  1152. sql := fmt.Sprintf(`INSERT %s.%s (name, nums, rec)
  1153. VALUES ('a', [0], STRUCT<BOOL>(TRUE)),
  1154. ('b', [1], STRUCT<BOOL>(FALSE)),
  1155. ('c', [2], STRUCT<BOOL>(TRUE))`,
  1156. table.DatasetID, table.TableID)
  1157. if err := runDML(ctx, sql); err != nil {
  1158. t.Fatal(err)
  1159. }
  1160. wantRows := [][]Value{
  1161. {"a", []Value{int64(0)}, []Value{true}},
  1162. {"b", []Value{int64(1)}, []Value{false}},
  1163. {"c", []Value{int64(2)}, []Value{true}},
  1164. }
  1165. checkRead(t, "DML", table.Read(ctx), wantRows)
  1166. }
  1167. func runDML(ctx context.Context, sql string) error {
  1168. // Retry insert; sometimes it fails with INTERNAL.
  1169. return internal.Retry(ctx, gax.Backoff{}, func() (stop bool, err error) {
  1170. ri, err := client.Query(sql).Read(ctx)
  1171. if err != nil {
  1172. if e, ok := err.(*googleapi.Error); ok && e.Code < 500 {
  1173. return true, err // fail on 4xx
  1174. }
  1175. return false, err
  1176. }
  1177. // It is OK to try to iterate over DML results. The first call to Next
  1178. // will return iterator.Done.
  1179. err = ri.Next(nil)
  1180. if err == nil {
  1181. return true, errors.New("want iterator.Done on the first call, got nil")
  1182. }
  1183. if err == iterator.Done {
  1184. return true, nil
  1185. }
  1186. if e, ok := err.(*googleapi.Error); ok && e.Code < 500 {
  1187. return true, err // fail on 4xx
  1188. }
  1189. return false, err
  1190. })
  1191. }
  1192. func TestIntegration_TimeTypes(t *testing.T) {
  1193. if client == nil {
  1194. t.Skip("Integration tests skipped")
  1195. }
  1196. ctx := context.Background()
  1197. dtSchema := Schema{
  1198. {Name: "d", Type: DateFieldType},
  1199. {Name: "t", Type: TimeFieldType},
  1200. {Name: "dt", Type: DateTimeFieldType},
  1201. {Name: "ts", Type: TimestampFieldType},
  1202. }
  1203. table := newTable(t, dtSchema)
  1204. defer table.Delete(ctx)
  1205. d := civil.Date{Year: 2016, Month: 3, Day: 20}
  1206. tm := civil.Time{Hour: 12, Minute: 30, Second: 0, Nanosecond: 6000}
  1207. dtm := civil.DateTime{Date: d, Time: tm}
  1208. ts := time.Date(2016, 3, 20, 15, 04, 05, 0, time.UTC)
  1209. wantRows := [][]Value{
  1210. {d, tm, dtm, ts},
  1211. }
  1212. ins := table.Inserter()
  1213. if err := ins.Put(ctx, []*ValuesSaver{
  1214. {Schema: dtSchema, Row: wantRows[0]},
  1215. }); err != nil {
  1216. t.Fatal(putError(err))
  1217. }
  1218. if err := waitForRow(ctx, table); err != nil {
  1219. t.Fatal(err)
  1220. }
  1221. // SQL wants DATETIMEs with a space between date and time, but the service
  1222. // returns them in RFC3339 form, with a "T" between.
  1223. query := fmt.Sprintf("INSERT %s.%s (d, t, dt, ts) "+
  1224. "VALUES ('%s', '%s', '%s', '%s')",
  1225. table.DatasetID, table.TableID,
  1226. d, CivilTimeString(tm), CivilDateTimeString(dtm), ts.Format("2006-01-02 15:04:05"))
  1227. if err := runDML(ctx, query); err != nil {
  1228. t.Fatal(err)
  1229. }
  1230. wantRows = append(wantRows, wantRows[0])
  1231. checkRead(t, "TimeTypes", table.Read(ctx), wantRows)
  1232. }
  1233. func TestIntegration_StandardQuery(t *testing.T) {
  1234. if client == nil {
  1235. t.Skip("Integration tests skipped")
  1236. }
  1237. ctx := context.Background()
  1238. d := civil.Date{Year: 2016, Month: 3, Day: 20}
  1239. tm := civil.Time{Hour: 15, Minute: 04, Second: 05, Nanosecond: 0}
  1240. ts := time.Date(2016, 3, 20, 15, 04, 05, 0, time.UTC)
  1241. dtm := ts.Format("2006-01-02 15:04:05")
  1242. // Constructs Value slices made up of int64s.
  1243. ints := func(args ...int) []Value {
  1244. vals := make([]Value, len(args))
  1245. for i, arg := range args {
  1246. vals[i] = int64(arg)
  1247. }
  1248. return vals
  1249. }
  1250. testCases := []struct {
  1251. query string
  1252. wantRow []Value
  1253. }{
  1254. {"SELECT 1", ints(1)},
  1255. {"SELECT 1.3", []Value{1.3}},
  1256. {"SELECT CAST(1.3 AS NUMERIC)", []Value{big.NewRat(13, 10)}},
  1257. {"SELECT NUMERIC '0.25'", []Value{big.NewRat(1, 4)}},
  1258. {"SELECT TRUE", []Value{true}},
  1259. {"SELECT 'ABC'", []Value{"ABC"}},
  1260. {"SELECT CAST('foo' AS BYTES)", []Value{[]byte("foo")}},
  1261. {fmt.Sprintf("SELECT TIMESTAMP '%s'", dtm), []Value{ts}},
  1262. {fmt.Sprintf("SELECT [TIMESTAMP '%s', TIMESTAMP '%s']", dtm, dtm), []Value{[]Value{ts, ts}}},
  1263. {fmt.Sprintf("SELECT ('hello', TIMESTAMP '%s')", dtm), []Value{[]Value{"hello", ts}}},
  1264. {fmt.Sprintf("SELECT DATETIME(TIMESTAMP '%s')", dtm), []Value{civil.DateTime{Date: d, Time: tm}}},
  1265. {fmt.Sprintf("SELECT DATE(TIMESTAMP '%s')", dtm), []Value{d}},
  1266. {fmt.Sprintf("SELECT TIME(TIMESTAMP '%s')", dtm), []Value{tm}},
  1267. {"SELECT (1, 2)", []Value{ints(1, 2)}},
  1268. {"SELECT [1, 2, 3]", []Value{ints(1, 2, 3)}},
  1269. {"SELECT ([1, 2], 3, [4, 5])", []Value{[]Value{ints(1, 2), int64(3), ints(4, 5)}}},
  1270. {"SELECT [(1, 2, 3), (4, 5, 6)]", []Value{[]Value{ints(1, 2, 3), ints(4, 5, 6)}}},
  1271. {"SELECT [([1, 2, 3], 4), ([5, 6], 7)]", []Value{[]Value{[]Value{ints(1, 2, 3), int64(4)}, []Value{ints(5, 6), int64(7)}}}},
  1272. {"SELECT ARRAY(SELECT STRUCT([1, 2]))", []Value{[]Value{[]Value{ints(1, 2)}}}},
  1273. }
  1274. for _, c := range testCases {
  1275. q := client.Query(c.query)
  1276. it, err := q.Read(ctx)
  1277. if err != nil {
  1278. t.Fatal(err)
  1279. }
  1280. checkRead(t, "StandardQuery", it, [][]Value{c.wantRow})
  1281. }
  1282. }
  1283. func TestIntegration_LegacyQuery(t *testing.T) {
  1284. if client == nil {
  1285. t.Skip("Integration tests skipped")
  1286. }
  1287. ctx := context.Background()
  1288. ts := time.Date(2016, 3, 20, 15, 04, 05, 0, time.UTC)
  1289. dtm := ts.Format("2006-01-02 15:04:05")
  1290. testCases := []struct {
  1291. query string
  1292. wantRow []Value
  1293. }{
  1294. {"SELECT 1", []Value{int64(1)}},
  1295. {"SELECT 1.3", []Value{1.3}},
  1296. {"SELECT TRUE", []Value{true}},
  1297. {"SELECT 'ABC'", []Value{"ABC"}},
  1298. {"SELECT CAST('foo' AS BYTES)", []Value{[]byte("foo")}},
  1299. {fmt.Sprintf("SELECT TIMESTAMP('%s')", dtm), []Value{ts}},
  1300. {fmt.Sprintf("SELECT DATE(TIMESTAMP('%s'))", dtm), []Value{"2016-03-20"}},
  1301. {fmt.Sprintf("SELECT TIME(TIMESTAMP('%s'))", dtm), []Value{"15:04:05"}},
  1302. }
  1303. for _, c := range testCases {
  1304. q := client.Query(c.query)
  1305. q.UseLegacySQL = true
  1306. it, err := q.Read(ctx)
  1307. if err != nil {
  1308. t.Fatal(err)
  1309. }
  1310. checkRead(t, "LegacyQuery", it, [][]Value{c.wantRow})
  1311. }
  1312. }
  1313. func TestIntegration_QueryParameters(t *testing.T) {
  1314. if client == nil {
  1315. t.Skip("Integration tests skipped")
  1316. }
  1317. ctx := context.Background()
  1318. d := civil.Date{Year: 2016, Month: 3, Day: 20}
  1319. tm := civil.Time{Hour: 15, Minute: 04, Second: 05, Nanosecond: 3008}
  1320. rtm := tm
  1321. rtm.Nanosecond = 3000 // round to microseconds
  1322. dtm := civil.DateTime{Date: d, Time: tm}
  1323. ts := time.Date(2016, 3, 20, 15, 04, 05, 0, time.UTC)
  1324. rat := big.NewRat(13, 10)
  1325. type ss struct {
  1326. String string
  1327. }
  1328. type s struct {
  1329. Timestamp time.Time
  1330. StringArray []string
  1331. SubStruct ss
  1332. SubStructArray []ss
  1333. }
  1334. testCases := []struct {
  1335. query string
  1336. parameters []QueryParameter
  1337. wantRow []Value
  1338. wantConfig interface{}
  1339. }{
  1340. {
  1341. "SELECT @val",
  1342. []QueryParameter{{"val", 1}},
  1343. []Value{int64(1)},
  1344. int64(1),
  1345. },
  1346. {
  1347. "SELECT @val",
  1348. []QueryParameter{{"val", 1.3}},
  1349. []Value{1.3},
  1350. 1.3,
  1351. },
  1352. {
  1353. "SELECT @val",
  1354. []QueryParameter{{"val", rat}},
  1355. []Value{rat},
  1356. rat,
  1357. },
  1358. {
  1359. "SELECT @val",
  1360. []QueryParameter{{"val", true}},
  1361. []Value{true},
  1362. true,
  1363. },
  1364. {
  1365. "SELECT @val",
  1366. []QueryParameter{{"val", "ABC"}},
  1367. []Value{"ABC"},
  1368. "ABC",
  1369. },
  1370. {
  1371. "SELECT @val",
  1372. []QueryParameter{{"val", []byte("foo")}},
  1373. []Value{[]byte("foo")},
  1374. []byte("foo"),
  1375. },
  1376. {
  1377. "SELECT @val",
  1378. []QueryParameter{{"val", ts}},
  1379. []Value{ts},
  1380. ts,
  1381. },
  1382. {
  1383. "SELECT @val",
  1384. []QueryParameter{{"val", []time.Time{ts, ts}}},
  1385. []Value{[]Value{ts, ts}},
  1386. []interface{}{ts, ts},
  1387. },
  1388. {
  1389. "SELECT @val",
  1390. []QueryParameter{{"val", dtm}},
  1391. []Value{civil.DateTime{Date: d, Time: rtm}},
  1392. civil.DateTime{Date: d, Time: rtm},
  1393. },
  1394. {
  1395. "SELECT @val",
  1396. []QueryParameter{{"val", d}},
  1397. []Value{d},
  1398. d,
  1399. },
  1400. {
  1401. "SELECT @val",
  1402. []QueryParameter{{"val", tm}},
  1403. []Value{rtm},
  1404. rtm,
  1405. },
  1406. {
  1407. "SELECT @val",
  1408. []QueryParameter{{"val", s{ts, []string{"a", "b"}, ss{"c"}, []ss{{"d"}, {"e"}}}}},
  1409. []Value{[]Value{ts, []Value{"a", "b"}, []Value{"c"}, []Value{[]Value{"d"}, []Value{"e"}}}},
  1410. map[string]interface{}{
  1411. "Timestamp": ts,
  1412. "StringArray": []interface{}{"a", "b"},
  1413. "SubStruct": map[string]interface{}{"String": "c"},
  1414. "SubStructArray": []interface{}{
  1415. map[string]interface{}{"String": "d"},
  1416. map[string]interface{}{"String": "e"},
  1417. },
  1418. },
  1419. },
  1420. {
  1421. "SELECT @val.Timestamp, @val.SubStruct.String",
  1422. []QueryParameter{{"val", s{Timestamp: ts, SubStruct: ss{"a"}}}},
  1423. []Value{ts, "a"},
  1424. map[string]interface{}{
  1425. "Timestamp": ts,
  1426. "SubStruct": map[string]interface{}{"String": "a"},
  1427. "StringArray": nil,
  1428. "SubStructArray": nil,
  1429. },
  1430. },
  1431. }
  1432. for _, c := range testCases {
  1433. q := client.Query(c.query)
  1434. q.Parameters = c.parameters
  1435. job, err := q.Run(ctx)
  1436. if err != nil {
  1437. t.Fatal(err)
  1438. }
  1439. if job.LastStatus() == nil {
  1440. t.Error("no LastStatus")
  1441. }
  1442. it, err := job.Read(ctx)
  1443. if err != nil {
  1444. t.Fatal(err)
  1445. }
  1446. checkRead(t, "QueryParameters", it, [][]Value{c.wantRow})
  1447. config, err := job.Config()
  1448. if err != nil {
  1449. t.Fatal(err)
  1450. }
  1451. got := config.(*QueryConfig).Parameters[0].Value
  1452. if !testutil.Equal(got, c.wantConfig) {
  1453. t.Errorf("param %[1]v (%[1]T): config:\ngot %[2]v (%[2]T)\nwant %[3]v (%[3]T)",
  1454. c.parameters[0].Value, got, c.wantConfig)
  1455. }
  1456. }
  1457. }
  1458. func TestIntegration_QueryDryRun(t *testing.T) {
  1459. if client == nil {
  1460. t.Skip("Integration tests skipped")
  1461. }
  1462. ctx := context.Background()
  1463. q := client.Query("SELECT word from " + stdName + " LIMIT 10")
  1464. q.DryRun = true
  1465. job, err := q.Run(ctx)
  1466. if err != nil {
  1467. t.Fatal(err)
  1468. }
  1469. s := job.LastStatus()
  1470. if s.State != Done {
  1471. t.Errorf("state is %v, expected Done", s.State)
  1472. }
  1473. if s.Statistics == nil {
  1474. t.Fatal("no statistics")
  1475. }
  1476. if s.Statistics.Details.(*QueryStatistics).Schema == nil {
  1477. t.Fatal("no schema")
  1478. }
  1479. }
  1480. func TestIntegration_ExtractExternal(t *testing.T) {
  1481. // Create a table, extract it to GCS, then query it externally.
  1482. if client == nil {
  1483. t.Skip("Integration tests skipped")
  1484. }
  1485. ctx := context.Background()
  1486. schema := Schema{
  1487. {Name: "name", Type: StringFieldType},
  1488. {Name: "num", Type: IntegerFieldType},
  1489. }
  1490. table := newTable(t, schema)
  1491. defer table.Delete(ctx)
  1492. // Insert table data.
  1493. sql := fmt.Sprintf(`INSERT %s.%s (name, num)
  1494. VALUES ('a', 1), ('b', 2), ('c', 3)`,
  1495. table.DatasetID, table.TableID)
  1496. if err := runDML(ctx, sql); err != nil {
  1497. t.Fatal(err)
  1498. }
  1499. // Extract to a GCS object as CSV.
  1500. bucketName := testutil.ProjID()
  1501. objectName := fmt.Sprintf("bq-test-%s.csv", table.TableID)
  1502. uri := fmt.Sprintf("gs://%s/%s", bucketName, objectName)
  1503. defer storageClient.Bucket(bucketName).Object(objectName).Delete(ctx)
  1504. gr := NewGCSReference(uri)
  1505. gr.DestinationFormat = CSV
  1506. e := table.ExtractorTo(gr)
  1507. job, err := e.Run(ctx)
  1508. if err != nil {
  1509. t.Fatal(err)
  1510. }
  1511. conf, err := job.Config()
  1512. if err != nil {
  1513. t.Fatal(err)
  1514. }
  1515. config, ok := conf.(*ExtractConfig)
  1516. if !ok {
  1517. t.Fatalf("got %T, want ExtractConfig", conf)
  1518. }
  1519. diff := testutil.Diff(config, &e.ExtractConfig,
  1520. cmp.AllowUnexported(Table{}),
  1521. cmpopts.IgnoreUnexported(Client{}))
  1522. if diff != "" {
  1523. t.Errorf("got=-, want=+:\n%s", diff)
  1524. }
  1525. if err := wait(ctx, job); err != nil {
  1526. t.Fatal(err)
  1527. }
  1528. edc := &ExternalDataConfig{
  1529. SourceFormat: CSV,
  1530. SourceURIs: []string{uri},
  1531. Schema: schema,
  1532. Options: &CSVOptions{
  1533. SkipLeadingRows: 1,
  1534. // This is the default. Since we use edc as an expectation later on,
  1535. // let's just be explicit.
  1536. FieldDelimiter: ",",
  1537. },
  1538. }
  1539. // Query that CSV file directly.
  1540. q := client.Query("SELECT * FROM csv")
  1541. q.TableDefinitions = map[string]ExternalData{"csv": edc}
  1542. wantRows := [][]Value{
  1543. {"a", int64(1)},
  1544. {"b", int64(2)},
  1545. {"c", int64(3)},
  1546. }
  1547. iter, err := q.Read(ctx)
  1548. if err != nil {
  1549. t.Fatal(err)
  1550. }
  1551. checkReadAndTotalRows(t, "external query", iter, wantRows)
  1552. // Make a table pointing to the file, and query it.
  1553. // BigQuery does not allow a Table.Read on an external table.
  1554. table = dataset.Table(tableIDs.New())
  1555. err = table.Create(context.Background(), &TableMetadata{
  1556. Schema: schema,
  1557. ExpirationTime: testTableExpiration,
  1558. ExternalDataConfig: edc,
  1559. })
  1560. if err != nil {
  1561. t.Fatal(err)
  1562. }
  1563. q = client.Query(fmt.Sprintf("SELECT * FROM %s.%s", table.DatasetID, table.TableID))
  1564. iter, err = q.Read(ctx)
  1565. if err != nil {
  1566. t.Fatal(err)
  1567. }
  1568. checkReadAndTotalRows(t, "external table", iter, wantRows)
  1569. // While we're here, check that the table metadata is correct.
  1570. md, err := table.Metadata(ctx)
  1571. if err != nil {
  1572. t.Fatal(err)
  1573. }
  1574. // One difference: since BigQuery returns the schema as part of the ordinary
  1575. // table metadata, it does not populate ExternalDataConfig.Schema.
  1576. md.ExternalDataConfig.Schema = md.Schema
  1577. if diff := testutil.Diff(md.ExternalDataConfig, edc); diff != "" {
  1578. t.Errorf("got=-, want=+\n%s", diff)
  1579. }
  1580. }
  1581. func TestIntegration_ReadNullIntoStruct(t *testing.T) {
  1582. // Reading a null into a struct field should return an error (not panic).
  1583. if client == nil {
  1584. t.Skip("Integration tests skipped")
  1585. }
  1586. ctx := context.Background()
  1587. table := newTable(t, schema)
  1588. defer table.Delete(ctx)
  1589. ins := table.Inserter()
  1590. row := &ValuesSaver{
  1591. Schema: schema,
  1592. Row: []Value{nil, []Value{}, []Value{nil}},
  1593. }
  1594. if err := ins.Put(ctx, []*ValuesSaver{row}); err != nil {
  1595. t.Fatal(putError(err))
  1596. }
  1597. if err := waitForRow(ctx, table); err != nil {
  1598. t.Fatal(err)
  1599. }
  1600. q := client.Query(fmt.Sprintf("select name from %s", table.TableID))
  1601. q.DefaultProjectID = dataset.ProjectID
  1602. q.DefaultDatasetID = dataset.DatasetID
  1603. it, err := q.Read(ctx)
  1604. if err != nil {
  1605. t.Fatal(err)
  1606. }
  1607. type S struct{ Name string }
  1608. var s S
  1609. if err := it.Next(&s); err == nil {
  1610. t.Fatal("got nil, want error")
  1611. }
  1612. }
// Fully-qualified names of the public Shakespeare sample table, spelled in
// the two SQL dialects' differing syntaxes.
const (
	// stdName uses standard SQL syntax (backquoted, dot-separated).
	stdName = "`bigquery-public-data.samples.shakespeare`"
	// legacyName uses legacy SQL syntax (bracketed, colon after the project).
	legacyName = "[bigquery-public-data:samples.shakespeare]"
)
// useLegacySQLTests lists every combination of table-name syntax and
// standard/legacy flag settings, with whether that combination is expected
// to fail.
//
// These tests exploit the fact that the two SQL versions have different syntaxes for
// fully-qualified table names.
var useLegacySQLTests = []struct {
	t           string // name of table
	std, legacy bool   // use standard/legacy SQL
	err         bool   // do we expect an error?
}{
	// Legacy-syntax name: succeeds only when legacy SQL alone is selected.
	{t: legacyName, std: false, legacy: true, err: false},
	{t: legacyName, std: true, legacy: false, err: true},
	{t: legacyName, std: false, legacy: false, err: true}, // standard SQL is default
	{t: legacyName, std: true, legacy: true, err: true},
	// Standard-syntax name: succeeds with standard SQL, explicit or by default.
	{t: stdName, std: false, legacy: true, err: true},
	{t: stdName, std: true, legacy: false, err: false},
	{t: stdName, std: false, legacy: false, err: false}, // standard SQL is default
	{t: stdName, std: true, legacy: true, err: true},
}
  1633. func TestIntegration_QueryUseLegacySQL(t *testing.T) {
  1634. // Test the UseLegacySQL and UseStandardSQL options for queries.
  1635. if client == nil {
  1636. t.Skip("Integration tests skipped")
  1637. }
  1638. ctx := context.Background()
  1639. for _, test := range useLegacySQLTests {
  1640. q := client.Query(fmt.Sprintf("select word from %s limit 1", test.t))
  1641. q.UseStandardSQL = test.std
  1642. q.UseLegacySQL = test.legacy
  1643. _, err := q.Read(ctx)
  1644. gotErr := err != nil
  1645. if gotErr && !test.err {
  1646. t.Errorf("%+v:\nunexpected error: %v", test, err)
  1647. } else if !gotErr && test.err {
  1648. t.Errorf("%+v:\nsucceeded, but want error", test)
  1649. }
  1650. }
  1651. }
  1652. func TestIntegration_TableUseLegacySQL(t *testing.T) {
  1653. // Test UseLegacySQL and UseStandardSQL for Table.Create.
  1654. if client == nil {
  1655. t.Skip("Integration tests skipped")
  1656. }
  1657. ctx := context.Background()
  1658. table := newTable(t, schema)
  1659. defer table.Delete(ctx)
  1660. for i, test := range useLegacySQLTests {
  1661. view := dataset.Table(fmt.Sprintf("t_view_%d", i))
  1662. tm := &TableMetadata{
  1663. ViewQuery: fmt.Sprintf("SELECT word from %s", test.t),
  1664. UseStandardSQL: test.std,
  1665. UseLegacySQL: test.legacy,
  1666. }
  1667. err := view.Create(ctx, tm)
  1668. gotErr := err != nil
  1669. if gotErr && !test.err {
  1670. t.Errorf("%+v:\nunexpected error: %v", test, err)
  1671. } else if !gotErr && test.err {
  1672. t.Errorf("%+v:\nsucceeded, but want error", test)
  1673. }
  1674. _ = view.Delete(ctx)
  1675. }
  1676. }
  1677. func TestIntegration_ListJobs(t *testing.T) {
  1678. // It's difficult to test the list of jobs, because we can't easily
  1679. // control what's in it. Also, there are many jobs in the test project,
  1680. // and it takes considerable time to list them all.
  1681. if client == nil {
  1682. t.Skip("Integration tests skipped")
  1683. }
  1684. ctx := context.Background()
  1685. // About all we can do is list a few jobs.
  1686. const max = 20
  1687. var jobs []*Job
  1688. it := client.Jobs(ctx)
  1689. for {
  1690. job, err := it.Next()
  1691. if err == iterator.Done {
  1692. break
  1693. }
  1694. if err != nil {
  1695. t.Fatal(err)
  1696. }
  1697. jobs = append(jobs, job)
  1698. if len(jobs) >= max {
  1699. break
  1700. }
  1701. }
  1702. // We expect that there is at least one job in the last few months.
  1703. if len(jobs) == 0 {
  1704. t.Fatal("did not get any jobs")
  1705. }
  1706. }
// tokyo is the location that the location tests expect the "tokyo" dataset
// and the jobs run against it to report.
const tokyo = "asia-northeast1"
  1708. func TestIntegration_Location(t *testing.T) {
  1709. if client == nil {
  1710. t.Skip("Integration tests skipped")
  1711. }
  1712. client.Location = ""
  1713. testLocation(t, tokyo)
  1714. client.Location = tokyo
  1715. defer func() {
  1716. client.Location = ""
  1717. }()
  1718. testLocation(t, "")
  1719. }
  1720. func testLocation(t *testing.T, loc string) {
  1721. ctx := context.Background()
  1722. tokyoDataset := client.Dataset("tokyo")
  1723. err := tokyoDataset.Create(ctx, &DatasetMetadata{Location: loc})
  1724. if err != nil && !hasStatusCode(err, 409) { // 409 = already exists
  1725. t.Fatal(err)
  1726. }
  1727. md, err := tokyoDataset.Metadata(ctx)
  1728. if err != nil {
  1729. t.Fatal(err)
  1730. }
  1731. if md.Location != tokyo {
  1732. t.Fatalf("dataset location: got %s, want %s", md.Location, tokyo)
  1733. }
  1734. table := tokyoDataset.Table(tableIDs.New())
  1735. err = table.Create(context.Background(), &TableMetadata{
  1736. Schema: Schema{
  1737. {Name: "name", Type: StringFieldType},
  1738. {Name: "nums", Type: IntegerFieldType},
  1739. },
  1740. ExpirationTime: testTableExpiration,
  1741. })
  1742. if err != nil {
  1743. t.Fatal(err)
  1744. }
  1745. defer table.Delete(ctx)
  1746. loader := table.LoaderFrom(NewReaderSource(strings.NewReader("a,0\nb,1\nc,2\n")))
  1747. loader.Location = loc
  1748. job, err := loader.Run(ctx)
  1749. if err != nil {
  1750. t.Fatal("loader.Run", err)
  1751. }
  1752. if job.Location() != tokyo {
  1753. t.Fatalf("job location: got %s, want %s", job.Location(), tokyo)
  1754. }
  1755. _, err = client.JobFromID(ctx, job.ID())
  1756. if client.Location == "" && err == nil {
  1757. t.Error("JobFromID with Tokyo job, no client location: want error, got nil")
  1758. }
  1759. if client.Location != "" && err != nil {
  1760. t.Errorf("JobFromID with Tokyo job, with client location: want nil, got %v", err)
  1761. }
  1762. _, err = client.JobFromIDLocation(ctx, job.ID(), "US")
  1763. if err == nil {
  1764. t.Error("JobFromIDLocation with US: want error, got nil")
  1765. }
  1766. job2, err := client.JobFromIDLocation(ctx, job.ID(), loc)
  1767. if loc == tokyo && err != nil {
  1768. t.Errorf("loc=tokyo: %v", err)
  1769. }
  1770. if loc == "" && err == nil {
  1771. t.Error("loc empty: got nil, want error")
  1772. }
  1773. if job2 != nil && (job2.ID() != job.ID() || job2.Location() != tokyo) {
  1774. t.Errorf("got id %s loc %s, want id%s loc %s", job2.ID(), job2.Location(), job.ID(), tokyo)
  1775. }
  1776. if err := wait(ctx, job); err != nil {
  1777. t.Fatal(err)
  1778. }
  1779. // Cancel should succeed even if the job is done.
  1780. if err := job.Cancel(ctx); err != nil {
  1781. t.Fatal(err)
  1782. }
  1783. q := client.Query(fmt.Sprintf("SELECT * FROM %s.%s", table.DatasetID, table.TableID))
  1784. q.Location = loc
  1785. iter, err := q.Read(ctx)
  1786. if err != nil {
  1787. t.Fatal(err)
  1788. }
  1789. wantRows := [][]Value{
  1790. {"a", int64(0)},
  1791. {"b", int64(1)},
  1792. {"c", int64(2)},
  1793. }
  1794. checkRead(t, "location", iter, wantRows)
  1795. table2 := tokyoDataset.Table(tableIDs.New())
  1796. copier := table2.CopierFrom(table)
  1797. copier.Location = loc
  1798. if _, err := copier.Run(ctx); err != nil {
  1799. t.Fatal(err)
  1800. }
  1801. bucketName := testutil.ProjID()
  1802. objectName := fmt.Sprintf("bq-test-%s.csv", table.TableID)
  1803. uri := fmt.Sprintf("gs://%s/%s", bucketName, objectName)
  1804. defer storageClient.Bucket(bucketName).Object(objectName).Delete(ctx)
  1805. gr := NewGCSReference(uri)
  1806. gr.DestinationFormat = CSV
  1807. e := table.ExtractorTo(gr)
  1808. e.Location = loc
  1809. if _, err := e.Run(ctx); err != nil {
  1810. t.Fatal(err)
  1811. }
  1812. }
  1813. func TestIntegration_NumericErrors(t *testing.T) {
  1814. // Verify that the service returns an error for a big.Rat that's too large.
  1815. if client == nil {
  1816. t.Skip("Integration tests skipped")
  1817. }
  1818. ctx := context.Background()
  1819. schema := Schema{{Name: "n", Type: NumericFieldType}}
  1820. table := newTable(t, schema)
  1821. defer table.Delete(ctx)
  1822. tooBigRat := &big.Rat{}
  1823. if _, ok := tooBigRat.SetString("1e40"); !ok {
  1824. t.Fatal("big.Rat.SetString failed")
  1825. }
  1826. ins := table.Inserter()
  1827. err := ins.Put(ctx, []*ValuesSaver{{Schema: schema, Row: []Value{tooBigRat}}})
  1828. if err == nil {
  1829. t.Fatal("got nil, want error")
  1830. }
  1831. }
  1832. func TestIntegration_QueryErrors(t *testing.T) {
  1833. // Verify that a bad query returns an appropriate error.
  1834. if client == nil {
  1835. t.Skip("Integration tests skipped")
  1836. }
  1837. ctx := context.Background()
  1838. q := client.Query("blah blah broken")
  1839. _, err := q.Read(ctx)
  1840. const want = "invalidQuery"
  1841. if !strings.Contains(err.Error(), want) {
  1842. t.Fatalf("got %q, want substring %q", err, want)
  1843. }
  1844. }
  1845. func TestIntegration_Model(t *testing.T) {
  1846. // Create an ML model.
  1847. if client == nil {
  1848. t.Skip("Integration tests skipped")
  1849. }
  1850. ctx := context.Background()
  1851. schema := Schema{
  1852. {Name: "input", Type: IntegerFieldType},
  1853. {Name: "label", Type: IntegerFieldType},
  1854. }
  1855. table := newTable(t, schema)
  1856. defer table.Delete(ctx)
  1857. // Insert table data.
  1858. tableName := fmt.Sprintf("%s.%s", table.DatasetID, table.TableID)
  1859. sql := fmt.Sprintf(`INSERT %s (input, label)
  1860. VALUES (1, 0), (2, 1), (3, 0), (4, 1)`,
  1861. tableName)
  1862. wantNumRows := 4
  1863. if err := runDML(ctx, sql); err != nil {
  1864. t.Fatal(err)
  1865. }
  1866. model := dataset.Table("my_model")
  1867. modelName := fmt.Sprintf("%s.%s", model.DatasetID, model.TableID)
  1868. sql = fmt.Sprintf(`CREATE MODEL %s OPTIONS (model_type='logistic_reg') AS SELECT input, label FROM %s`,
  1869. modelName, tableName)
  1870. if err := runDML(ctx, sql); err != nil {
  1871. t.Fatal(err)
  1872. }
  1873. defer model.Delete(ctx)
  1874. sql = fmt.Sprintf(`SELECT * FROM ml.PREDICT(MODEL %s, TABLE %s)`, modelName, tableName)
  1875. q := client.Query(sql)
  1876. ri, err := q.Read(ctx)
  1877. if err != nil {
  1878. t.Fatal(err)
  1879. }
  1880. rows, _, _, err := readAll(ri)
  1881. if err != nil {
  1882. t.Fatal(err)
  1883. }
  1884. if got := len(rows); got != wantNumRows {
  1885. t.Fatalf("got %d rows in prediction table, want %d", got, wantNumRows)
  1886. }
  1887. iter := dataset.Tables(ctx)
  1888. seen := false
  1889. for {
  1890. tbl, err := iter.Next()
  1891. if err == iterator.Done {
  1892. break
  1893. }
  1894. if err != nil {
  1895. t.Fatal(err)
  1896. }
  1897. if tbl.TableID == "my_model" {
  1898. seen = true
  1899. }
  1900. }
  1901. if !seen {
  1902. t.Fatal("model not listed in dataset")
  1903. }
  1904. if err := model.Delete(ctx); err != nil {
  1905. t.Fatal(err)
  1906. }
  1907. }
  1908. // Creates a new, temporary table with a unique name and the given schema.
  1909. func newTable(t *testing.T, s Schema) *Table {
  1910. table := dataset.Table(tableIDs.New())
  1911. err := table.Create(context.Background(), &TableMetadata{
  1912. Schema: s,
  1913. ExpirationTime: testTableExpiration,
  1914. })
  1915. if err != nil {
  1916. t.Fatal(err)
  1917. }
  1918. return table
  1919. }
  1920. func checkRead(t *testing.T, msg string, it *RowIterator, want [][]Value) {
  1921. if msg2, ok := compareRead(it, want, false); !ok {
  1922. t.Errorf("%s: %s", msg, msg2)
  1923. }
  1924. }
  1925. func checkReadAndTotalRows(t *testing.T, msg string, it *RowIterator, want [][]Value) {
  1926. if msg2, ok := compareRead(it, want, true); !ok {
  1927. t.Errorf("%s: %s", msg, msg2)
  1928. }
  1929. }
  1930. func compareRead(it *RowIterator, want [][]Value, compareTotalRows bool) (msg string, ok bool) {
  1931. got, _, totalRows, err := readAll(it)
  1932. if err != nil {
  1933. return err.Error(), false
  1934. }
  1935. if len(got) != len(want) {
  1936. return fmt.Sprintf("got %d rows, want %d", len(got), len(want)), false
  1937. }
  1938. if compareTotalRows && len(got) != int(totalRows) {
  1939. return fmt.Sprintf("got %d rows, but totalRows = %d", len(got), totalRows), false
  1940. }
  1941. sort.Sort(byCol0(got))
  1942. for i, r := range got {
  1943. gotRow := []Value(r)
  1944. wantRow := want[i]
  1945. if !testutil.Equal(gotRow, wantRow) {
  1946. return fmt.Sprintf("#%d: got %#v, want %#v", i, gotRow, wantRow), false
  1947. }
  1948. }
  1949. return "", true
  1950. }
  1951. func readAll(it *RowIterator) ([][]Value, Schema, uint64, error) {
  1952. var (
  1953. rows [][]Value
  1954. schema Schema
  1955. totalRows uint64
  1956. )
  1957. for {
  1958. var vals []Value
  1959. err := it.Next(&vals)
  1960. if err == iterator.Done {
  1961. return rows, schema, totalRows, nil
  1962. }
  1963. if err != nil {
  1964. return nil, nil, 0, err
  1965. }
  1966. rows = append(rows, vals)
  1967. schema = it.Schema
  1968. totalRows = it.TotalRows
  1969. }
  1970. }
  1971. type byCol0 [][]Value
  1972. func (b byCol0) Len() int { return len(b) }
  1973. func (b byCol0) Swap(i, j int) { b[i], b[j] = b[j], b[i] }
  1974. func (b byCol0) Less(i, j int) bool {
  1975. switch a := b[i][0].(type) {
  1976. case string:
  1977. return a < b[j][0].(string)
  1978. case civil.Date:
  1979. return a.Before(b[j][0].(civil.Date))
  1980. default:
  1981. panic("unknown type")
  1982. }
  1983. }
  1984. func hasStatusCode(err error, code int) bool {
  1985. if e, ok := err.(*googleapi.Error); ok && e.Code == code {
  1986. return true
  1987. }
  1988. return false
  1989. }
  1990. // wait polls the job until it is complete or an error is returned.
  1991. func wait(ctx context.Context, job *Job) error {
  1992. status, err := job.Wait(ctx)
  1993. if err != nil {
  1994. return err
  1995. }
  1996. if status.Err() != nil {
  1997. return fmt.Errorf("job status error: %#v", status.Err())
  1998. }
  1999. if status.Statistics == nil {
  2000. return errors.New("nil Statistics")
  2001. }
  2002. if status.Statistics.EndTime.IsZero() {
  2003. return errors.New("EndTime is zero")
  2004. }
  2005. if status.Statistics.Details == nil {
  2006. return errors.New("nil Statistics.Details")
  2007. }
  2008. return nil
  2009. }
  2010. // waitForRow polls the table until it contains a row.
  2011. // TODO(jba): use internal.Retry.
  2012. func waitForRow(ctx context.Context, table *Table) error {
  2013. for {
  2014. it := table.Read(ctx)
  2015. var v []Value
  2016. err := it.Next(&v)
  2017. if err == nil {
  2018. return nil
  2019. }
  2020. if err != iterator.Done {
  2021. return err
  2022. }
  2023. time.Sleep(1 * time.Second)
  2024. }
  2025. }
  2026. func putError(err error) string {
  2027. pme, ok := err.(PutMultiError)
  2028. if !ok {
  2029. return err.Error()
  2030. }
  2031. var msgs []string
  2032. for _, err := range pme {
  2033. msgs = append(msgs, err.Error())
  2034. }
  2035. return strings.Join(msgs, "\n")
  2036. }