[Bug] Ignore duplicate entries in cache #997

Merged: jan-janssen merged 14 commits into main from duplicate_cache on Jun 3, 2026

@jan-janssen jan-janssen commented Jun 3, 2026

Summary by CodeRabbit

  • Bug Fixes

    • Avoids HDF5 write failures from duplicate dataset creation, improving reliability when saving state.
    • Ensures duplicate task submissions share results correctly and are consistently completed/cancelled during executor shutdown.
  • Tests

    • Added tests validating caching/duplicate-task behavior and extended cleanup for the new cache scenario.

coderabbitai Bot commented Jun 3, 2026

Note

Reviews paused

It looks like this branch is under active development. To avoid overwhelming you with review comments due to an influx of new commits, CodeRabbit has automatically paused this review. You can configure this behavior by changing the reviews.auto_review.auto_pause_after_reviewed_commits setting.

Walkthrough

Track duplicate Futures for identical task keys through file-based scheduler refresh and shutdown; set results/exceptions on duplicates when output appears. Wrap HDF5 dataset creation in contextlib.suppress(ValueError). Add tests that submit identical tasks twice and assert both Futures complete and only one cache artifact is produced.
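
The duplicate tracking described above can be sketched in isolation. This is a minimal illustration, not the code from shared.py: the names memory_dict and duplicate_dict come from the PR, while the helpers register and publish and the key "sum_1_1" are hypothetical.

```python
from concurrent.futures import Future

# Names taken from the PR; the contents and helpers below are hypothetical.
memory_dict = {}     # task_key -> primary Future
duplicate_dict = {}  # task_key -> list of additional Futures for the same key


def register(task_key):
    """Return a new Future, recording it as primary or duplicate."""
    future = Future()
    if task_key in memory_dict:
        # Same key is already pending: remember the extra Future separately.
        duplicate_dict.setdefault(task_key, []).append(future)
    else:
        memory_dict[task_key] = future
    return future


def publish(task_key, result):
    """Resolve the primary Future and every duplicate for this key."""
    memory_dict.pop(task_key).set_result(result)
    for duplicate in duplicate_dict.pop(task_key, []):
        if not duplicate.done():
            duplicate.set_result(result)


f1 = register("sum_1_1")
f2 = register("sum_1_1")
publish("sum_1_1", 2)
print(f1.result(), f2.result())
```

Both Futures resolve from a single published result, which is the behavior the new tests assert.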

Changes

Duplicate Future handling, HDF5 write guard, and tests

| Layer / File(s) | Summary |
|---|---|
| Task scheduler: duplicate Future tracking (src/executorlib/task_scheduler/file/shared.py) | Introduce duplicate_dict in execute_tasks_h5, record secondary Futures for the same task_key, pass duplicate_dict into _refresh_memory_dict, _check_task_output, and _shutdown_executor, and set results/exceptions on duplicate Futures when outputs appear. |
| HDF5 dataset creation guard (src/executorlib/standalone/hdf.py) | Add import contextlib and wrap create_dataset(...) for recognized data_key paths in contextlib.suppress(ValueError) to avoid failing when the dataset already exists. |
| Duplicate-task cache tests (tests/unit/executor/test_single_cache.py, tests/unit/executor/test_api.py) | Add sum_with_wait helper and test_cache_duplicate_function to test_single_cache.py; extend tearDown to remove cache_duplicate. Add test_duplicate_futures to test_api.py that submits identical tasks twice and asserts both Futures complete and only a single cache file is created. |
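
The HDF5 guard can be illustrated without an HDF5 dependency. The sketch below assumes that creating a dataset under an existing name raises ValueError, as the summary implies; FakeH5File and dump_once are hypothetical stand-ins, not h5py or executorlib code.

```python
import contextlib


class FakeH5File:
    """Hypothetical stand-in for an HDF5 file handle (not h5py itself)."""

    def __init__(self):
        self.datasets = {}

    def create_dataset(self, name, data):
        # Mimic the failure mode described in the PR: creating a dataset
        # under an existing name raises ValueError.
        if name in self.datasets:
            raise ValueError(f"dataset {name!r} already exists")
        self.datasets[name] = data


def dump_once(handle, path, data):
    """Write data at path; a repeated write is ignored, keeping the first."""
    with contextlib.suppress(ValueError):
        handle.create_dataset(path, data)


handle = FakeH5File()
dump_once(handle, "/output", 2)
dump_once(handle, "/output", 3)  # suppressed; the first value is kept
print(handle.datasets)
```

One trade-off of this pattern is that contextlib.suppress(ValueError) hides every ValueError raised inside the block, not only the "already exists" case.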

Estimated code review effort

🎯 3 (Moderate) | ⏱️ ~20 minutes

Possibly related PRs

Poem

I hop through code with careful cheer,
I hush the ValueError when datasets reappear,
Two matching tasks, one cached delight,
Both futures finish, their outputs right,
I nibble tests and clean the cache tonight. 🐇

🚥 Pre-merge checks | ✅ 4 | ❌ 1

❌ Failed checks (1 warning)

| Check name | Status | Explanation | Resolution |
|---|---|---|---|
| Docstring Coverage | ⚠️ Warning | Docstring coverage is 37.50%, which is below the required threshold of 80.00%. | Write docstrings for the functions missing them to satisfy the coverage threshold. |

✅ Passed checks (4 passed)

| Check name | Status | Explanation |
|---|---|---|
| Description Check | ✅ Passed | Check skipped - CodeRabbit’s high-level summary is enabled. |
| Title check | ✅ Passed | The title clearly and concisely summarizes the main purpose of the changes: handling duplicate cache entries gracefully rather than failing. |
| Linked Issues check | ✅ Passed | Check skipped because no linked issues were found for this pull request. |
| Out of Scope Changes check | ✅ Passed | Check skipped because no linked issues were found for this pull request. |

@coderabbitai coderabbitai Bot left a comment

Actionable comments posted: 1

🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Inline comments:
In `@src/executorlib/standalone/hdf.py`:
- Around line 36-37: In dump(), the code accesses group_dict[data_key] before
checking membership, which raises KeyError for unknown keys; change the logic to
first test "if data_key in group_dict" and only then compute path = "/" +
group_dict[data_key] (or otherwise use group_dict.get(data_key) with a guard) so
the membership check protects the access to group_dict; locate the condition
using the identifiers group_dict, data_key, fname and update the order so the
membership check precedes any subscript access.
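
The ordering issue in this finding can be shown with a small sketch. The mapping contents and the helper name resolve_path are hypothetical; only the identifiers group_dict and data_key come from the comment.

```python
# Hypothetical mapping; the real keys and group names live in hdf.py.
group_dict = {"fn": "function", "args": "input_args"}


def resolve_path(data_key):
    """Return the HDF5 path for a known key, or None for an unknown one."""
    # Test membership first so the subscript below can never raise KeyError.
    if data_key in group_dict:
        return "/" + group_dict[data_key]
    return None


print(resolve_path("fn"))
print(resolve_path("unknown"))
```

Subscripting before the membership test would raise KeyError on the second call instead of skipping the unknown key.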

ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: 096b5236-3136-49cd-903e-685efd411d9f

📥 Commits

Reviewing files that changed from the base of the PR and between a81752a and 92cff67.

📒 Files selected for processing (1)
  • src/executorlib/standalone/hdf.py

Comment thread src/executorlib/standalone/hdf.py Outdated

codecov Bot commented Jun 3, 2026

Codecov Report

✅ All modified and coverable lines are covered by tests.
✅ Project coverage is 94.19%. Comparing base (a81752a) to head (d1701ef).

Additional details and impacted files
@@            Coverage Diff             @@
##             main     #997      +/-   ##
==========================================
+ Coverage   94.16%   94.19%   +0.03%     
==========================================
  Files          39       39              
  Lines        2090     2103      +13     
==========================================
+ Hits         1968     1981      +13     
  Misses        122      122              
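
The percentages in the table can be reproduced from the hit and line counts. The sketch below assumes the reported figures are truncated to two decimals rather than rounded, which is an inference from the numbers and not something the report states; coverage_percent is a hypothetical helper.

```python
import math


def coverage_percent(hits, lines):
    """Coverage in percent, truncated (not rounded) to two decimals."""
    return math.floor(hits / lines * 10_000) / 100


base = coverage_percent(1968, 2090)  # main before the PR
head = coverage_percent(1981, 2103)  # head of the PR branch
print(base, head, round(head - base, 2))
```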

@coderabbitai coderabbitai Bot left a comment

🧹 Nitpick comments (2)
tests/unit/executor/test_single_cache.py (2)

78-83: ⚡ Quick win

Test doesn't verify caching behavior, only that results match.

The test submits the same function twice and checks results are equal, but doesn't verify the second submission actually used the cache. Consider strengthening by:

  1. Checking that only one output HDF5 file (task_key + "_o.h5") is created in the cache directory, or
  2. Using get_cache_data(cache_directory) to verify cache contents, or
  3. Timing both executions to confirm the second is significantly faster (cache hit vs execution).

As written, the test would pass even if both submissions executed independently.

💡 Example: Verify cache file count
     def test_cache_duplicate_function(self):
         cache_directory = os.path.abspath("cache_duplicate")
         with SingleNodeExecutor(hostname_localhost=True, cache_directory=cache_directory) as exe:
             f1 = exe.submit(sum_with_wait, 1, 1)
             f2 = exe.submit(sum_with_wait, 1, 1)
             self.assertEqual(f1.result(), f2.result())
+        
+        cache_lst = get_cache_data(cache_directory=cache_directory)
+        self.assertEqual(len(cache_lst), 1, "Should only have one cache entry for duplicate submissions")
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@tests/unit/executor/test_single_cache.py` around lines 78 - 83, Update
test_cache_duplicate_function to assert that the second submit used the cache
rather than just matching results: after submitting via SingleNodeExecutor and
awaiting results from f1 and f2, inspect the cache_directory to ensure only one
output file (task_key + "_o.h5") exists (or call get_cache_data(cache_directory)
and assert a single cache entry), or alternatively record timing for f1 and f2
and assert f2 is significantly faster; reference the test function name
test_cache_duplicate_function and the executor SingleNodeExecutor when locating
the test to add the cache-file or get_cache_data assertions or the timing check.
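
The first option, counting output files, could look roughly like the sketch below. It assumes output files end in "_o.h5" as stated in the comment; the helper count_output_files and the file names abc_i.h5 and abc_o.h5 are hypothetical, and a temporary directory stands in for the real cache.

```python
import glob
import os
import tempfile


def count_output_files(cache_directory):
    """Count files ending in "_o.h5" anywhere under cache_directory."""
    pattern = os.path.join(cache_directory, "**", "*_o.h5")
    return len(glob.glob(pattern, recursive=True))


# Simulate a cache directory holding one output file and one other file.
with tempfile.TemporaryDirectory() as cache_directory:
    for name in ("abc_i.h5", "abc_o.h5"):
        open(os.path.join(cache_directory, name), "w").close()
    n_outputs = count_output_files(cache_directory)

print(n_outputs)
```

In the test itself, the assertion would be that this count equals 1 after both futures have returned.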

21-24: 💤 Low value

Consider verifying execution time or documenting the sleep purpose.

The sleep call adds delay but the test doesn't verify timing differences between cached vs non-cached execution. Either:

  1. Add timing assertions to confirm the second submission uses cache (should be much faster), or
  2. Add a comment explaining why the sleep is necessary for cache coordination.

Without timing verification, it's unclear whether the sleep is essential or just adds test overhead.

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@tests/unit/executor/test_single_cache.py` around lines 21 - 24, The test's
artificial delay in function sum_with_wait is undocumented and unverified; either
(A) add timing assertions around the two executor submissions that call
sum_with_wait (use time.monotonic or time.perf_counter) and assert the second
call is significantly faster than the first (e.g., second < first * 0.5 or an
absolute threshold) to prove caching, or (B) if you intentionally keep the sleep
only to allow cache coordination, add a concise comment above sum_with_wait
explaining the sleep's purpose and why timing assertions are omitted; reference
the sum_with_wait function when implementing the timing measurement or adding
the comment.

ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: bb432d07-d6ac-4390-a826-9a76a3d193ac

📥 Commits

Reviewing files that changed from the base of the PR and between 4e16887 and 9d4c0ad.

📒 Files selected for processing (1)
  • tests/unit/executor/test_single_cache.py

@coderabbitai coderabbitai Bot left a comment

Actionable comments posted: 1

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
src/executorlib/task_scheduler/file/shared.py (1)

449-497: ⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Shutdown cancel doesn’t touch duplicate futures (real hang risk).

In src/executorlib/task_scheduler/file/shared.py, identical submissions are tracked separately: the first future goes in memory_dict, while additional futures go into duplicate_dict[task_key] (lines ~181-183). _shutdown_executor() only cancels/refreshes memory_dict—it never cancels or resolves anything in duplicate_dict. After cancel_futures=True, primaries in memory_dict become done(), and _refresh_memory_dict() filters them out via if not value.done() so _check_task_output() never runs for those keys, meaning duplicate_dict entries are left pending.

tests/unit/executor/test_api.py covers duplicate completion (test_duplicate_futures) and shutdown cancellation (test_shutdown_*), but there’s no test exercising duplicates during shutdown cancel paths, so this behavior isn’t currently guarded.

🔒 Proposed fix — cancel duplicate futures too
     elif wait and cancel_futures:
         for value in memory_dict.values():
             if not value.done():
                 value.cancel()
+        for duplicate_lst in duplicate_dict.values():
+            _cancel_futures(future_dict={i: f for i, f in enumerate(duplicate_lst)})
         while len(memory_dict) > 0:
             memory_dict = _refresh_memory_dict(
                 ...
             )
     elif cancel_futures:  # wait is False
         _cancel_processes(
             ...
         )
         _cancel_futures(future_dict=memory_dict)
+        for duplicate_lst in duplicate_dict.values():
+            _cancel_futures(future_dict={i: f for i, f in enumerate(duplicate_lst)})
     else:  # wait is False and cancel_futures is False
         memory_dict = _refresh_memory_dict(...)
         _cancel_futures(future_dict=memory_dict)
+        for duplicate_lst in duplicate_dict.values():
+            _cancel_futures(future_dict={i: f for i, f in enumerate(duplicate_lst)})
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@src/executorlib/task_scheduler/file/shared.py` around lines 449 - 497,
_shutdown_executor currently only cancels/refreshes futures in memory_dict and
leaves duplicate_dict entries untouched, causing duplicate futures to remain
pending during cancel_futures paths; modify the shutdown logic (the branch
handling cancel_futures and the final else where _cancel_futures is called) to
also cancel and/or mark all futures stored in duplicate_dict: iterate
duplicate_dict.values() (or flatten all lists/sets of futures it contains) and
call the same cancellation/cleanup helper used for primaries (e.g., reuse
_cancel_futures or call future.cancel() on each), and ensure duplicate entries
are removed/filtered so _refresh_memory_dict and _check_task_output cannot leave
them hanging (reference duplicate_dict, memory_dict, _cancel_futures,
_refresh_memory_dict, and _cancel_processes).
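
The hang risk can be reproduced with plain concurrent.futures objects. This is a simplified sketch of the bookkeeping only, not the scheduler's shutdown code; the key "task" is hypothetical.

```python
from concurrent.futures import Future

primary, duplicate = Future(), Future()
memory_dict = {"task": primary}
duplicate_dict = {"task": [duplicate]}

# Cancelling only the primaries leaves the duplicate pending.
for future in memory_dict.values():
    future.cancel()
pending_before = not duplicate.done()

# Cancelling the duplicates as well lets every waiter return.
for duplicate_lst in duplicate_dict.values():
    for future in duplicate_lst:
        future.cancel()
duplicate_dict.clear()

print(pending_before, primary.cancelled(), duplicate.cancelled())
```

A caller blocked on duplicate.result() would wait indefinitely after the first loop alone, and would receive CancelledError once the second loop has run.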

ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: d1fcc6dc-d65c-4785-8f30-0d815a1e04d0

📥 Commits

Reviewing files that changed from the base of the PR and between 501f340 and 7fdbe1c.

📒 Files selected for processing (2)
  • src/executorlib/task_scheduler/file/shared.py
  • tests/unit/executor/test_api.py

Comment on lines +221 to +227
     if task_key in duplicate_dict:
         for duplicate_future in duplicate_dict[task_key]:
             if exec_flag and no_error_flag:
                 duplicate_future.set_result(result)
             elif exec_flag:
                 duplicate_future.set_exception(result)
         del duplicate_dict[task_key]

⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Guard duplicate futures against already-resolved/cancelled state.

The primary future_obj is only resolved for non-done futures (the caller filters with if not value.done() at Line 337), but duplicate futures get no such guard. In this file-based scheduler the futures stay PENDING until a result is set, so a caller can successfully cancel() a duplicate future while it is still tracked in duplicate_dict. When the output then appears, set_result/set_exception on that cancelled (or otherwise done) duplicate raises InvalidStateError, which propagates out of _check_task_output and kills the scheduler thread.

🛡️ Proposed guard
     if task_key in duplicate_dict:
         for duplicate_future in duplicate_dict[task_key]:
+            if duplicate_future.done():
+                continue
             if exec_flag and no_error_flag:
                 duplicate_future.set_result(result)
             elif exec_flag:
                 duplicate_future.set_exception(result)
         del duplicate_dict[task_key]
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@src/executorlib/task_scheduler/file/shared.py` around lines 221 - 227, The
duplicate futures in the duplicate_dict loop must be guarded against
already-done or cancelled futures to avoid InvalidStateError in
_check_task_output; update the block that iterates duplicate_dict[task_key] to
check duplicate_future.done() (or duplicate_future.cancelled()) and only call
duplicate_future.set_result(result) or duplicate_future.set_exception(result)
when not done, skipping any finished/cancelled futures (and still delete
duplicate_dict[task_key] after processing). Ensure you reference duplicate_dict,
duplicate_future and the surrounding _check_task_output logic so the change
replaces the direct set_result/set_exception calls with a guarded conditional
(or minimal try/except that ignores InvalidStateError) to prevent the scheduler
thread from crashing.
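
The failure mode and the guard can be demonstrated with the standard library alone. The helper set_if_pending is a hypothetical name used for illustration; the rest relies on documented concurrent.futures behavior.

```python
from concurrent.futures import Future, InvalidStateError

cancelled = Future()
cancelled.cancel()  # a pending Future can be cancelled by its holder

# Setting a result on a cancelled Future raises InvalidStateError.
try:
    cancelled.set_result(2)
    raised = False
except InvalidStateError:
    raised = True


def set_if_pending(future, result):
    """Set the result only if the Future is neither finished nor cancelled."""
    if future.done():
        return False
    future.set_result(result)
    return True


fresh = Future()
print(raised, set_if_pending(cancelled, 2), set_if_pending(fresh, 2))
```

Checking done() before setting the result mirrors the proposed guard and keeps a single cancelled duplicate from interrupting the loop over the remaining ones.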

@jan-janssen jan-janssen merged commit 0a7214e into main Jun 3, 2026
89 of 94 checks passed
@jan-janssen jan-janssen deleted the duplicate_cache branch June 3, 2026 09:18