
Commit a671d36

plamut authored and emar-kar committed
BigQuery: Add client.insert_rows_from_dataframe() method (googleapis#9162)
* Add client.insert_rows_from_dataframe() method

* Avoid using namedtuples for dataframe row iteration

  dataframe.itertuples() returns plain tuples under certain conditions, so
  this commit enforces always returning plain tuples and constructs the row
  dictionary manually from each tuple.

* Skip insert_rows_from_dataframe tests if no Pandas
1 parent 76e1d4a commit a671d36

3 files changed

Lines changed: 245 additions & 0 deletions
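
For context, here is how the new method is meant to be called once this commit lands. This is a minimal sketch: the project, dataset, and table names are illustrative placeholders, and the table is assumed to already exist with a schema matching the dataframe's columns.

from google.cloud import bigquery
import pandas

client = bigquery.Client()

# The fetched Table carries a schema, so selected_fields is not needed.
table = client.get_table("my-project.my_dataset.my_table")  # placeholder name

dataframe = pandas.DataFrame(
    [{"name": "Alice", "age": 30}, {"name": "Bob", "age": 25}]
)

# Rows are streamed via the insertAll API in chunks (default chunk_size=500);
# the result is one list of per-row error mappings for each chunk sent.
errors = client.insert_rows_from_dataframe(table, dataframe)
assert all(chunk == [] for chunk in errors)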


bigquery/google/cloud/bigquery/client.py

Lines changed: 54 additions & 0 deletions
@@ -15,6 +15,7 @@
 """Client for interacting with the Google BigQuery API."""
 
 from __future__ import absolute_import
+from __future__ import division
 
 try:
     from collections import abc as collections_abc
@@ -25,7 +26,9 @@
 import functools
 import gzip
 import io
+import itertools
 import json
+import math
 import os
 import tempfile
 import uuid
@@ -2111,6 +2114,57 @@ def insert_rows(self, table, rows, selected_fields=None, **kwargs):
 
         return self.insert_rows_json(table, json_rows, **kwargs)
 
+    def insert_rows_from_dataframe(
+        self, table, dataframe, selected_fields=None, chunk_size=500, **kwargs
+    ):
+        """Insert rows into a table from a dataframe via the streaming API.
+
+        Args:
+            table (Union[ \
+                :class:`~google.cloud.bigquery.table.Table`, \
+                :class:`~google.cloud.bigquery.table.TableReference`, \
+                str, \
+            ]):
+                The destination table for the row data, or a reference to it.
+            dataframe (pandas.DataFrame):
+                A :class:`~pandas.DataFrame` containing the data to load.
+            selected_fields (Sequence[ \
+                :class:`~google.cloud.bigquery.schema.SchemaField`, \
+            ]):
+                The fields to return. Required if ``table`` is a
+                :class:`~google.cloud.bigquery.table.TableReference`.
+            chunk_size (int):
+                The number of rows to stream in a single chunk. Must be positive.
+            kwargs (dict):
+                Keyword arguments to
+                :meth:`~google.cloud.bigquery.client.Client.insert_rows_json`.
+
+        Returns:
+            Sequence[Sequence[Mappings]]:
+                A list with insert errors for each insert chunk. Each element
+                is a list containing one mapping per row with insert errors:
+                the "index" key identifies the row, and the "errors" key
+                contains a list of the mappings describing one or more problems
+                with the row.
+
+        Raises:
+            ValueError: if table's schema is not set
+        """
+        insert_results = []
+
+        chunk_count = int(math.ceil(len(dataframe) / chunk_size))
+        rows_iter = (
+            dict(six.moves.zip(dataframe.columns, row))
+            for row in dataframe.itertuples(index=False, name=None)
+        )
+
+        for _ in range(chunk_count):
+            rows_chunk = itertools.islice(rows_iter, chunk_size)
+            result = self.insert_rows(table, rows_chunk, selected_fields, **kwargs)
+            insert_results.append(result)
+
+        return insert_results
+
     def insert_rows_json(
         self,
         table,
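
Two details of this implementation are easy to miss. First, the new `from __future__ import division` matters on Python 2: it makes `len(dataframe) / chunk_size` true division, so `math.ceil` rounds the chunk count up instead of flooring it. Second, every `itertools.islice` call consumes from the same `rows_iter` generator, so successive slices yield consecutive, non-overlapping chunks. A standalone sketch of the same pattern, with no BigQuery involved:

import itertools
import math

rows = [{"n": i} for i in range(7)]
chunk_size = 3

# True division + ceil: 7 rows / 3 per chunk -> 3 chunks (3 + 3 + 1).
chunk_count = int(math.ceil(len(rows) / chunk_size))

# islice repeatedly drains the *same* iterator, so chunks never overlap.
rows_iter = iter(rows)
chunks = [list(itertools.islice(rows_iter, chunk_size)) for _ in range(chunk_count)]

assert chunks == [
    [{"n": 0}, {"n": 1}, {"n": 2}],
    [{"n": 3}, {"n": 4}, {"n": 5}],
    [{"n": 6}],
]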

bigquery/tests/system.py

Lines changed: 67 additions & 0 deletions
@@ -1951,6 +1951,73 @@ def test_query_results_to_dataframe_w_bqstorage(self):
                 if not row[col] is None:
                     self.assertIsInstance(row[col], exp_datatypes[col])
 
+    @unittest.skipIf(pandas is None, "Requires `pandas`")
+    def test_insert_rows_from_dataframe(self):
+        SF = bigquery.SchemaField
+        schema = [
+            SF("float_col", "FLOAT", mode="REQUIRED"),
+            SF("int_col", "INTEGER", mode="REQUIRED"),
+            SF("bool_col", "BOOLEAN", mode="REQUIRED"),
+            SF("string_col", "STRING", mode="NULLABLE"),
+        ]
+
+        dataframe = pandas.DataFrame(
+            [
+                {
+                    "float_col": 1.11,
+                    "bool_col": True,
+                    "string_col": "my string",
+                    "int_col": 10,
+                },
+                {
+                    "float_col": 2.22,
+                    "bool_col": False,
+                    "string_col": "another string",
+                    "int_col": 20,
+                },
+                {
+                    "float_col": 3.33,
+                    "bool_col": False,
+                    "string_col": "another string",
+                    "int_col": 30,
+                },
+                {
+                    "float_col": 4.44,
+                    "bool_col": True,
+                    "string_col": "another string",
+                    "int_col": 40,
+                },
+                {
+                    "float_col": 5.55,
+                    "bool_col": False,
+                    "string_col": "another string",
+                    "int_col": 50,
+                },
+            ]
+        )
+
+        table_id = "test_table"
+        dataset = self.temp_dataset(_make_dataset_id("issue_7553"))
+        table_arg = Table(dataset.table(table_id), schema=schema)
+        table = retry_403(Config.CLIENT.create_table)(table_arg)
+        self.to_delete.insert(0, table)
+
+        Config.CLIENT.insert_rows_from_dataframe(table, dataframe, chunk_size=3)
+
+        retry = RetryResult(_has_rows, max_tries=8)
+        rows = retry(self._fetch_single_page)(table)
+
+        sorted_rows = sorted(rows, key=operator.attrgetter("int_col"))
+        row_tuples = [r.values() for r in sorted_rows]
+        expected = [tuple(data_row) for data_row in dataframe.itertuples(index=False)]
+
+        assert len(row_tuples) == len(expected)
+
+        for row, expected_row in zip(row_tuples, expected):
+            six.assertCountEqual(
+                self, row, expected_row
+            )  # column order does not matter
+
     def test_insert_rows_nested_nested(self):
         # See #2951
         SF = bigquery.SchemaField
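
The system test above only checks that the rows eventually arrive; it discards the method's return value. A caller that cares about partial failures would inspect it, roughly like this (a sketch: `client`, `table`, and `dataframe` are assumed to be set up as elsewhere in this diff, and each "index" is relative to its own chunk, since every chunk is sent as a separate insertAll request):

errors_per_chunk = client.insert_rows_from_dataframe(table, dataframe, chunk_size=3)

for chunk_index, chunk_errors in enumerate(errors_per_chunk):
    for entry in chunk_errors:
        # Each entry maps a failed row ("index", within this chunk's
        # request) to the backend's error descriptions ("errors").
        print(
            "chunk {}: row {} failed: {}".format(
                chunk_index, entry["index"], entry["errors"]
            )
        )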

bigquery/tests/unit/test_client.py

Lines changed: 124 additions & 0 deletions
@@ -4473,6 +4473,130 @@ def test_insert_rows_w_numeric(self):
             data=sent,
         )
 
+    @unittest.skipIf(pandas is None, "Requires `pandas`")
+    def test_insert_rows_from_dataframe(self):
+        from google.cloud.bigquery.table import SchemaField
+        from google.cloud.bigquery.table import Table
+
+        API_PATH = "/projects/{}/datasets/{}/tables/{}/insertAll".format(
+            self.PROJECT, self.DS_ID, self.TABLE_REF.table_id
+        )
+
+        dataframe = pandas.DataFrame(
+            [
+                {"name": u"Little One", "age": 10, "adult": False},
+                {"name": u"Young Gun", "age": 20, "adult": True},
+                {"name": u"Dad", "age": 30, "adult": True},
+                {"name": u"Stranger", "age": 40, "adult": True},
+            ]
+        )
+
+        # create client
+        creds = _make_credentials()
+        http = object()
+        client = self._make_one(project=self.PROJECT, credentials=creds, _http=http)
+        conn = client._connection = make_connection({}, {})
+
+        # create table
+        schema = [
+            SchemaField("name", "STRING", mode="REQUIRED"),
+            SchemaField("age", "INTEGER", mode="REQUIRED"),
+            SchemaField("adult", "BOOLEAN", mode="REQUIRED"),
+        ]
+        table = Table(self.TABLE_REF, schema=schema)
+
+        with mock.patch("uuid.uuid4", side_effect=map(str, range(len(dataframe)))):
+            error_info = client.insert_rows_from_dataframe(
+                table, dataframe, chunk_size=3
+            )
+
+        self.assertEqual(len(error_info), 2)
+        for chunk_errors in error_info:
+            assert chunk_errors == []
+
+        EXPECTED_SENT_DATA = [
+            {
+                "rows": [
+                    {
+                        "insertId": "0",
+                        "json": {"name": "Little One", "age": "10", "adult": "false"},
+                    },
+                    {
+                        "insertId": "1",
+                        "json": {"name": "Young Gun", "age": "20", "adult": "true"},
+                    },
+                    {
+                        "insertId": "2",
+                        "json": {"name": "Dad", "age": "30", "adult": "true"},
+                    },
+                ]
+            },
+            {
+                "rows": [
+                    {
+                        "insertId": "3",
+                        "json": {"name": "Stranger", "age": "40", "adult": "true"},
+                    }
+                ]
+            },
+        ]
+
+        actual_calls = conn.api_request.call_args_list
+
+        for call, expected_data in six.moves.zip_longest(
+            actual_calls, EXPECTED_SENT_DATA
+        ):
+            expected_call = mock.call(method="POST", path=API_PATH, data=expected_data)
+            assert call == expected_call
+
+    @unittest.skipIf(pandas is None, "Requires `pandas`")
+    def test_insert_rows_from_dataframe_many_columns(self):
+        from google.cloud.bigquery.table import SchemaField
+        from google.cloud.bigquery.table import Table
+
+        API_PATH = "/projects/{}/datasets/{}/tables/{}/insertAll".format(
+            self.PROJECT, self.DS_ID, self.TABLE_REF.table_id
+        )
+        N_COLUMNS = 256  # should be >= 256
+
+        dataframe = pandas.DataFrame(
+            [{"foo_{}".format(i): "bar_{}".format(i) for i in range(N_COLUMNS)}]
+        )
+
+        # create client
+        creds = _make_credentials()
+        http = object()
+        client = self._make_one(project=self.PROJECT, credentials=creds, _http=http)
+        conn = client._connection = make_connection({}, {})
+
+        # create table
+        schema = [SchemaField("foo_{}".format(i), "STRING") for i in range(N_COLUMNS)]
+        table = Table(self.TABLE_REF, schema=schema)
+
+        with mock.patch("uuid.uuid4", side_effect=map(str, range(len(dataframe)))):
+            error_info = client.insert_rows_from_dataframe(
+                table, dataframe, chunk_size=3
+            )
+
+        assert len(error_info) == 1
+        assert error_info[0] == []
+
+        EXPECTED_SENT_DATA = {
+            "rows": [
+                {
+                    "insertId": "0",
+                    "json": {
+                        "foo_{}".format(i): "bar_{}".format(i) for i in range(N_COLUMNS)
+                    },
+                }
+            ]
+        }
+        expected_call = mock.call(method="POST", path=API_PATH, data=EXPECTED_SENT_DATA)
+
+        actual_calls = conn.api_request.call_args_list
+        assert len(actual_calls) == 1
+        assert actual_calls[0] == expected_call
+
     def test_insert_rows_json(self):
         from google.cloud.bigquery.table import Table, SchemaField
         from google.cloud.bigquery.dataset import DatasetReference
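
The many-columns test exists because of the `itertuples()` quirk called out in the commit message: before Python 3.7, `collections.namedtuple` was limited to 255 fields, so pandas silently fell back to plain tuples for wide dataframes, and attribute-style row access would break at exactly that width. Passing `name=None` forces plain tuples at every width, which is why the client builds each row dict by zipping the tuple with `dataframe.columns`. A small illustration of the pattern (the 255-field cap no longer applies on modern Python, but the code behaves the same either way):

import pandas

df = pandas.DataFrame([{"a": 1, "b": 2}])

# name=None always yields plain tuples, regardless of column count.
for row in df.itertuples(index=False, name=None):
    row_dict = dict(zip(df.columns, row))
    print(row_dict)  # {'a': 1, 'b': 2}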
