[Bug] Ignore duplicate entries in cache #997
📝 Walkthrough
Track duplicate Futures for identical task keys through file-based scheduler refresh and shutdown; set results/exceptions on duplicates when output appears. Wrap HDF5 dataset creation in contextlib.suppress(ValueError). Add tests that submit identical tasks twice and assert both Futures complete and only one cache artifact is produced.
Changes
Duplicate Future handling, HDF5 write guard, and tests
Estimated code review effort
🎯 3 (Moderate) | ⏱️ ~20 minutes
Possibly related PRs
Poem
🚥 Pre-merge checks | ✅ 4 | ❌ 1
❌ Failed checks (1 warning)
✅ Passed checks (4 passed)
Actionable comments posted: 1
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@src/executorlib/standalone/hdf.py`:
- Around line 36-37: In dump(), the code accesses group_dict[data_key] before
checking membership, which raises KeyError for unknown keys; change the logic to
first test "if data_key in group_dict" and only then compute path = "/" +
group_dict[data_key] (or otherwise use group_dict.get(data_key) with a guard) so
the membership check protects the access to group_dict; locate the condition
using the identifiers group_dict, data_key, fname and update the order so the
membership check precedes any subscript access.
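The reordering described above can be sketched as follows. This is a minimal illustration and not the code from `hdf.py`: the helper name `resolve_path` and the contents of `group_dict` are assumptions made up for the example.

```python
from typing import Optional

# Hypothetical mapping from data keys to HDF5 group names (illustrative only).
group_dict = {"fn": "function", "args": "input_args"}


def resolve_path(data_key: str) -> Optional[str]:
    """Return the HDF5 path for a known key, or None for an unknown key."""
    # Test membership first so the subscript access below cannot raise KeyError.
    if data_key in group_dict:
        return "/" + group_dict[data_key]
    return None
```

With this ordering an unknown key is skipped instead of raising `KeyError`; whether skipping or raising a clearer error is the right behavior for `dump()` is a design decision for the PR author.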
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: 096b5236-3136-49cd-903e-685efd411d9f
📒 Files selected for processing (1)
src/executorlib/standalone/hdf.py
Codecov Report
✅ All modified and coverable lines are covered by tests.
Additional details and impacted files
@@            Coverage Diff             @@
##             main     #997      +/-   ##
==========================================
+ Coverage   94.16%   94.19%   +0.03%
==========================================
  Files          39       39
  Lines        2090     2103      +13
==========================================
+ Hits         1968     1981      +13
  Misses        122      122
☔ View full report in Codecov by Sentry.
🧹 Nitpick comments (2)
tests/unit/executor/test_single_cache.py (2)
78-83: ⚡ Quick win
Test doesn't verify caching behavior, only that results match.
The test submits the same function twice and checks results are equal, but doesn't verify the second submission actually used the cache. Consider strengthening by:
- Checking that only one output HDF5 file (`task_key + "_o.h5"`) is created in the cache directory, or
- Using `get_cache_data(cache_directory)` to verify cache contents, or
- Timing both executions to confirm the second is significantly faster (cache hit vs execution).
As written, the test would pass even if both submissions executed independently.
💡 Example: Verify cache file count
  def test_cache_duplicate_function(self):
      cache_directory = os.path.abspath("cache_duplicate")
      with SingleNodeExecutor(hostname_localhost=True, cache_directory=cache_directory) as exe:
          f1 = exe.submit(sum_with_wait, 1, 1)
          f2 = exe.submit(sum_with_wait, 1, 1)
          self.assertEqual(f1.result(), f2.result())
+
+     cache_lst = get_cache_data(cache_directory=cache_directory)
+     self.assertEqual(len(cache_lst), 1, "Should only have one cache entry for duplicate submissions")
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@tests/unit/executor/test_single_cache.py` around lines 78 - 83, Update test_cache_duplicate_function to assert that the second submit used the cache rather than just matching results: after submitting via SingleNodeExecutor and awaiting results from f1 and f2, inspect the cache_directory to ensure only one output file (task_key + "_o.h5") exists (or call get_cache_data(cache_directory) and assert a single cache entry), or alternatively record timing for f1 and f2 and assert f2 is significantly faster; reference the test function name test_cache_duplicate_function and the executor SingleNodeExecutor when locating the test to add the cache-file or get_cache_data assertions or the timing check.
21-24: 💤 Low value
Consider verifying execution time or documenting the sleep purpose.
The `sleep` call adds delay but the test doesn't verify timing differences between cached vs non-cached execution. Either:
- Add timing assertions to confirm the second submission uses cache (should be much faster), or
- Add a comment explaining why the sleep is necessary for cache coordination.
Without timing verification, it's unclear whether the sleep is essential or just adds test overhead.
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@tests/unit/executor/test_single_cache.py` around lines 21 - 24, The test's artificial delay in function sum_with_wait is undeclared and unverified; either (A) add timing assertions around the two executor submissions that call sum_with_wait (use time.monotonic or time.perf_counter) and assert the second call is significantly faster than the first (e.g., second < first * 0.5 or an absolute threshold) to prove caching, or (B) if you intentionally keep the sleep only to allow cache coordination, add a concise comment above sum_with_wait explaining the sleep's purpose and why timing assertions are omitted; reference the sum_with_wait function when implementing the timing measurement or adding the comment.
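Option (A) from the comment above can be sketched with the standard library alone. This is an illustration under assumptions: `functools.lru_cache` stands in for the executor's file cache, the 0.2 s delay is arbitrary, and `timed_call` is a hypothetical helper. It also measures two sequential calls; whether the same threshold is meaningful for duplicates that are submitted concurrently is not established here.

```python
import time
from functools import lru_cache
from typing import Tuple


@lru_cache(maxsize=None)  # stand-in for the executor's file cache (assumption)
def sum_with_wait(a: int, b: int) -> int:
    time.sleep(0.2)  # artificial delay that makes a cache hit measurable
    return a + b


def timed_call(a: int, b: int) -> Tuple[int, float]:
    """Call sum_with_wait and return its result and the elapsed wall time."""
    start = time.perf_counter()
    result = sum_with_wait(a, b)
    return result, time.perf_counter() - start


# The first call executes the function; the second is served from the cache.
first_result, first_duration = timed_call(1, 1)
second_result, second_duration = timed_call(1, 1)
```

A relative threshold such as `second_duration < first_duration * 0.5` tends to be less flaky on loaded CI machines than a fixed absolute limit.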
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: bb432d07-d6ac-4390-a826-9a76a3d193ac
📒 Files selected for processing (1)
tests/unit/executor/test_single_cache.py
Actionable comments posted: 1
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
src/executorlib/task_scheduler/file/shared.py (1)
449-497: ⚠️ Potential issue | 🟠 Major | ⚡ Quick win
Shutdown cancel doesn’t touch duplicate futures (real hang risk).
In `src/executorlib/task_scheduler/file/shared.py`, identical submissions are tracked separately: the first future goes in `memory_dict`, while additional futures go into `duplicate_dict[task_key]` (lines ~181-183). `_shutdown_executor()` only cancels/refreshes `memory_dict`; it never cancels or resolves anything in `duplicate_dict`. After `cancel_futures=True`, primaries in `memory_dict` become `done()`, and `_refresh_memory_dict()` filters them out via `if not value.done()`, so `_check_task_output()` never runs for those keys, meaning `duplicate_dict` entries are left pending.
`tests/unit/executor/test_api.py` covers duplicate completion (`test_duplicate_futures`) and shutdown cancellation (`test_shutdown_*`), but there’s no test exercising duplicates during shutdown cancel paths, so this behavior isn’t currently guarded.
🔒 Proposed fix: cancel duplicate futures too
  elif wait and cancel_futures:
      for value in memory_dict.values():
          if not value.done():
              value.cancel()
+     for duplicate_lst in duplicate_dict.values():
+         _cancel_futures(future_dict={i: f for i, f in enumerate(duplicate_lst)})
      while len(memory_dict) > 0:
          memory_dict = _refresh_memory_dict( ... )
  elif cancel_futures:  # wait is False
      _cancel_processes( ... )
      _cancel_futures(future_dict=memory_dict)
+     for duplicate_lst in duplicate_dict.values():
+         _cancel_futures(future_dict={i: f for i, f in enumerate(duplicate_lst)})
  else:  # wait is False and cancel_futures is False
      memory_dict = _refresh_memory_dict(...)
      _cancel_futures(future_dict=memory_dict)
+     for duplicate_lst in duplicate_dict.values():
+         _cancel_futures(future_dict={i: f for i, f in enumerate(duplicate_lst)})
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@src/executorlib/task_scheduler/file/shared.py` around lines 449 - 497, _shutdown_executor currently only cancels/refreshes futures in memory_dict and leaves duplicate_dict entries untouched, causing duplicate futures to remain pending during cancel_futures paths; modify the shutdown logic (the branch handling cancel_futures and the final else where _cancel_futures is called) to also cancel and/or mark all futures stored in duplicate_dict: iterate duplicate_dict.values() (or flatten all lists/sets of futures it contains) and call the same cancellation/cleanup helper used for primaries (e.g., reuse _cancel_futures or call future.cancel() on each), and ensure duplicate entries are removed/filtered so _refresh_memory_dict and _check_task_output cannot leave them hanging (reference duplicate_dict, memory_dict, _cancel_futures, _refresh_memory_dict, and _cancel_processes).
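The idea of cancelling duplicates during shutdown can be illustrated in isolation. The sketch below assumes `duplicate_dict` maps a task key to a list of `concurrent.futures.Future` objects, as the comment describes; `cancel_duplicate_futures` is a hypothetical helper, not a function from executorlib, and it calls `Future.cancel()` directly instead of reusing `_cancel_futures`.

```python
from concurrent.futures import Future
from typing import Dict, List


def cancel_duplicate_futures(duplicate_dict: Dict[str, List[Future]]) -> None:
    """Cancel every still-pending duplicate future and clear the tracking dict."""
    for duplicate_lst in duplicate_dict.values():
        for duplicate_future in duplicate_lst:
            # Finished futures keep their result; only pending ones are cancelled.
            if not duplicate_future.done():
                duplicate_future.cancel()
    # Drop the entries so nothing stays tracked after shutdown.
    duplicate_dict.clear()


# Two duplicates for one task key; the second one has already finished.
pending, finished = Future(), Future()
finished.set_result(2)
duplicate_dict = {"task_a": [pending, finished]}
cancel_duplicate_futures(duplicate_dict)
```

Clearing the dictionary after cancellation means a later output check finds no duplicates for those keys, so it has nothing left to resolve.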
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: d1fcc6dc-d65c-4785-8f30-0d815a1e04d0
📒 Files selected for processing (2)
src/executorlib/task_scheduler/file/shared.py
tests/unit/executor/test_api.py
if task_key in duplicate_dict:
    for duplicate_future in duplicate_dict[task_key]:
        if exec_flag and no_error_flag:
            duplicate_future.set_result(result)
        elif exec_flag:
            duplicate_future.set_exception(result)
    del duplicate_dict[task_key]
Guard duplicate futures against already-resolved/cancelled state.
The primary future_obj is only resolved for non-done futures (the caller filters with if not value.done() at Line 337), but duplicate futures get no such guard. In this file-based scheduler the futures stay PENDING until a result is set, so a caller can successfully cancel() a duplicate future while it is still tracked in duplicate_dict. When the output then appears, set_result/set_exception on that cancelled (or otherwise done) duplicate raises InvalidStateError, which propagates out of _check_task_output and kills the scheduler thread.
🛡️ Proposed guard
  if task_key in duplicate_dict:
      for duplicate_future in duplicate_dict[task_key]:
+         if duplicate_future.done():
+             continue
          if exec_flag and no_error_flag:
              duplicate_future.set_result(result)
          elif exec_flag:
              duplicate_future.set_exception(result)
      del duplicate_dict[task_key]
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@src/executorlib/task_scheduler/file/shared.py` around lines 221 - 227, The
duplicate futures in the duplicate_dict loop must be guarded against
already-done or cancelled futures to avoid InvalidStateError in
_check_task_output; update the block that iterates duplicate_dict[task_key] to
check duplicate_future.done() (or duplicate_future.cancelled()) and only call
duplicate_future.set_result(result) or duplicate_future.set_exception(result)
when not done, skipping any finished/cancelled futures (and still delete
duplicate_dict[task_key] after processing). Ensure you reference duplicate_dict,
duplicate_future and the surrounding _check_task_output logic so the change
replaces the direct set_result/set_exception calls with a guarded conditional
(or minimal try/except that ignores InvalidStateError) to prevent the scheduler
thread from crashing.
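The failure mode described in this comment can be reproduced with the standard library alone. The sketch below first shows that `set_result` on a cancelled `Future` raises `InvalidStateError` (Python 3.8 or later), then applies a `done()` guard. `resolve_duplicates` is a hypothetical helper written for this illustration, not the scheduler's code.

```python
from concurrent.futures import Future, InvalidStateError


def resolve_duplicates(duplicate_lst, result):
    """Set the result on each duplicate future that is still pending."""
    resolved = 0
    for duplicate_future in duplicate_lst:
        if duplicate_future.done():  # cancelled futures also report done()
            continue
        duplicate_future.set_result(result)
        resolved += 1
    return resolved


cancelled_future = Future()
cancelled_future.cancel()  # succeeds because the future is still PENDING

# Without a guard, resolving the cancelled future raises InvalidStateError.
try:
    cancelled_future.set_result(2)
    raised = False
except InvalidStateError:
    raised = True

# With the guard, the cancelled future is skipped and the live one resolves.
live_future = Future()
resolved_count = resolve_duplicates([cancelled_future, live_future], 2)
```

The `done()` check is not atomic with `set_result`, so a caller could still cancel between the two calls; the minimal `try/except InvalidStateError` variant mentioned in the prompt closes that window.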
Summary by CodeRabbit
Bug Fixes
Tests