This repository was archived by the owner on Apr 1, 2026. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 69
feat: Refactor TableWidget and to_pandas_batches #2250
Merged
Merged
Changes from 7 commits
Commits
Show all changes
13 commits
Select commit
Hold shift + click to select a range
15993a3
feat: Refactor DataFrame.to_pandas_batches and update TableWidget
shuoweil 928f1db
test: fix failed testcase
shuoweil 0c12888
Merge branch 'main' into shuowei-anywidget-index-testcase
shuoweil eff882d
Merge branch 'main' into shuowei-anywidget-index-testcase
shuoweil 4222027
feat: remove unsafe while loop
shuoweil e3f5816
Merge branch 'main' into shuowei-anywidget-index-testcase
shuoweil 3233db4
chore: document why RLock is needed in TableWidget
shuoweil 387246a
fix(display): prevent deadlock in TableWidget page update
shuoweil 7e77932
fix: make TableWidget import-safe when traitlets is missing
shuoweil b303072
fix: refactor test_anywidget imports to avoid NameError when dependen…
shuoweil 5c5616f
test(unit): refine TableWidget deadlock test style
shuoweil e389782
Merge branch 'main' into shuowei-anywidget-index-testcase
shuoweil c312169
test: skip windows test
shuoweil File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change | ||||
|---|---|---|---|---|---|---|
|
|
@@ -20,7 +20,8 @@ | |||||
| from importlib import resources | ||||||
| import functools | ||||||
| import math | ||||||
| from typing import Any, Dict, Iterator, List, Optional, Type | ||||||
| import threading | ||||||
| from typing import Any, Iterator, Optional | ||||||
| import uuid | ||||||
|
|
||||||
| import pandas as pd | ||||||
|
|
@@ -39,15 +40,15 @@ | |||||
| import anywidget | ||||||
| import traitlets | ||||||
|
|
||||||
| ANYWIDGET_INSTALLED = True | ||||||
| _ANYWIDGET_INSTALLED = True | ||||||
| except Exception: | ||||||
| ANYWIDGET_INSTALLED = False | ||||||
| _ANYWIDGET_INSTALLED = False | ||||||
|
|
||||||
| WIDGET_BASE: Type[Any] | ||||||
| if ANYWIDGET_INSTALLED: | ||||||
| WIDGET_BASE = anywidget.AnyWidget | ||||||
| _WIDGET_BASE: type[Any] | ||||||
| if _ANYWIDGET_INSTALLED: | ||||||
| _WIDGET_BASE = anywidget.AnyWidget | ||||||
| else: | ||||||
| WIDGET_BASE = object | ||||||
| _WIDGET_BASE = object | ||||||
|
|
||||||
|
|
||||||
| @dataclasses.dataclass(frozen=True) | ||||||
|
|
@@ -56,7 +57,7 @@ class _SortState: | |||||
| ascending: bool | ||||||
|
|
||||||
|
|
||||||
| class TableWidget(WIDGET_BASE): | ||||||
| class TableWidget(_WIDGET_BASE): | ||||||
| """An interactive, paginated table widget for BigFrames DataFrames. | ||||||
|
|
||||||
| This widget provides a user-friendly way to display and navigate through | ||||||
|
|
@@ -65,12 +66,8 @@ class TableWidget(WIDGET_BASE): | |||||
|
|
||||||
| page = traitlets.Int(0).tag(sync=True) | ||||||
| page_size = traitlets.Int(0).tag(sync=True) | ||||||
| row_count = traitlets.Union( | ||||||
| [traitlets.Int(), traitlets.Instance(type(None))], | ||||||
| default_value=None, | ||||||
| allow_none=True, | ||||||
| ).tag(sync=True) | ||||||
| table_html = traitlets.Unicode().tag(sync=True) | ||||||
| row_count = traitlets.Int(allow_none=True, default_value=None).tag(sync=True) | ||||||
| table_html = traitlets.Unicode("").tag(sync=True) | ||||||
| sort_column = traitlets.Unicode("").tag(sync=True) | ||||||
| sort_ascending = traitlets.Bool(True).tag(sync=True) | ||||||
| orderable_columns = traitlets.List(traitlets.Unicode(), []).tag(sync=True) | ||||||
|
|
@@ -86,9 +83,10 @@ def __init__(self, dataframe: bigframes.dataframe.DataFrame): | |||||
| Args: | ||||||
| dataframe: The Bigframes Dataframe to display in the widget. | ||||||
| """ | ||||||
| if not ANYWIDGET_INSTALLED: | ||||||
| if not _ANYWIDGET_INSTALLED: | ||||||
| raise ImportError( | ||||||
| "Please `pip install anywidget traitlets` or `pip install 'bigframes[anywidget]'` to use TableWidget." | ||||||
| "Please `pip install anywidget traitlets` or " | ||||||
| "`pip install 'bigframes[anywidget]'` to use TableWidget." | ||||||
| ) | ||||||
|
|
||||||
| self._dataframe = dataframe | ||||||
|
|
@@ -99,15 +97,19 @@ def __init__(self, dataframe: bigframes.dataframe.DataFrame): | |||||
| self._table_id = str(uuid.uuid4()) | ||||||
| self._all_data_loaded = False | ||||||
| self._batch_iter: Optional[Iterator[pd.DataFrame]] = None | ||||||
| self._cached_batches: List[pd.DataFrame] = [] | ||||||
| self._cached_batches: list[pd.DataFrame] = [] | ||||||
| self._last_sort_state: Optional[_SortState] = None | ||||||
| # RLock is needed because _set_table_html can be re-entrant when | ||||||
| # self.page is updated within the method, triggering the observer. | ||||||
| self._setting_html_lock = threading.RLock() | ||||||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why use an RLock instead of a lock? Seems to me you only want one thread at a time running setting_html, not multiple.
Suggested change
|
||||||
|
|
||||||
| # respect display options for initial page size | ||||||
| initial_page_size = bigframes.options.display.max_rows | ||||||
|
|
||||||
| # set traitlets properties that trigger observers | ||||||
| # TODO(b/462525985): Investigate and improve TableWidget UX for DataFrames with a large number of columns. | ||||||
| self.page_size = initial_page_size | ||||||
| # TODO(b/469861913): Nested columns from structs (e.g., 'struct_col.name') are not currently sortable. | ||||||
| # TODO(b/463754889): Support non-string column labels for sorting. | ||||||
| if all(isinstance(col, str) for col in dataframe.columns): | ||||||
| self.orderable_columns = [ | ||||||
|
|
@@ -118,13 +120,24 @@ def __init__(self, dataframe: bigframes.dataframe.DataFrame): | |||||
| else: | ||||||
| self.orderable_columns = [] | ||||||
|
|
||||||
| self._initial_load() | ||||||
|
|
||||||
| # Signals to the frontend that the initial data load is complete. | ||||||
| # Also used as a guard to prevent observers from firing during initialization. | ||||||
| self._initial_load_complete = True | ||||||
|
|
||||||
| def _initial_load(self) -> None: | ||||||
| """Get initial data and row count.""" | ||||||
| # obtain the row counts | ||||||
| # TODO(b/428238610): Start iterating over the result of `to_pandas_batches()` | ||||||
| # before we get here so that the count might already be cached. | ||||||
| self._reset_batches_for_new_page_size() | ||||||
|
|
||||||
| if self._batches is None: | ||||||
| self._error_message = "Could not retrieve data batches. Data might be unavailable or an error occurred." | ||||||
| self._error_message = ( | ||||||
| "Could not retrieve data batches. Data might be unavailable or " | ||||||
| "an error occurred." | ||||||
| ) | ||||||
| self.row_count = None | ||||||
| elif self._batches.total_rows is None: | ||||||
| # Total rows is unknown, this is an expected state. | ||||||
|
|
@@ -138,12 +151,8 @@ def __init__(self, dataframe: bigframes.dataframe.DataFrame): | |||||
| # get the initial page | ||||||
| self._set_table_html() | ||||||
|
|
||||||
| # Signals to the frontend that the initial data load is complete. | ||||||
| # Also used as a guard to prevent observers from firing during initialization. | ||||||
| self._initial_load_complete = True | ||||||
|
|
||||||
| @traitlets.observe("_initial_load_complete") | ||||||
| def _on_initial_load_complete(self, change: Dict[str, Any]): | ||||||
| def _on_initial_load_complete(self, change: dict[str, Any]): | ||||||
| if change["new"]: | ||||||
| self._set_table_html() | ||||||
|
|
||||||
|
|
@@ -158,7 +167,7 @@ def _css(self): | |||||
| return resources.read_text(bigframes.display, "table_widget.css") | ||||||
|
|
||||||
| @traitlets.validate("page") | ||||||
| def _validate_page(self, proposal: Dict[str, Any]) -> int: | ||||||
| def _validate_page(self, proposal: dict[str, Any]) -> int: | ||||||
| """Validate and clamp the page number to a valid range. | ||||||
|
|
||||||
| Args: | ||||||
|
|
@@ -191,7 +200,7 @@ def _validate_page(self, proposal: Dict[str, Any]) -> int: | |||||
| return max(0, min(value, max_page)) | ||||||
|
|
||||||
| @traitlets.validate("page_size") | ||||||
| def _validate_page_size(self, proposal: Dict[str, Any]) -> int: | ||||||
| def _validate_page_size(self, proposal: dict[str, Any]) -> int: | ||||||
| """Validate page size to ensure it's positive and reasonable. | ||||||
|
|
||||||
| Args: | ||||||
|
|
@@ -255,95 +264,103 @@ def _reset_batch_cache(self) -> None: | |||||
|
|
||||||
| def _reset_batches_for_new_page_size(self) -> None: | ||||||
| """Reset the batch iterator when page size changes.""" | ||||||
| self._batches = self._dataframe._to_pandas_batches(page_size=self.page_size) | ||||||
| self._batches = self._dataframe.to_pandas_batches(page_size=self.page_size) | ||||||
|
|
||||||
| self._reset_batch_cache() | ||||||
|
|
||||||
| def _set_table_html(self) -> None: | ||||||
| """Sets the current html data based on the current page and page size.""" | ||||||
| if self._error_message: | ||||||
| self.table_html = ( | ||||||
| f"<div class='bigframes-error-message'>{self._error_message}</div>" | ||||||
| ) | ||||||
| return | ||||||
|
|
||||||
| # Apply sorting if a column is selected | ||||||
| df_to_display = self._dataframe | ||||||
| if self.sort_column: | ||||||
| # TODO(b/463715504): Support sorting by index columns. | ||||||
| df_to_display = df_to_display.sort_values( | ||||||
| by=self.sort_column, ascending=self.sort_ascending | ||||||
| with self._setting_html_lock: | ||||||
| if self._error_message: | ||||||
| self.table_html = ( | ||||||
| f"<div class='bigframes-error-message'>" | ||||||
| f"{self._error_message}</div>" | ||||||
| ) | ||||||
| return | ||||||
|
|
||||||
| # Apply sorting if a column is selected | ||||||
| df_to_display = self._dataframe | ||||||
| if self.sort_column: | ||||||
| # TODO(b/463715504): Support sorting by index columns. | ||||||
| df_to_display = df_to_display.sort_values( | ||||||
| by=self.sort_column, ascending=self.sort_ascending | ||||||
| ) | ||||||
|
|
||||||
| # Reset batches when sorting changes | ||||||
| if self._last_sort_state != _SortState( | ||||||
| self.sort_column, self.sort_ascending | ||||||
| ): | ||||||
| self._batches = df_to_display.to_pandas_batches( | ||||||
| page_size=self.page_size | ||||||
| ) | ||||||
| self._reset_batch_cache() | ||||||
| self._last_sort_state = _SortState( | ||||||
| self.sort_column, self.sort_ascending | ||||||
| ) | ||||||
| self.page = 0 # Reset to first page | ||||||
|
|
||||||
| start = self.page * self.page_size | ||||||
| end = start + self.page_size | ||||||
|
|
||||||
| # fetch more data if the requested page is outside our cache | ||||||
| cached_data = self._cached_data | ||||||
| while len(cached_data) < end and not self._all_data_loaded: | ||||||
| if self._get_next_batch(): | ||||||
| cached_data = self._cached_data | ||||||
| else: | ||||||
| break | ||||||
|
|
||||||
| # Get the data for the current page | ||||||
| page_data = cached_data.iloc[start:end].copy() | ||||||
|
|
||||||
| # Handle case where user navigated beyond available data with unknown row count | ||||||
| is_unknown_count = self.row_count is None | ||||||
| is_beyond_data = ( | ||||||
| self._all_data_loaded and len(page_data) == 0 and self.page > 0 | ||||||
| ) | ||||||
|
|
||||||
| # Reset batches when sorting changes | ||||||
| if self._last_sort_state != _SortState(self.sort_column, self.sort_ascending): | ||||||
| self._batches = df_to_display._to_pandas_batches(page_size=self.page_size) | ||||||
| self._reset_batch_cache() | ||||||
| self._last_sort_state = _SortState(self.sort_column, self.sort_ascending) | ||||||
| self.page = 0 # Reset to first page | ||||||
|
|
||||||
| start = self.page * self.page_size | ||||||
| end = start + self.page_size | ||||||
|
|
||||||
| # fetch more data if the requested page is outside our cache | ||||||
| cached_data = self._cached_data | ||||||
| while len(cached_data) < end and not self._all_data_loaded: | ||||||
| if self._get_next_batch(): | ||||||
| cached_data = self._cached_data | ||||||
| else: | ||||||
| break | ||||||
|
|
||||||
| # Get the data for the current page | ||||||
| page_data = cached_data.iloc[start:end].copy() | ||||||
|
|
||||||
| # Handle index display | ||||||
| # TODO(b/438181139): Add tests for custom multiindex | ||||||
| if self._dataframe._block.has_index: | ||||||
| index_name = page_data.index.name | ||||||
| page_data.insert( | ||||||
| 0, index_name if index_name is not None else "", page_data.index | ||||||
| ) | ||||||
| else: | ||||||
| # Default index - include as "Row" column | ||||||
| page_data.insert(0, "Row", range(start + 1, start + len(page_data) + 1)) | ||||||
| # Handle case where user navigated beyond available data with unknown row count | ||||||
| is_unknown_count = self.row_count is None | ||||||
| is_beyond_data = self._all_data_loaded and len(page_data) == 0 and self.page > 0 | ||||||
| if is_unknown_count and is_beyond_data: | ||||||
| # Calculate the last valid page (zero-indexed) | ||||||
| total_rows = len(cached_data) | ||||||
| if total_rows > 0: | ||||||
| if is_unknown_count and is_beyond_data: | ||||||
| # Calculate the last valid page (zero-indexed) | ||||||
| total_rows = len(cached_data) | ||||||
| last_valid_page = max(0, math.ceil(total_rows / self.page_size) - 1) | ||||||
| # Navigate back to the last valid page | ||||||
| # Navigate back to the last valid page. | ||||||
| # This triggers the observer, which will re-enter _set_table_html (allowed by RLock). | ||||||
| self.page = last_valid_page | ||||||
| # Recursively call to display the correct page | ||||||
| return self._set_table_html() | ||||||
| else: | ||||||
| # If no data at all, stay on page 0 with empty display | ||||||
| self.page = 0 | ||||||
| return self._set_table_html() | ||||||
|
|
||||||
| # Generate HTML table | ||||||
| self.table_html = bigframes.display.html.render_html( | ||||||
| dataframe=page_data, | ||||||
| table_id=f"table-{self._table_id}", | ||||||
| orderable_columns=self.orderable_columns, | ||||||
| ) | ||||||
| return | ||||||
|
|
||||||
| # Handle index display | ||||||
| if self._dataframe._block.has_index: | ||||||
| is_unnamed_single_index = ( | ||||||
| page_data.index.name is None | ||||||
| and not isinstance(page_data.index, pd.MultiIndex) | ||||||
| ) | ||||||
| page_data = page_data.reset_index() | ||||||
| if is_unnamed_single_index and "index" in page_data.columns: | ||||||
| page_data.rename(columns={"index": ""}, inplace=True) | ||||||
|
|
||||||
| # Default index - include as "Row" column if no index was present originally | ||||||
| if not self._dataframe._block.has_index: | ||||||
| page_data.insert(0, "Row", range(start + 1, start + len(page_data) + 1)) | ||||||
|
|
||||||
| # Generate HTML table | ||||||
| self.table_html = bigframes.display.html.render_html( | ||||||
| dataframe=page_data, | ||||||
| table_id=f"table-{self._table_id}", | ||||||
| ) | ||||||
|
|
||||||
| @traitlets.observe("sort_column", "sort_ascending") | ||||||
| def _sort_changed(self, _change: Dict[str, Any]): | ||||||
| def _sort_changed(self, _change: dict[str, Any]): | ||||||
| """Handler for when sorting parameters change from the frontend.""" | ||||||
| self._set_table_html() | ||||||
|
|
||||||
| @traitlets.observe("page") | ||||||
| def _page_changed(self, _change: Dict[str, Any]) -> None: | ||||||
| def _page_changed(self, _change: dict[str, Any]) -> None: | ||||||
| """Handler for when the page number is changed from the frontend.""" | ||||||
| if not self._initial_load_complete: | ||||||
| return | ||||||
| self._set_table_html() | ||||||
|
|
||||||
| @traitlets.observe("page_size") | ||||||
| def _page_size_changed(self, _change: Dict[str, Any]) -> None: | ||||||
| def _page_size_changed(self, _change: dict[str, Any]) -> None: | ||||||
| """Handler for when the page size is changed from the frontend.""" | ||||||
| if not self._initial_load_complete: | ||||||
| return | ||||||
|
|
||||||
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.