@@ -1809,6 +1809,43 @@ def test_to_arrow_w_empty_table(self):
18091809 self .assertEqual (child_field .type .value_type [0 ].name , "name" )
18101810 self .assertEqual (child_field .type .value_type [1 ].name , "age" )
18111811
@unittest.skipIf(pyarrow is None, "Requires `pyarrow`")
def test_to_arrow_max_results_w_create_bqstorage_warning(self):
    """When ``max_results`` is set, ``to_arrow`` must fall back to the
    ``tabledata.list`` API: it should emit exactly one ``UserWarning``
    explaining the fallback and must NOT create a BigQuery Storage client,
    even though ``create_bqstorage_client=True`` was requested.

    NOTE: guarded on ``pyarrow`` (not ``pandas``) because ``to_arrow``
    raises when pyarrow is unavailable; pandas is never used here.
    """
    from google.cloud.bigquery.schema import SchemaField

    schema = [
        SchemaField("name", "STRING", mode="REQUIRED"),
        SchemaField("age", "INTEGER", mode="REQUIRED"),
    ]
    rows = [
        {"f": [{"v": "Phred Phlyntstone"}, {"v": "32"}]},
        {"f": [{"v": "Bharney Rhubble"}, {"v": "33"}]},
    ]
    path = "/foo"
    api_request = mock.Mock(return_value={"rows": rows})
    mock_client = _mock_client()

    row_iterator = self._make_one(
        client=mock_client,
        api_request=api_request,
        path=path,
        schema=schema,
        max_results=42,
    )

    with warnings.catch_warnings(record=True) as warned:
        row_iterator.to_arrow(create_bqstorage_client=True)

    # Exactly one UserWarning should mention both that the bqstorage
    # client cannot be used and that tabledata.list is used instead.
    matches = [
        warning
        for warning in warned
        if warning.category is UserWarning
        and "cannot use bqstorage_client" in str(warning).lower()
        and "tabledata.list" in str(warning)
    ]
    self.assertEqual(len(matches), 1, msg="User warning was not emitted.")
    mock_client._create_bqstorage_client.assert_not_called()
