load.go 4.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146
  1. // Copyright 2016 Google LLC
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package bigquery
import (
	"context"
	"errors"
	"io"

	"cloud.google.com/go/internal/trace"
	bq "google.golang.org/api/bigquery/v2"
)
// LoadConfig holds the configuration for a load job.
type LoadConfig struct {
	// Src is the source from which data will be loaded.
	// NOTE(review): toBQ calls Src.populateLoadConfig unconditionally, so a
	// nil Src panics when the job configuration is built.
	Src LoadSource

	// Dst is the table into which the data will be loaded.
	Dst *Table

	// CreateDisposition specifies the circumstances under which the destination table will be created.
	// The default is CreateIfNeeded.
	CreateDisposition TableCreateDisposition

	// WriteDisposition specifies how existing data in the destination table is treated.
	// The default is WriteAppend.
	WriteDisposition TableWriteDisposition

	// Labels are the labels associated with this job. The map is passed
	// through to the job configuration as-is (it is not copied).
	Labels map[string]string

	// TimePartitioning, if non-nil, specifies that the destination table is
	// partitioned by time.
	TimePartitioning *TimePartitioning

	// Clustering specifies the data clustering configuration for the destination table.
	Clustering *Clustering

	// DestinationEncryptionConfig holds custom encryption configuration
	// (e.g., Cloud KMS keys) for the destination table.
	DestinationEncryptionConfig *EncryptionConfig

	// SchemaUpdateOptions allows the schema of the destination table to be
	// updated as a side effect of the load job.
	SchemaUpdateOptions []string
}
  45. func (l *LoadConfig) toBQ() (*bq.JobConfiguration, io.Reader) {
  46. config := &bq.JobConfiguration{
  47. Labels: l.Labels,
  48. Load: &bq.JobConfigurationLoad{
  49. CreateDisposition: string(l.CreateDisposition),
  50. WriteDisposition: string(l.WriteDisposition),
  51. DestinationTable: l.Dst.toBQ(),
  52. TimePartitioning: l.TimePartitioning.toBQ(),
  53. Clustering: l.Clustering.toBQ(),
  54. DestinationEncryptionConfiguration: l.DestinationEncryptionConfig.toBQ(),
  55. SchemaUpdateOptions: l.SchemaUpdateOptions,
  56. },
  57. }
  58. media := l.Src.populateLoadConfig(config.Load)
  59. return config, media
  60. }
  61. func bqToLoadConfig(q *bq.JobConfiguration, c *Client) *LoadConfig {
  62. lc := &LoadConfig{
  63. Labels: q.Labels,
  64. CreateDisposition: TableCreateDisposition(q.Load.CreateDisposition),
  65. WriteDisposition: TableWriteDisposition(q.Load.WriteDisposition),
  66. Dst: bqToTable(q.Load.DestinationTable, c),
  67. TimePartitioning: bqToTimePartitioning(q.Load.TimePartitioning),
  68. Clustering: bqToClustering(q.Load.Clustering),
  69. DestinationEncryptionConfig: bqToEncryptionConfig(q.Load.DestinationEncryptionConfiguration),
  70. SchemaUpdateOptions: q.Load.SchemaUpdateOptions,
  71. }
  72. var fc *FileConfig
  73. if len(q.Load.SourceUris) == 0 {
  74. s := NewReaderSource(nil)
  75. fc = &s.FileConfig
  76. lc.Src = s
  77. } else {
  78. s := NewGCSReference(q.Load.SourceUris...)
  79. fc = &s.FileConfig
  80. lc.Src = s
  81. }
  82. bqPopulateFileConfig(q.Load, fc)
  83. return lc
  84. }
// A Loader loads data into a BigQuery table from a LoadSource, such as
// Google Cloud Storage objects (GCSReference) or data read from an
// io.Reader (ReaderSource). Obtain one with Table.LoaderFrom, optionally
// adjust its configuration, and call Run to start the load job.
type Loader struct {
	// JobIDConfig is used to build the job reference for the job that Run inserts.
	JobIDConfig
	// LoadConfig describes the load: source, destination and table options.
	LoadConfig
	// c is the client used to build the job reference and insert the job.
	c *Client
}
// A LoadSource represents a source of data that can be loaded into
// a BigQuery table.
//
// This package defines two LoadSources: GCSReference, for Google Cloud Storage
// objects, and ReaderSource, for data read from an io.Reader. Because the
// interface's method is unexported, only types in this package can implement it.
type LoadSource interface {
	// populateLoadConfig fills in the source-specific fields of the given
	// load configuration and returns the media to upload with the job.
	// NOTE(review): presumably nil for sources with nothing to upload
	// (e.g. GCS objects); confirm against the implementations.
	populateLoadConfig(*bq.JobConfigurationLoad) io.Reader
}
  100. // LoaderFrom returns a Loader which can be used to load data into a BigQuery table.
  101. // The returned Loader may optionally be further configured before its Run method is called.
  102. // See GCSReference and ReaderSource for additional configuration options that
  103. // affect loading.
  104. func (t *Table) LoaderFrom(src LoadSource) *Loader {
  105. return &Loader{
  106. c: t.c,
  107. LoadConfig: LoadConfig{
  108. Src: src,
  109. Dst: t,
  110. },
  111. }
  112. }
  113. // Run initiates a load job.
  114. func (l *Loader) Run(ctx context.Context) (j *Job, err error) {
  115. ctx = trace.StartSpan(ctx, "cloud.google.com/go/bigquery.Load.Run")
  116. defer func() { trace.EndSpan(ctx, err) }()
  117. job, media := l.newJob()
  118. return l.c.insertJob(ctx, job, media)
  119. }
  120. func (l *Loader) newJob() (*bq.Job, io.Reader) {
  121. config, media := l.LoadConfig.toBQ()
  122. return &bq.Job{
  123. JobReference: l.JobIDConfig.createJobRef(l.c),
  124. Configuration: config,
  125. }, media
  126. }