async schedule [2/N]: support async prepare #936
Vinkle-hzt wants to merge 2 commits into alibaba:main from
Conversation
🤖 AI Code Review — PR #936: async schedule [2/N]: support async prepare by @Vinkle-hzt

Overview: This PR implements two features: (1) asynchronous preparation of attention inputs in MTP speculative decoding (AsyncRunner); (2) vocab pruning support (d2t/t2d mappings + a new rejection sampling kernel).

Issues

🔴 P0 (blocking)
🟡 P1 (important)
🔵 P2 (suggestions)

Highlights

Conclusion: There are 2 P0 blockers (test compilation failure + metrics reporting 0); merging is not recommended. Re-review after the fixes.
🤖 AI Code Review — PR #936 (incremental v2): async schedule [2/N]: support async prepare by @Vinkle-hzt

Incremental review (1 new commit): status of the previously reported P0 issues against the new commit.

Conclusion: The 2 P0 blockers remain unfixed; merging is not recommended.
🤖 AI Code Review — PR #936

PR overview: This PR contains two main features: (1) Async Prepare — makes the attention-input preparation in speculative decoding asynchronous; (2) Vocab Prune — lets the draft model use a pruned, smaller vocabulary.

Review comments

🔴 P0 (blocking)
🟡 P1 (important)
🔵 P2 (suggestions)

Highlights

Overall: There are P0 issues (missing exception propagation + lost forceSpAccept path); merging is not recommended. Fix the P0s first, then proceed to CI verification.
🤖 Code Review — PR #936: async schedule [2/N]: support async prepare by @Vinkle-hzt

🔴 P0 —
Force-pushed from c44c17f to d0dcfbc
🤖 Code Review — PR #936: async schedule [2/N]: support async prepare by @Vinkle-hzt

🔴 P0 — Sampler.cc: `non_blocking=true` contradicts the ROCm safety comment

```cpp
// Use blocking transfer: on ROCm, hipMemcpyAsync from pageable memory is truly async
// and can cause memory access faults if a kernel reads the buffer before transfer completes.
- auto inputs_token_ids_cuda = inputs.token_ids.to(torch::kCUDA);
+ auto inputs_token_ids_cuda = inputs.token_ids.to(torch::kCUDA, true);
```

The comment was written specifically to justify blocking transfer on ROCm. Adding `non_blocking=true` reintroduces exactly the hazard the comment warns about.

🔴 P0 — AsyncRunner: exception in worker thread causes permanent deadlock
```cpp
task.fn();
stream_.synchronize();
event_.record(stream_);
// task_done_ set to true only after this block
```

If `task.fn()` throws, the done flag is never set and the waiting caller blocks forever. Fix: wrap the task body in try/catch, store the exception, and rethrow in `sync()`:

```cpp
try {
    task.fn();
    stream_.synchronize();
    event_.record(stream_);
} catch (...) {
    pending_exception_ = std::current_exception();
}
// then in sync(): if (pending_exception_) std::rethrow_exception(pending_exception_);
```

🟡 P1 — Partial TP broadcast in `decodeStep`
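The try/catch fix recommended in the review can be illustrated with a minimal, self-contained single-task worker. This is a sketch only: `MiniAsyncRunner`, `demo_exception_propagates`, and the flag names are hypothetical stand-ins, and the CUDA stream/event work from the PR is elided — the point is that the done flag is set even when the task throws, so `sync()` rethrows instead of deadlocking.

```cpp
#include <condition_variable>
#include <exception>
#include <functional>
#include <mutex>
#include <stdexcept>
#include <thread>

// Single-task worker: run() hands one task to a worker thread; sync()
// waits for completion and rethrows any exception the task threw.
class MiniAsyncRunner {
public:
    MiniAsyncRunner() : worker_([this] { loop(); }) {}

    ~MiniAsyncRunner() {
        {
            std::lock_guard<std::mutex> lk(mu_);
            stop_ = true;
            has_task_ = true;  // wake the worker so it can observe stop_
        }
        cv_.notify_all();
        worker_.join();
    }

    void run(std::function<void()> fn) {
        std::lock_guard<std::mutex> lk(mu_);
        task_ = std::move(fn);
        has_task_ = true;
        done_ = false;
        cv_.notify_all();
    }

    // Blocks until the task finished; rethrows a stored exception, if any.
    void sync() {
        std::unique_lock<std::mutex> lk(mu_);
        cv_.wait(lk, [this] { return done_; });
        if (pending_exception_) {
            auto e = pending_exception_;
            pending_exception_ = nullptr;
            std::rethrow_exception(e);
        }
    }

private:
    void loop() {
        for (;;) {
            std::function<void()> fn;
            {
                std::unique_lock<std::mutex> lk(mu_);
                cv_.wait(lk, [this] { return has_task_; });
                if (stop_) return;
                fn = std::move(task_);
                has_task_ = false;
            }
            std::exception_ptr err;
            try {
                fn();  // may throw
            } catch (...) {
                err = std::current_exception();
            }
            {
                std::lock_guard<std::mutex> lk(mu_);
                pending_exception_ = err;
                done_ = true;  // always set, even on failure
            }
            cv_.notify_all();
        }
    }

    std::mutex mu_;
    std::condition_variable cv_;
    std::function<void()> task_;
    bool has_task_ = false;
    bool done_ = true;
    bool stop_ = false;
    std::exception_ptr pending_exception_;
    std::thread worker_;  // declared last so it starts after the state above
};

// Returns true iff the worker's exception surfaces at sync() (no deadlock).
bool demo_exception_propagates() {
    MiniAsyncRunner r;
    r.run([] { throw std::runtime_error("boom"); });
    try {
        r.sync();
    } catch (const std::runtime_error&) {
        return true;
    }
    return false;
}
```

Without the catch-all, the worker thread would terminate the process (an uncaught exception on a `std::thread` calls `std::terminate`), or, in the PR's variant, leave `task_done_` unset forever.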
🤖 Code Review — PR #936: async schedule [2/N]: support async prepare by @Vinkle-hzt

🔴 P0 — AsyncRunner: no exception handling → process crash

Fix: wrap the task body in try/catch, store the exception via `std::current_exception()`, and rethrow it in `sync()`.

🔴 P0 — total_accept_len never set → speculative decoding metrics always 0
```cpp
- total_accept_len = std::accumulate(accept_lens.begin(), accept_lens.end(), 0);
```

But the consumers still read it:

```cpp
executor_collector.execute_token_size += total_accept_len;       // always 0
tps_collector.generate_tps = total_accept_len;                   // always 0
sp_engine_collector.total_accepted_token_num = total_accept_len; // always 0
```

All speculative decoding throughput metrics will be permanently zeroed. The new code should assign it, e.g.:

```cpp
total_accept_len = speculative_sampler_output.accept_len.sum().item<size_t>();
```

🟡 P1 — Sampler.cc `non_blocking=true`: unsafe for non-pinned callers on ROCm

The old comment explicitly warned:

```cpp
// Use blocking transfer: on ROCm, hipMemcpyAsync from pageable memory is truly async
// and can cause memory access faults if a kernel reads the buffer before transfer completes.
```

The new code switches to `non_blocking=true`, which is unsafe when callers pass pageable (non-pinned) memory.

🟡 P1 — TP broadcast after rejection sampling: missing execSyncCommunication
```cpp
execBroadcast({{model_input.combo_tokens}, 0});
execBroadcast({{model_input.last_hidden_states}, 0});
execBroadcast({{model_input.lm_output_indexes}, 0});
```

The old path synchronized after broadcasting; the new one never calls `execSyncCommunication`.

🟡 P1 — `prepared_attention_inputs_` is a plain bool shared between threads
🔵 P2 — cout in ExpertBalancer constructor
```cpp
cout << "ExpertBalancer constructed with " << log_exp_num << " logical experts" << endl;
```

Should use the project's logging facility instead of `cout`.

🔵 P2 — Large commented-out code blocks
🔵 P2 — t2d_map loaded but never used at runtime
```cpp
// Reserved for future target-to-draft mapping. Loaded but not yet used at runtime.
torch::Tensor t2d_map;
```

Loading unused weights wastes GPU memory. Either use it or don't load it yet.

🔵 P2 — Static stream in tpSyncModelInputs is process-global
```cpp
static auto host_tensor_stream = cuda_graph::graphGetStreamFromPool(true);
```

The comment acknowledges this breaks with multiple engine instances in the same process. This is a latent bug for test environments and future multi-engine deployments.

🟢 Strengths
Summary: 2 P0 blockers (AsyncRunner crash on exception, total_accept_len metrics zeroed), 3 P1 issues (ROCm non_blocking safety, TP broadcast missing sync, prepared_attention_inputs_ data race). The async prepare concept is sound but needs these fixes before merge.

🤖 Generated by RTP-LLM Code Review Bot
LLLLKKKK left a comment

Code Review — PR #936
Summary: Adds async attention-input preparation to the MTP speculative decoding pipeline, overlapping CPU-side KV cache setup with GPU compute. Also introduces vocab pruning (draft-to-target token mapping), a new rejection sampling kernel replacing the old chain speculative sampling, and several supporting changes (EPLB world_size guard, TP broadcast optimization, Sampler non-blocking H2D).
P0 — Must fix
- Race on `d2d_copies_` between `prepareAttentionInputs` and `forward` in PyWrappedModel (rtp_llm/cpp/models/PyWrappedModel.cc). `prepareAttentionInputs()` calls `d2d_copies_.clear()` then `fusedCopy(d2d_copies_)`. But `d2d_copies_` is a member field also used by `tensorHoldHostAndToCuda()` called from `forward()`. When `prepareAttentionInputs` runs on the AsyncRunner worker thread while `forward()` runs on the main thread, both can touch `d2d_copies_` concurrently. The comment "No fusedCopy here" in `forward()` is the only guard — this is fragile. Recommend either (a) giving `prepareAttentionInputs` its own local `FusedD2DCopyParams`, or (b) adding an explicit assertion that `d2d_copies_` is exclusively owned by `prepareAttentionInputs` and must not be touched in `forward`.

- `attention_inputs_` and `graph_state_` written by async thread, read by main thread (PyWrappedModel.cc:317-333, PyWrappedModel.h:102-104). The acquire/release pair on the atomic bool is technically correct for the current code path where `sync()` is called before `forward()` in `MtpExecutor::decodeStep`. However, this invariant is not enforced at the PyWrappedModel level — if anyone calls `forward()` without a prior `sync()`, the behavior is undefined. Consider adding an assertion or documenting this contract explicitly.
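The acquire/release handoff described in the P0 item above can be sketched as a tiny self-contained example (hypothetical names, not the PR's actual fields): the preparer fills the data, then publishes it with a release store; the consumer spins on an acquire load before reading, which is what makes the read safe. A plain non-atomic bool here would be a data race and undefined behavior.

```cpp
#include <atomic>
#include <cstddef>
#include <thread>
#include <vector>

// Shared state: inputs written by the worker, flag used for publication.
struct PreparedState {
    std::vector<int> attention_inputs;  // written by worker only
    std::atomic<bool> prepared{false};  // publication flag
};

void prepare_async(PreparedState& s) {
    s.attention_inputs = {1, 2, 3};                     // build inputs
    s.prepared.store(true, std::memory_order_release);  // publish
}

// Blocks until the worker published; afterwards reading is safe because
// the acquire load synchronizes-with the release store above.
std::vector<int> wait_and_consume(PreparedState& s) {
    while (!s.prepared.load(std::memory_order_acquire)) {
        std::this_thread::yield();  // stands in for sync()
    }
    return s.attention_inputs;
}

std::size_t demo_handoff() {
    PreparedState s;
    std::thread worker(prepare_async, std::ref(s));
    auto inputs = wait_and_consume(s);
    worker.join();
    return inputs.size();  // 3
}
```

This also shows why the contract matters: if the consumer read `attention_inputs` without first observing `prepared == true` (i.e. `forward()` without a prior `sync()`), nothing orders the read after the write.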
P1 — Should fix
- `force_mask[idx] = true` is a CPU scalar write to a GPU tensor (SpeculativeSampler.cc:1557). The `force_mask` tensor is created on `target_device` (CUDA), but the loop does scalar CPU writes to a CUDA tensor, triggering an implicit sync per element. Build on CPU first, then move to CUDA (similar to how `do_sample` is handled nearby).

- Partial TP broadcast in `decodeStep` may miss fields (MtpExecutor.cc:696-700). The new code broadcasts only `combo_tokens`, `last_hidden_states`, and `lm_output_indexes` after rejection sampling. `input_lengths` is no longer explicitly updated per-batch in `updateDecodePostDraftModelInput`. Verify that the draft prefill model does not depend on per-sequence `input_lengths` being updated after rejection.

- `model_input_copy` shallow-copies tensor fields (MtpExecutor.cc:597-614). The async thread and main thread share underlying tensor storage for fields like `kv_cache_block_id`. This appears safe because `prepareAttentionInputs` only reads these fields and `draftModelDecode` creates new tensors rather than modifying in-place — but this is a subtle invariant worth documenting.

- `collect_metrics_stream_` synchronization (MtpExecutor.cc:775-781). `.item<int>()` inside a `StreamGuard` triggers a device sync on the metrics stream. If on the hot path, this adds latency. Consider making this truly async or moving it off the critical path.

- Missing newline at end of file in `mapping.cu`, `mapping.h`, `sampling.cu`, `sampling.h`. `-Wnewline-eof` will warn.

- `pinned_check_remaining_` decrement moved to `prepareAttentionInputs` (PyWrappedModel.cc:300-302). Placement is confusing since the pinned check was originally a forward-count guard. Consider keeping the decrement in `forward()` for clarity.
P2 — Nits
- `vec_dtypes.cuh` is 1522 lines of vendored FlashInfer code. Consider sharing with the existing `3rdparty/flashinfer` dependency instead of duplicating.

- `W.h` has 34 lines of whitespace-only reformatting — adds noise to diff and blame. Consider separating formatting from functional changes.

- `SpeculativeSamplingParams` changed from reference members to value members (OpData.h:361-389). Good change, but the removed constructor relies on aggregate initialization — fine for C++17 but worth noting.

- `output_token_ids_d` created with `.requires_grad(false)` (SpeculativeSampler.cc:1474-1476). Integer tensors never have gradients — redundant for `kInt32`.

- `ExpertBalancer` `world_size_` default of 1 means old call sites would silently get single-rank behavior. Verify all call sites are updated.

- `createMinFakeDecodeStream` adds a `vocab_size` parameter that shadows the member `vocab_size_`. Consider naming it `draft_vocab_size` for consistency.
Architecture assessment
The async prepare pattern is well-structured: AsyncRunner is a clean single-task worker with proper exception propagation, RAII shutdown, and CUDA stream isolation. The split of prepareInputs into prepareInputData + prepareAttentionInputs is a sensible decomposition. The main risk area is the implicit contract between PyWrappedModel::prepareAttentionInputs (async) and forward (main thread) sharing mutable member state — this works today but is fragile under future modifications. Recommend hardening the ownership boundaries before merge.
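Option (a) from the P0 list — giving `prepareAttentionInputs` its own local copy plan instead of mutating the shared member — can be sketched as follows. All names here (`CopyPlan`, `WrappedModel`, the demo function) are hypothetical stand-ins for the project's `FusedD2DCopyParams` and `PyWrappedModel`; the point is the ownership boundary, not the real copy logic.

```cpp
#include <cstddef>
#include <thread>
#include <utility>
#include <vector>

// Stand-in for FusedD2DCopyParams: a list of (src, dst) block ids.
using CopyPlan = std::vector<std::pair<int, int>>;

class WrappedModel {
public:
    // The async preparer builds its own local plan, so it can run on the
    // worker thread concurrently with forward() without ever touching
    // the d2d_copies_ member.
    CopyPlan prepareAttentionInputs() {
        CopyPlan local;              // exclusively owned by this call
        local.emplace_back(0, 1);
        local.emplace_back(2, 3);
        return local;                // fusedCopy(local) would go here
    }

    // Main-thread path keeps using the member field as before.
    void forward() {
        d2d_copies_.clear();
        d2d_copies_.emplace_back(4, 5);
    }

    std::size_t memberPlanSize() const { return d2d_copies_.size(); }

private:
    CopyPlan d2d_copies_;            // no longer shared with prepare
};

// Run both paths concurrently; with the local plan there is no shared
// mutable state between the two threads.
std::size_t demo_no_shared_plan() {
    WrappedModel m;
    CopyPlan prepared;
    std::thread t([&] { prepared = m.prepareAttentionInputs(); });
    m.forward();
    t.join();
    return prepared.size() + m.memberPlanSize();  // 2 + 1
}
```

Compared with the "No fusedCopy here" comment, this makes the exclusivity structural: the compiler, not a comment, guarantees `forward()` and `prepareAttentionInputs()` operate on distinct plans.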
Force-pushed from 1b2d3d2 to 9b30851
AI Code Review - PR #936

Status: BLOCKING · Summary: P0/0 · P1/1 · P2/2 · P3/1

Blocking Issues

P1

Non-blocking Suggestions

P2

P3

Review Checklist: 5 pass / 4 fail

General Principles Checklist

Failed

Passed

RTP-LLM Checklist

Failed

Passed

Python Static-First Checklist

Passed

Strengths
AI Code Review - PR #936

Status: BLOCKING · Summary: P0/0 · P1/2 · P2/3 · P3/2

Blocking Issues

P1

Non-blocking Suggestions

P2

P3

Review Checklist: 8 pass / 5 fail

General Principles Checklist

Failed

Passed

RTP-LLM Checklist

Failed

Passed

Python Static-First Checklist

Failed

Passed

Strengths
AI Code Review - PR #936

Status: BLOCKING · Summary: P0/0 · P1/1 · P2/0 · P3/0

Blocking Issues

P1

Checklist Violations (2 fail / 60 total)

General Principles Checklist

RTP-LLM Checklist

Strengths
AI Code Review - PR #936

Status: LGTM · Summary: P0/0 · P1/0 · P2/1 · P3/2

lgtm ready to ci

Non-blocking Suggestions

P2

P3

Checklist Violations (4 fail / 111 total)

General Principles Checklist

RTP-LLM Checklist

Strengths
Add async prepare plumbing for speculative decoding and MTP execution, including AsyncRunner coverage and CUDA graph integration. Fold in review fixes for tensor allocation, memory copy plan lifetime, GPU broadcast sync removal, and speculative sampling CUDA test registration.

Squashed from 8 original commits:

- 5746930 feat: support async prepare
- e5db705 chore: fix review issues
- 391d083 fix: release memory copy plan before marking done
- af05149 chore: rm broadcast sync for gpu
- f46f46d refactor: optimize tensor handling in MtpExecutor and AsyncRunner
- 0ccdfcc fix: update tensor allocation in MtpExecutor for CUDA compatibility
- eb7b3e8 fix: address speculative decode review issues
- a473e18 test: register speculative sampling cuda test
AI Code Review - PR #936

Status: LGTM · Summary: P0/0 · P1/0 · P2/5 · P3/0

lgtm ready to ci

Non-blocking Suggestions

P2

Checklist Violations (6 fail / 88 total)

General Principles Checklist

RTP-LLM Checklist

Strengths
internal source has been updated, please review the changes!
based on #521