18121849 @unittest .skipIf (pyarrow is None , "Requires `pyarrow`" )
18131850 @unittest .skipIf (
18141851 bigquery_storage_v1beta1 is None , "Requires `google-cloud-bigquery-storage`"
@@ -1856,7 +1893,7 @@ def test_to_arrow_w_bqstorage(self):
18561893
18571894 mock_page = mock .create_autospec (reader .ReadRowsPage )
18581895 mock_page .to_arrow .return_value = pyarrow .RecordBatch .from_arrays (
1859- page_items , arrow_schema
1896+ page_items , schema = arrow_schema
18601897 )
18611898 mock_pages = (mock_page , mock_page , mock_page )
18621899 type(mock_rows ).pages = mock .PropertyMock (return_value = mock_pages )
@@ -2216,9 +2253,9 @@ def test_to_dataframe_w_various_types_nullable(self):
22162253 ]
22172254 row_data = [
22182255 [None , None , None , None , None , None ],
2219- ["1.4338368E9" , "420" , "1.1" , "Cash" , "true" , "1999-12-01" ],
2220- ["1.3878117E9" , "2580" , "17.7" , "Cash" , "false" , "1953-06-14" ],
2221- ["1.3855653E9" , "2280" , "4.4" , "Credit" , "true" , "1981-11-04" ],
2256+ ["1.4338368E9" , "420" , "1.1" , u "Cash" , "true" , "1999-12-01" ],
2257+ ["1.3878117E9" , "2580" , "17.7" , u "Cash" , "false" , "1953-06-14" ],
2258+ ["1.3855653E9" , "2280" , "4.4" , u "Credit" , "true" , "1981-11-04" ],
22222259 ]
22232260 rows = [{"f" : [{"v" : field } for field in row ]} for row in row_data ]
22242261 path = "/foo"
@@ -2238,7 +2275,7 @@ def test_to_dataframe_w_various_types_nullable(self):
22382275 else :
22392276 self .assertIsInstance (row .start_timestamp , pandas .Timestamp )
22402277 self .assertIsInstance (row .seconds , float )
2241- self .assertIsInstance (row .payment_type , str )
2278+ self .assertIsInstance (row .payment_type , six . string_types )
22422279 self .assertIsInstance (row .complete , bool )
22432280 self .assertIsInstance (row .date , datetime .date )
22442281
@@ -2256,9 +2293,9 @@ def test_to_dataframe_column_dtypes(self):
22562293 SchemaField ("date" , "DATE" ),
22572294 ]
22582295 row_data = [
2259- ["1.4338368E9" , "420" , "1.1" , "1.77" , "Cash" , "true" , "1999-12-01" ],
2260- ["1.3878117E9" , "2580" , "17.7" , "28.5" , "Cash" , "false" , "1953-06-14" ],
2261- ["1.3855653E9" , "2280" , "4.4" , "7.1" , "Credit" , "true" , "1981-11-04" ],
2296+ ["1.4338368E9" , "420" , "1.1" , "1.77" , u "Cash" , "true" , "1999-12-01" ],
2297+ ["1.3878117E9" , "2580" , "17.7" , "28.5" , u "Cash" , "false" , "1953-06-14" ],
2298+ ["1.3855653E9" , "2280" , "4.4" , "7.1" , u "Credit" , "true" , "1981-11-04" ],
22622299 ]
22632300 rows = [{"f" : [{"v" : field } for field in row ]} for row in row_data ]
22642301 path = "/foo"
@@ -2424,9 +2461,9 @@ def test_to_dataframe_w_bqstorage_no_streams(self):
24242461 api_request = None ,
24252462 path = None ,
24262463 schema = [
2427- schema .SchemaField ("colA" , "IGNORED " ),
2428- schema .SchemaField ("colC" , "IGNORED " ),
2429- schema .SchemaField ("colB" , "IGNORED " ),
2464+ schema .SchemaField ("colA" , "INTEGER " ),
2465+ schema .SchemaField ("colC" , "FLOAT " ),
2466+ schema .SchemaField ("colB" , "STRING " ),
24302467 ],
24312468 table = mut .TableReference .from_string ("proj.dset.tbl" ),
24322469 )
@@ -2498,10 +2535,11 @@ def test_to_dataframe_w_bqstorage_empty_streams(self):
24982535 mock_pages = mock .PropertyMock (return_value = ())
24992536 type(mock_rows ).pages = mock_pages
25002537
2538+ # Schema is required when there are no record batches in the stream.
25012539 schema = [
2502- schema .SchemaField ("colA" , "IGNORED " ),
2503- schema .SchemaField ("colC" , "IGNORED " ),
2504- schema .SchemaField ("colB" , "IGNORED " ),
2540+ schema .SchemaField ("colA" , "INTEGER " ),
2541+ schema .SchemaField ("colC" , "FLOAT " ),
2542+ schema .SchemaField ("colB" , "STRING " ),
25052543 ]
25062544
25072545 row_iterator = mut .RowIterator (
@@ -2560,14 +2598,15 @@ def test_to_dataframe_w_bqstorage_nonempty(self):
25602598 mock_rows = mock .create_autospec (reader .ReadRowsIterable )
25612599 mock_rowstream .rows .return_value = mock_rows
25622600 page_items = [
2563- {"colA" : 1 , "colB" : "abc" , "colC" : 2.0 },
2564- {"colA" : - 1 , "colB" : "def" , "colC" : 4.0 },
2601+ pyarrow .array ([1 , - 1 ]),
2602+ pyarrow .array ([2.0 , 4.0 ]),
2603+ pyarrow .array (["abc" , "def" ]),
25652604 ]
2566-
2567- mock_page = mock .create_autospec (reader .ReadRowsPage )
2568- mock_page .to_dataframe .return_value = pandas .DataFrame (
2569- page_items , columns = ["colA" , "colB" , "colC" ]
2605+ page_record_batch = pyarrow .RecordBatch .from_arrays (
2606+ page_items , schema = arrow_schema
25702607 )
2608+ mock_page = mock .create_autospec (reader .ReadRowsPage )
2609+ mock_page .to_arrow .return_value = page_record_batch
25712610 mock_pages = (mock_page , mock_page , mock_page )
25722611 type(mock_rows ).pages = mock .PropertyMock (return_value = mock_pages )
25732612
@@ -2594,7 +2633,7 @@ def test_to_dataframe_w_bqstorage_nonempty(self):
25942633
25952634 # Have expected number of rows?
25962635 total_pages = len (streams ) * len (mock_pages )
2597- total_rows = len (page_items ) * total_pages
2636+ total_rows = len (page_items [ 0 ] ) * total_pages
25982637 self .assertEqual (len (got .index ), total_rows )
25992638
26002639 # Don't close the client if it was passed in.
@@ -2633,11 +2672,14 @@ def test_to_dataframe_w_bqstorage_multiple_streams_return_unique_index(self):
26332672 mock_rows = mock .create_autospec (reader .ReadRowsIterable )
26342673 mock_rowstream .rows .return_value = mock_rows
26352674
2636- page_data_frame = pandas .DataFrame (
2637- [{"colA" : 1 }, {"colA" : - 1 }], columns = ["colA" ]
2675+ page_items = [
2676+ pyarrow .array ([1 , - 1 ]),
2677+ ]
2678+ page_record_batch = pyarrow .RecordBatch .from_arrays (
2679+ page_items , schema = arrow_schema
26382680 )
26392681 mock_page = mock .create_autospec (reader .ReadRowsPage )
2640- mock_page .to_dataframe .return_value = page_data_frame
2682+ mock_page .to_arrow .return_value = page_record_batch
26412683 mock_pages = (mock_page , mock_page , mock_page )
26422684 type(mock_rows ).pages = mock .PropertyMock (return_value = mock_pages )
26432685
@@ -2649,7 +2691,7 @@ def test_to_dataframe_w_bqstorage_multiple_streams_return_unique_index(self):
26492691
26502692 self .assertEqual (list (got ), ["colA" ])
26512693 total_pages = len (streams ) * len (mock_pages )
2652- total_rows = len (page_data_frame ) * total_pages
2694+ total_rows = len (page_items [ 0 ] ) * total_pages
26532695 self .assertEqual (len (got .index ), total_rows )
26542696 self .assertTrue (got .index .is_unique )
26552697
@@ -2695,14 +2737,15 @@ def test_to_dataframe_w_bqstorage_updates_progress_bar(self, tqdm_mock):
26952737 page_items = [- 1 , 0 , 1 ]
26962738 type(mock_page ).num_items = mock .PropertyMock (return_value = len (page_items ))
26972739
2698- def blocking_to_dataframe (* args , ** kwargs ):
2699- # Sleep for longer than the waiting interval. This ensures the
2700- # progress_queue gets written to more than once because it gives
2701- # the worker->progress updater time to sum intermediate updates.
2740+ def blocking_to_arrow (* args , ** kwargs ):
2741+ # Sleep for longer than the waiting interval so that we know we're
2742+ # only reading one page per loop at most.
27022743 time .sleep (2 * mut ._PROGRESS_INTERVAL )
2703- return pandas .DataFrame ({"testcol" : page_items })
2744+ return pyarrow .RecordBatch .from_arrays (
2745+ [pyarrow .array (page_items )], schema = arrow_schema
2746+ )
27042747
2705- mock_page .to_dataframe .side_effect = blocking_to_dataframe
2748+ mock_page .to_arrow .side_effect = blocking_to_arrow
27062749 mock_pages = (mock_page , mock_page , mock_page , mock_page , mock_page )
27072750 type(mock_rows ).pages = mock .PropertyMock (return_value = mock_pages )
27082751
@@ -2728,7 +2771,7 @@ def blocking_to_dataframe(*args, **kwargs):
27282771 progress_updates = [
27292772 args [0 ] for args , kwargs in tqdm_mock ().update .call_args_list
27302773 ]
2731- # Should have sent >1 update due to delay in blocking_to_dataframe .
2774+ # Should have sent >1 update due to delay in blocking_to_arrow .
27322775 self .assertGreater (len (progress_updates ), 1 )
27332776 self .assertEqual (sum (progress_updates ), expected_total_rows )
27342777 tqdm_mock ().close .assert_called_once ()
@@ -2768,18 +2811,20 @@ def test_to_dataframe_w_bqstorage_exits_on_keyboardinterrupt(self):
27682811 arrow_schema = {"serialized_schema" : arrow_schema .serialize ().to_pybytes ()},
27692812 )
27702813 bqstorage_client .create_read_session .return_value = session
2814+ page_items = [
2815+ pyarrow .array ([1 , - 1 ]),
2816+ pyarrow .array ([2.0 , 4.0 ]),
2817+ pyarrow .array (["abc" , "def" ]),
2818+ ]
27712819
2772- def blocking_to_dataframe (* args , ** kwargs ):
2820+ def blocking_to_arrow (* args , ** kwargs ):
27732821 # Sleep for longer than the waiting interval so that we know we're
27742822 # only reading one page per loop at most.
27752823 time .sleep (2 * mut ._PROGRESS_INTERVAL )
2776- return pandas .DataFrame (
2777- {"colA" : [1 , - 1 ], "colB" : ["abc" , "def" ], "colC" : [2.0 , 4.0 ]},
2778- columns = ["colA" , "colB" , "colC" ],
2779- )
2824+ return pyarrow .RecordBatch .from_arrays (page_items , schema = arrow_schema )
27802825
27812826 mock_page = mock .create_autospec (reader .ReadRowsPage )
2782- mock_page .to_dataframe .side_effect = blocking_to_dataframe
2827+ mock_page .to_arrow .side_effect = blocking_to_arrow
27832828 mock_rows = mock .create_autospec (reader .ReadRowsIterable )
27842829 mock_pages = mock .PropertyMock (return_value = (mock_page , mock_page , mock_page ))
27852830 type(mock_rows ).pages = mock_pages
0 commit comments