Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 40 additions & 0 deletions bigquery/google/cloud/bigquery/_pandas_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,21 @@

_PROGRESS_INTERVAL = 0.2 # Maximum time between download status checks, in seconds.

_PANDAS_DTYPE_TO_BQ = {
"bool": "BOOLEAN",
"datetime64[ns, UTC]": "TIMESTAMP",
"datetime64[ns]": "DATETIME",
"float32": "FLOAT",
"float64": "FLOAT",
"int8": "INTEGER",
"int16": "INTEGER",
"int32": "INTEGER",
"int64": "INTEGER",
"uint8": "INTEGER",
"uint16": "INTEGER",
"uint32": "INTEGER",
}


class _DownloadState(object):
"""Flag to indicate that a thread should exit early."""
Expand Down Expand Up @@ -172,6 +187,31 @@ def bq_to_arrow_array(series, bq_field):
return pyarrow.array(series, type=arrow_type)


def dataframe_to_bq_schema(dataframe):
"""Convert a pandas DataFrame schema to a BigQuery schema.

TODO(GH#8140): Add bq_schema argument to allow overriding autodetected
schema for a subset of columns.

Args:
dataframe (pandas.DataFrame):
DataFrame to convert to convert to Parquet file.

Returns:
Optional[Sequence[google.cloud.bigquery.schema.SchemaField]]:
The automatically determined schema. Returns None if the type of
any column cannot be determined.
"""
bq_schema = []
for column, dtype in zip(dataframe.columns, dataframe.dtypes):
bq_type = _PANDAS_DTYPE_TO_BQ.get(dtype.name)
if not bq_type:
return None
bq_field = schema.SchemaField(column, bq_type)
bq_schema.append(bq_field)
return tuple(bq_schema)


def dataframe_to_arrow(dataframe, bq_schema):
"""Convert pandas dataframe to Arrow table, using BigQuery schema.

Expand Down
9 changes: 9 additions & 0 deletions bigquery/google/cloud/bigquery/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
except ImportError: # Python 2.7
import collections as collections_abc

import copy
import functools
import gzip
import io
Expand Down Expand Up @@ -1520,11 +1521,19 @@ def load_table_from_dataframe(

if job_config is None:
job_config = job.LoadJobConfig()
else:
# Make a copy so that the job config isn't modified in-place.
job_config_properties = copy.deepcopy(job_config._properties)
job_config = job.LoadJobConfig()
job_config._properties = job_config_properties
job_config.source_format = job.SourceFormat.PARQUET

if location is None:
location = self.location

if not job_config.schema:
job_config.schema = _pandas_helpers.dataframe_to_bq_schema(dataframe)

tmpfd, tmppath = tempfile.mkstemp(suffix="_job_{}.parquet".format(job_id[:8]))
os.close(tmpfd)

Expand Down
94 changes: 94 additions & 0 deletions bigquery/tests/system.py
Original file line number Diff line number Diff line change
Expand Up @@ -634,6 +634,100 @@ def test_load_table_from_local_avro_file_then_dump_table(self):
sorted(row_tuples, key=by_wavelength), sorted(ROWS, key=by_wavelength)
)

@unittest.skipIf(pandas is None, "Requires `pandas`")
@unittest.skipIf(pyarrow is None, "Requires `pyarrow`")
def test_load_table_from_dataframe_w_automatic_schema(self):
"""Test that a DataFrame with dtypes that map well to BigQuery types
can be uploaded without specifying a schema.

https://github.com/googleapis/google-cloud-python/issues/9044
"""
bool_col = pandas.Series([True, False, True], dtype="bool")
ts_col = pandas.Series(
[
datetime.datetime(2010, 1, 2, 3, 44, 50),
datetime.datetime(2011, 2, 3, 14, 50, 59),
datetime.datetime(2012, 3, 14, 15, 16),
],
dtype="datetime64[ns]",
).dt.tz_localize(pytz.utc)
dt_col = pandas.Series(
[
datetime.datetime(2010, 1, 2, 3, 44, 50),
datetime.datetime(2011, 2, 3, 14, 50, 59),
datetime.datetime(2012, 3, 14, 15, 16),
],
dtype="datetime64[ns]",
)
float32_col = pandas.Series([1.0, 2.0, 3.0], dtype="float32")
float64_col = pandas.Series([4.0, 5.0, 6.0], dtype="float64")
int8_col = pandas.Series([-12, -11, -10], dtype="int8")
int16_col = pandas.Series([-9, -8, -7], dtype="int16")
int32_col = pandas.Series([-6, -5, -4], dtype="int32")
int64_col = pandas.Series([-3, -2, -1], dtype="int64")
uint8_col = pandas.Series([0, 1, 2], dtype="uint8")
uint16_col = pandas.Series([3, 4, 5], dtype="uint16")
uint32_col = pandas.Series([6, 7, 8], dtype="uint32")
dataframe = pandas.DataFrame(
{
"bool_col": bool_col,
"ts_col": ts_col,
"dt_col": dt_col,
"float32_col": float32_col,
"float64_col": float64_col,
"int8_col": int8_col,
"int16_col": int16_col,
"int32_col": int32_col,
"int64_col": int64_col,
"uint8_col": uint8_col,
"uint16_col": uint16_col,
"uint32_col": uint32_col,
},
columns=[
"bool_col",
"ts_col",
"dt_col",
"float32_col",
"float64_col",
"int8_col",
"int16_col",
"int32_col",
"int64_col",
"uint8_col",
"uint16_col",
"uint32_col",
],
)
Comment thread
plamut marked this conversation as resolved.

dataset_id = _make_dataset_id("bq_load_test")
self.temp_dataset(dataset_id)
table_id = "{}.{}.load_table_from_dataframe_w_automatic_schema".format(
Config.CLIENT.project, dataset_id
)

load_job = Config.CLIENT.load_table_from_dataframe(dataframe, table_id)
load_job.result()

table = Config.CLIENT.get_table(table_id)
self.assertEqual(
tuple(table.schema),
(
bigquery.SchemaField("bool_col", "BOOLEAN"),
bigquery.SchemaField("ts_col", "TIMESTAMP"),
bigquery.SchemaField("dt_col", "DATETIME"),
bigquery.SchemaField("float32_col", "FLOAT"),
bigquery.SchemaField("float64_col", "FLOAT"),
bigquery.SchemaField("int8_col", "INTEGER"),
bigquery.SchemaField("int16_col", "INTEGER"),
bigquery.SchemaField("int32_col", "INTEGER"),
bigquery.SchemaField("int64_col", "INTEGER"),
bigquery.SchemaField("uint8_col", "INTEGER"),
bigquery.SchemaField("uint16_col", "INTEGER"),
bigquery.SchemaField("uint32_col", "INTEGER"),
),
)
self.assertEqual(table.num_rows, 3)

@unittest.skipIf(pandas is None, "Requires `pandas`")
@unittest.skipIf(pyarrow is None, "Requires `pyarrow`")
def test_load_table_from_dataframe_w_nulls(self):
Expand Down