Skip to content
This repository was archived by the owner on Apr 1, 2026. It is now read-only.

Commit 205b9c5

Browse files
test: Add ReadLocalNode tests
1 parent 1d45646 commit 205b9c5

7 files changed

Lines changed: 377 additions & 2 deletions

File tree

bigframes/core/nodes.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -601,6 +601,10 @@ class ScanList:
601601

602602
items: typing.Tuple[ScanItem, ...]
603603

604+
@classmethod
def from_items(cls, items: Iterable[ScanItem]) -> ScanList:
    """Construct a ScanList from an iterable of ScanItems.

    The iterable is materialized into a tuple, matching the declared
    ``items: typing.Tuple[ScanItem, ...]`` field type.
    """
    return cls(tuple(items))
607+
604608
def filter_cols(
605609
self,
606610
ids: AbstractSet[identifiers.ColumnId],
Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
# Copyright 2025 Google LLC
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
from __future__ import annotations
15+
16+
from typing import Optional, Tuple
17+
18+
from google.cloud import bigquery
19+
import google.cloud.bigquery.job as bq_job
20+
import google.cloud.bigquery.table as bq_table
21+
22+
from bigframes.core import compile, nodes
23+
from bigframes.session import executor, semi_executor
24+
import bigframes.session._io.bigquery as bq_io
25+
26+
27+
# used only in testing right now, BigQueryCachingExecutor is the fully featured engine
28+
# simplified, doest not do large >10 gb result queries, error handling, respect global config
29+
# or record metrics
30+
# Used only in testing right now; BigQueryCachingExecutor is the fully
# featured engine. Simplified: does not handle large (>10 GB) result
# queries, error handling, respecting global config, or recording metrics.
class DirectGbqExecutor(semi_executor.SemiExecutor):
    """Minimal executor that compiles a plan to SQL and runs it on BigQuery."""

    def __init__(self, bqclient: bigquery.Client):
        self.bqclient = bqclient

    def execute(
        self,
        plan: nodes.BigFrameNode,
        ordered: bool,
        peek: Optional[int] = None,
    ) -> executor.ExecuteResult:
        """Just execute whatever plan as is, without further caching or decomposition."""
        # TODO(swast): plumb through the api_name of the user-facing api that
        # caused this query.
        request = compile.CompileRequest(plan, sort_rows=ordered, peek_count=peek)
        compiled = compile.compile_sql(request)
        row_iterator, query_job = self._run_execute_query(sql=compiled.sql)

        return executor.ExecuteResult(
            arrow_batches=row_iterator.to_arrow_iterable(),
            schema=plan.schema,
            query_job=query_job,
            total_rows=row_iterator.total_rows,
        )

    def _run_execute_query(
        self,
        sql: str,
        job_config: Optional[bq_job.QueryJobConfig] = None,
    ) -> Tuple[bq_table.RowIterator, Optional[bigquery.QueryJob]]:
        """Starts BigQuery query job and waits for results."""
        config = job_config if job_config is not None else bq_job.QueryJobConfig()
        return bq_io.start_query_with_client(
            self.bqclient,
            sql,
            job_config=config,
            project=None,
            location=None,
            timeout=None,
            metrics=None,
            query_with_job=False,
        )
Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
# Copyright 2025 Google LLC
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
from __future__ import annotations
15+
16+
from typing import Optional, TYPE_CHECKING
17+
18+
from bigframes.core import array_value, bigframe_node, nodes
19+
from bigframes.session import executor, semi_executor
20+
21+
if TYPE_CHECKING:
22+
import polars as pl
23+
24+
25+
# Node types the polars executor can evaluate; plans containing any other
# node type are rejected by PolarsExecutor._can_execute.
_COMPATIBLE_NODES = (
    nodes.ReadLocalNode,
    nodes.OrderByNode,
    nodes.ReversedNode,
    nodes.SelectionNode,
    nodes.FilterNode,  # partial support
    nodes.ProjectionNode,  # partial support
)
33+
34+
35+
class PolarsExecutor(semi_executor.SemiExecutor):
    """Executes compatible plans locally using the polars engine."""

    def __init__(self):
        # This will error out if polars is not installed
        from bigframes.core.compile.polars import PolarsCompiler

        self._compiler = PolarsCompiler()

    def execute(
        self,
        plan: bigframe_node.BigFrameNode,
        ordered: bool,
        peek: Optional[int] = None,
    ) -> Optional[executor.ExecuteResult]:
        """Compile and run ``plan`` with polars, or return None if unsupported."""
        if not self._can_execute(plan):
            return None
        # Note: Ignoring ordered flag, as just executing totally ordered is fine.
        try:
            frame: pl.LazyFrame = self._compiler.compile(array_value.ArrayValue(plan))
        except Exception:
            # Compiler coverage is partial; treat any compile failure as
            # "this engine can't run the plan" rather than erroring out.
            return None
        if peek is not None:
            frame = frame.limit(peek)
        result_table = frame.collect().to_arrow()
        return executor.ExecuteResult(
            arrow_batches=iter(result_table.to_batches()),
            schema=plan.schema,
            total_bytes=result_table.nbytes,
            total_rows=result_table.num_rows,
        )

    def _can_execute(self, plan: bigframe_node.BigFrameNode) -> bool:
        # Only attempt execution when every node in the plan is a supported type.
        return all(isinstance(node, _COMPATIBLE_NODES) for node in plan.unique_nodes())

noxfile.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -108,8 +108,8 @@
108108
# Optional extras installed for system-test sessions, keyed by Python version.
SYSTEM_TEST_EXTRAS_BY_PYTHON: Dict[str, List[str]] = {
    "3.9": ["tests"],
    "3.10": ["tests"],
    "3.12": ["tests", "scikit-learn", "polars"],
    "3.13": ["tests", "polars"],
}
114114

115115
LOGGING_NAME_ENV_VAR = "BIGFRAMES_PERFORMANCE_LOG_NAME"
Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
# Copyright 2025 Google LLC
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,83 @@
1+
# Copyright 2025 Google LLC
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
import pathlib
15+
from typing import Generator
16+
17+
from google.cloud import bigquery
18+
import pandas as pd
19+
import pytest
20+
21+
import bigframes
22+
from bigframes.core import local_data
23+
from bigframes.session import (
24+
direct_gbq_execution,
25+
local_scan_executor,
26+
polars_executor,
27+
semi_executor,
28+
)
29+
30+
# Directory containing this conftest, and the shared test-data directory
# three levels up from it.
CURRENT_DIR = pathlib.Path(__file__).parent
DATA_DIR = CURRENT_DIR.parent.parent.parent / "data"

# Skip this whole test package when polars is not installed.
pytest.importorskip("polars", reason="polars not installed")
34+
35+
36+
@pytest.fixture(scope="module")
def fake_session() -> Generator[bigframes.Session, None, None]:
    """Yield a minimal placeholder session, installed as the global session."""
    import bigframes.core.global_session

    # It's a "polars session", but session-provided execution is bypassed;
    # we just want a minimal placeholder session without expensive setup.
    from bigframes.testing import polars_session

    placeholder = polars_session.TestSession()
    with bigframes.core.global_session._GlobalSessionContext(placeholder):
        yield placeholder
47+
48+
49+
@pytest.fixture(scope="session", params=["pyarrow", "polars", "bq"])
def engine(request, bigquery_client: bigquery.Client) -> semi_executor.SemiExecutor:
    """Parametrized fixture yielding each semi-executor implementation."""
    factories = {
        "pyarrow": lambda: local_scan_executor.LocalScanExecutor(),
        "polars": lambda: polars_executor.PolarsExecutor(),
        "bq": lambda: direct_gbq_execution.DirectGbqExecutor(bigquery_client),
    }
    try:
        return factories[request.param]()
    except KeyError:
        raise ValueError(f"Unrecognized param: {request.param}") from None
58+
59+
60+
@pytest.fixture(scope="module")
def managed_data_source(
    scalars_pandas_df_index: pd.DataFrame,
) -> local_data.ManagedArrowTable:
    """Managed arrow table built from the standard scalars test dataframe."""
    return local_data.ManagedArrowTable.from_pandas(scalars_pandas_df_index)
65+
66+
67+
@pytest.fixture(scope="module")
def zero_row_source() -> local_data.ManagedArrowTable:
    """Managed arrow table with two columns and no rows."""
    return local_data.ManagedArrowTable.from_pandas(pd.DataFrame({"a": [], "b": []}))
70+
71+
72+
@pytest.fixture(scope="module")
def nested_data_source(
    nested_pandas_df: pd.DataFrame,
) -> local_data.ManagedArrowTable:
    """Managed arrow table built from the nested-dtype test dataframe."""
    return local_data.ManagedArrowTable.from_pandas(nested_pandas_df)
77+
78+
79+
@pytest.fixture(scope="module")
def repeated_data_source(
    repeated_pandas_df: pd.DataFrame,
) -> local_data.ManagedArrowTable:
    """Managed arrow table built from the repeated (list-valued) test dataframe."""
    return local_data.ManagedArrowTable.from_pandas(repeated_pandas_df)
Lines changed: 132 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,132 @@
1+
# Copyright 2025 Google LLC
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
import pytest
16+
17+
import bigframes
18+
from bigframes.core import identifiers, local_data, nodes
19+
from bigframes.session import polars_executor, semi_executor
20+
21+
# Skip this whole module when polars is not installed.
pytest.importorskip("polars")


# Canonical engine: every other engine's output is compared against this one.
REFERENCE_ENGINE = polars_executor.PolarsExecutor()
25+
26+
27+
def ensure_equivalence(
    node: nodes.BigFrameNode,
    engine1: semi_executor.SemiExecutor,
    engine2: semi_executor.SemiExecutor,
):
    """Assert that both engines execute ``node`` and produce equal arrow tables.

    Args:
        node: Plan to execute on both engines.
        engine1: First engine; must be able to execute ``node``.
        engine2: Second engine; must be able to execute ``node``.
    """
    e1_result = engine1.execute(node, ordered=True)
    e2_result = engine2.execute(node, ordered=True)
    assert e1_result is not None
    assert e2_result is not None
    # BUG FIX: the boolean returned by ``.equals()`` was previously discarded,
    # so engines producing different data could never fail the test.
    assert e1_result.to_arrow_table().equals(e2_result.to_arrow_table())
37+
38+
39+
def test_engines_read_local(
    fake_session: bigframes.Session,
    managed_data_source: local_data.ManagedArrowTable,
    engine,
):
    """Each engine matches the reference engine on a full-column local scan."""
    scan_items = [
        nodes.ScanItem(identifiers.ColumnId(col.column), col.dtype, col.column)
        for col in managed_data_source.schema.items
    ]
    local_node = nodes.ReadLocalNode(
        managed_data_source,
        nodes.ScanList.from_items(scan_items),
        fake_session,
        offsets_col=None,
    )
    ensure_equivalence(local_node, REFERENCE_ENGINE, engine)
52+
53+
54+
def test_engines_read_local_w_offsets(
    fake_session: bigframes.Session,
    managed_data_source: local_data.ManagedArrowTable,
    engine,
):
    """Each engine matches the reference when a row-offsets column is requested."""
    scan_items = [
        nodes.ScanItem(identifiers.ColumnId(col.column), col.dtype, col.column)
        for col in managed_data_source.schema.items
    ]
    local_node = nodes.ReadLocalNode(
        managed_data_source,
        nodes.ScanList.from_items(scan_items),
        fake_session,
        offsets_col=identifiers.ColumnId("offsets"),
    )
    ensure_equivalence(local_node, REFERENCE_ENGINE, engine)
70+
71+
72+
def test_engines_read_local_w_col_subset(
    fake_session: bigframes.Session,
    managed_data_source: local_data.ManagedArrowTable,
    engine,
):
    """Each engine matches the reference when scanning only a subset of columns."""
    # Take every other column, reversed, to exercise both subsetting and ordering.
    subset = managed_data_source.schema.items[::-2]
    scan_items = [
        nodes.ScanItem(identifiers.ColumnId(col.column), col.dtype, col.column)
        for col in subset
    ]
    local_node = nodes.ReadLocalNode(
        managed_data_source,
        nodes.ScanList.from_items(scan_items),
        fake_session,
        offsets_col=None,
    )
    ensure_equivalence(local_node, REFERENCE_ENGINE, engine)
85+
86+
87+
def test_engines_read_local_w_empty_scan_list(
    fake_session: bigframes.Session,
    managed_data_source: local_data.ManagedArrowTable,
    engine,
):
    """Each engine matches the reference when no columns are selected."""
    local_node = nodes.ReadLocalNode(
        managed_data_source,
        nodes.ScanList.from_items(()),
        fake_session,
        offsets_col=None,
    )
    ensure_equivalence(local_node, REFERENCE_ENGINE, engine)
97+
98+
99+
def test_engines_read_local_w_zero_row_source(
    fake_session: bigframes.Session,
    zero_row_source: local_data.ManagedArrowTable,
    engine,
):
    """Each engine matches the reference when the local source has zero rows."""
    local_node = nodes.ReadLocalNode(
        zero_row_source,
        nodes.ScanList.from_items(()),
        fake_session,
        offsets_col=None,
    )
    ensure_equivalence(local_node, REFERENCE_ENGINE, engine)
109+
110+
111+
def test_engines_read_local_w_nested_source(
    fake_session: bigframes.Session,
    nested_data_source: local_data.ManagedArrowTable,
    engine,
):
    """Each engine matches the reference on a source with nested dtypes."""
    local_node = nodes.ReadLocalNode(
        nested_data_source,
        nodes.ScanList.from_items(()),
        fake_session,
        offsets_col=None,
    )
    ensure_equivalence(local_node, REFERENCE_ENGINE, engine)
121+
122+
123+
def test_engines_read_local_w_repeated_source(
    fake_session: bigframes.Session,
    repeated_data_source: local_data.ManagedArrowTable,
    engine,
):
    """Each engine matches the reference on a source with repeated (list) columns."""
    local_node = nodes.ReadLocalNode(
        repeated_data_source,
        nodes.ScanList.from_items(()),
        fake_session,
        offsets_col=None,
    )
    ensure_equivalence(local_node, REFERENCE_ENGINE, engine)

0 commit comments

Comments
 (0)