fix(rl): add exception handling and cleanup guarantee to actor_cli#3238
wadeKeith wants to merge 1 commit into huggingface:main
Fixes huggingface#3059. When act_with_policy throws an uncaught exception, queue closing and process joining logic was skipped, causing resource leaks (GPU/CPU). Wrapped the main body in try/except/finally to ensure:
1. Exceptions are logged with full traceback for debugging
2. shutdown_event is set on crash to signal child processes
3. Queue close + process join always runs in the finally block
4. Clear exit logging for both normal and abnormal termination
Pull request overview
Improves robustness/observability of the RL Actor entrypoint (actor_cli) by ensuring exceptions from act_with_policy() are logged and cleanup is attempted via a try/except/finally structure.
Changes:
- Wraps act_with_policy() in try/except/finally to log crashes with traceback and run cleanup consistently.
- Signals shutdown (shutdown_event.set()) on exception to help dependent worker loops terminate.
- Adds additional lifecycle logs around normal completion and cleanup.
    transitions_queue.close()
    interactions_queue.close()
    parameters_queue.close()

    transitions_process.join()
    logging.info("[ACTOR] Transitions process joined")
    interactions_process.join()
    logging.info("[ACTOR] Interactions process joined")
    receive_policy_process.join()
    logging.info("[ACTOR] Receive policy process joined")

    logging.info("[ACTOR] Cancelling queue join threads")
    transitions_queue.cancel_join_thread()
    interactions_queue.cancel_join_thread()
    parameters_queue.cancel_join_thread()
The finally cleanup sequence (queue.close(), process.join(), cancel_join_thread()) can itself raise (e.g., if a queue is already closed or a process is in a bad state). If that happens, it can mask the original exception from act_with_policy(). Consider wrapping individual cleanup steps with try/except (logging failures) or contextlib.suppress(...) so diagnostics from the original crash are preserved.
Suggested change:

    -transitions_queue.close()
    -interactions_queue.close()
    -parameters_queue.close()
    -transitions_process.join()
    -logging.info("[ACTOR] Transitions process joined")
    -interactions_process.join()
    -logging.info("[ACTOR] Interactions process joined")
    -receive_policy_process.join()
    -logging.info("[ACTOR] Receive policy process joined")
    -logging.info("[ACTOR] Cancelling queue join threads")
    -transitions_queue.cancel_join_thread()
    -interactions_queue.cancel_join_thread()
    -parameters_queue.cancel_join_thread()
    +try:
    +    transitions_queue.close()
    +except Exception:
    +    logging.exception("[ACTOR] Failed to close transitions_queue during cleanup")
    +try:
    +    interactions_queue.close()
    +except Exception:
    +    logging.exception("[ACTOR] Failed to close interactions_queue during cleanup")
    +try:
    +    parameters_queue.close()
    +except Exception:
    +    logging.exception("[ACTOR] Failed to close parameters_queue during cleanup")
    +try:
    +    transitions_process.join()
    +    logging.info("[ACTOR] Transitions process joined")
    +except Exception:
    +    logging.exception("[ACTOR] Failed to join transitions process during cleanup")
    +try:
    +    interactions_process.join()
    +    logging.info("[ACTOR] Interactions process joined")
    +except Exception:
    +    logging.exception("[ACTOR] Failed to join interactions process during cleanup")
    +try:
    +    receive_policy_process.join()
    +    logging.info("[ACTOR] Receive policy process joined")
    +except Exception:
    +    logging.exception("[ACTOR] Failed to join receive policy process during cleanup")
    +logging.info("[ACTOR] Cancelling queue join threads")
    +try:
    +    transitions_queue.cancel_join_thread()
    +except Exception:
    +    logging.exception("[ACTOR] Failed to cancel join thread for transitions_queue during cleanup")
    +try:
    +    interactions_queue.cancel_join_thread()
    +except Exception:
    +    logging.exception("[ACTOR] Failed to cancel join thread for interactions_queue during cleanup")
    +try:
    +    parameters_queue.cancel_join_thread()
    +except Exception:
    +    logging.exception("[ACTOR] Failed to cancel join thread for parameters_queue during cleanup")
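The repeated per-step guards could also be factored into a small helper. The following is a minimal sketch, not code from the PR: run_cleanup_steps is a hypothetical name, and it assumes each cleanup action can be passed as a zero-argument callable paired with a label for the log message.

```python
import logging
from typing import Callable, Iterable


def run_cleanup_steps(steps: Iterable[tuple[str, Callable[[], None]]]) -> list[str]:
    """Run every cleanup step, logging failures instead of raising.

    Hypothetical helper for illustration only (not part of lerobot).
    Returns the labels of the steps that failed.
    """
    failed = []
    for label, step in steps:
        try:
            step()
        except Exception:
            # Log with traceback and keep going so later steps still run
            # and the original crash is not masked by a cleanup error.
            logging.exception("[ACTOR] Cleanup step failed: %s", label)
            failed.append(label)
    return failed
```

Inside the finally block it might be called with pairs such as ("close transitions_queue", transitions_queue.close) and ("join transitions process", transitions_process.join). The trade-off is that the per-step success logs ("Transitions process joined") would need to be emitted separately.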
    logging.info("[ACTOR] Actor process exited")

    logging.info("[ACTOR] queues closed")
logging.info("[ACTOR] queues closed") is outside the try/except/finally, so it will never run on the exception path because the exception is re-raised. If you want this log to be emitted regardless of success/failure, move it into the finally block (or adjust the message to reflect that it only logs on normal exit).
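The control flow described in this comment can be checked with a small standalone sketch. The run function and the events list are hypothetical and exist only for illustration. A statement placed after a try/except/finally is skipped when the except clause re-raises, while the finally body still runs.

```python
def run(fail: bool) -> list[str]:
    """Record which parts of a try/except/finally execute."""
    events = []
    try:
        try:
            if fail:
                raise RuntimeError("simulated crash")
            events.append("body completed")
        except Exception:
            events.append("exception logged")
            raise  # re-raise, as the PR does after logging
        finally:
            events.append("cleanup in finally")
        # Only reached when no exception propagated out of the inner try.
        events.append("after the try statement")
    except RuntimeError:
        # Outer handler exists only so the sketch can return the events.
        pass
    return events
```

On the failure path the recorded events contain the finally entry but not the trailing one, which is why a log line meant for both paths belongs inside finally.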
    try:
        act_with_policy(
            cfg=cfg,
            shutdown_event=shutdown_event,
            parameters_queue=parameters_queue,
            transitions_queue=transitions_queue,
            interactions_queue=interactions_queue,
        )
        logging.info("[ACTOR] act_with_policy completed normally")
    except Exception:
        logging.exception("[ACTOR] act_with_policy crashed with an unhandled exception")
        shutdown_event.set()
        raise
    finally:
        logging.info("[ACTOR] Closing queues")
        transitions_queue.close()
        interactions_queue.close()
        parameters_queue.close()

        transitions_process.join()
        logging.info("[ACTOR] Transitions process joined")
        interactions_process.join()
        logging.info("[ACTOR] Interactions process joined")
        receive_policy_process.join()
        logging.info("[ACTOR] Receive policy process joined")
After act_with_policy() returns normally, shutdown_event is not set, but send_transitions() / send_interactions() loops run while not shutdown_event.is_set(). In that case the subsequent join() calls can hang, or the children may crash when the queues are closed. Consider setting shutdown_event on the normal-completion path (or unconditionally at the start of finally) before joining so the streaming loops exit cleanly.
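A minimal sketch of the "set unconditionally at the start of finally" option follows. It uses threading rather than multiprocessing so that it runs anywhere; multiprocessing.Event exposes the same set() and is_set() methods. The names streaming_loop and run_actor are hypothetical stand-ins, not the functions in actor.py.

```python
import threading
import time


def streaming_loop(shutdown_event: threading.Event) -> None:
    # Stand-in for a worker loop of the form `while not shutdown_event.is_set()`.
    while not shutdown_event.is_set():
        time.sleep(0.01)


def run_actor(work) -> bool:
    """Run `work` alongside a worker; return True if the worker exited during cleanup."""
    shutdown_event = threading.Event()
    worker = threading.Thread(target=streaming_loop, args=(shutdown_event,), daemon=True)
    worker.start()
    try:
        work()
    finally:
        # Set unconditionally so the loop exits on both the normal and the crash path.
        shutdown_event.set()
        # A timeout keeps a stuck worker from hanging shutdown forever.
        worker.join(timeout=2.0)
    return not worker.is_alive()
```

Without the shutdown_event.set() call in finally, the join on the normal-completion path would wait for the full timeout (or forever, if no timeout is given) because nothing tells the loop to stop.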
Friendly ping: just checking in on this PR to see if you have any feedback. Happy to address any questions! 🙏

Hi @wadeKeith, thank you for making this PR!
Problem
Fixes #3059.
The actor_cli function is the core entry point for the Actor process. When act_with_policy() throws an uncaught exception, all subsequent cleanup logic is skipped:
- transitions_queue.close(), interactions_queue.close(), parameters_queue.close() never execute, leaving GPU/CPU resources consumed
Fix
Wrapped the act_with_policy() call and cleanup logic in try/except/finally:
- try: Runs act_with_policy() and logs normal completion
- except: Logs the exception with full traceback via logging.exception(), sets shutdown_event to signal child processes, then re-raises
- finally: Always executes queue close + process join + queue cancel_join_thread, guaranteeing cleanup regardless of how act_with_policy() exits
Changes
- src/lerobot/rl/actor.py: Restructured actor_cli cleanup into try/except/finally block