async schedule [3/N]: support async scheduler #972
Open
Vinkle-hzt wants to merge 11 commits into main from
Conversation
Add async prepare plumbing for speculative decoding and MTP execution, including AsyncRunner coverage and CUDA graph integration. Fold in review fixes for tensor allocation, memory copy plan lifetime, GPU broadcast sync removal, and speculative sampling CUDA test registration.

Squashed from 8 original commits:
- 5746930 feat: support async prepare
- e5db705 chore: fix review issues
- 391d083 fix: release memory copy plan before marking done
- af05149 chore: rm broadcast sync for gpu
- f46f46d refactor: optimize tensor handling in MtpExecutor and AsyncRunner
- 0ccdfcc fix: update tensor allocation in MtpExecutor for CUDA compatibility
- eb7b3e8 fix: address speculative decode review issues
- a473e18 test: register speculative sampling cuda test
- Added a CpuTpBroadcaster class for CPU tensor broadcasting over Unix Domain Sockets, avoiding NCCL's cudaDeviceSynchronize stalls.
- Introduced methods for initialization and broadcasting, ensuring thread safety and proper handling of intra-node tensor communication.
- Updated the relevant build files to include the new broadcaster and modified execBroadcastCpu to use it for CPU tensors.
- Integrated initialization in Python to ensure proper setup across TP ranks.
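A minimal sketch of the idea, assuming a simple rank-0-serves topology; the function and argument names are hypothetical and the real CpuTpBroadcaster interface, path handling, and error recovery are not shown in this PR. Rank 0 pushes the tensor bytes over a Unix Domain Socket and every other intra-node TP rank connects and reads them, so no NCCL call (and no cudaDeviceSynchronize) is involved for CPU tensors:

```cpp
// Hypothetical sketch of a UDS-based CPU broadcast (not the actual
// CpuTpBroadcaster interface): rank 0 listens and pushes the bytes,
// all other intra-node TP ranks connect and read them.
#include <sys/socket.h>
#include <sys/un.h>
#include <unistd.h>
#include <cstring>
#include <stdexcept>

// Move exactly `len` bytes over a connected UDS fd, sending or receiving.
static void xferAll(int fd, char* buf, size_t len, bool sending) {
    while (len > 0) {
        ssize_t n = sending ? ::send(fd, buf, len, 0) : ::recv(fd, buf, len, 0);
        if (n <= 0) throw std::runtime_error("uds transfer failed");
        buf += n;
        len -= static_cast<size_t>(n);
    }
}

// Broadcast `data[0..size)` from TP rank 0 to all local ranks via `path`.
void broadcastCpu(char* data, size_t size, int rank, int world_size, const char* path) {
    sockaddr_un addr{};
    addr.sun_family = AF_UNIX;
    std::strncpy(addr.sun_path, path, sizeof(addr.sun_path) - 1);
    if (rank == 0) {
        int srv = ::socket(AF_UNIX, SOCK_STREAM, 0);
        ::unlink(path);
        ::bind(srv, reinterpret_cast<sockaddr*>(&addr), sizeof(addr));
        ::listen(srv, world_size - 1);
        for (int i = 1; i < world_size; ++i) {  // push to each peer rank
            int cli = ::accept(srv, nullptr, nullptr);
            xferAll(cli, data, size, /*sending=*/true);
            ::close(cli);
        }
        ::close(srv);
    } else {
        int cli;
        for (;;) {  // rank 0 may not be listening yet: retry with a fresh socket
            cli = ::socket(AF_UNIX, SOCK_STREAM, 0);
            if (::connect(cli, reinterpret_cast<sockaddr*>(&addr), sizeof(addr)) == 0) break;
            ::close(cli);
            ::usleep(1000);
        }
        xferAll(cli, data, size, /*sending=*/false);
        ::close(cli);
    }
}
```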
perf(mtp,engine): Stage 3 phase 3.1-3.4 + fix hang (squashed)

Squash of 5 commits from async_opt2 rebased on origin/async2:
- a49e858 perf(mtp): phase 3.1 - GPU-resident propose tokens for decode prepare
- 2f4233f perf(mtp): phase 3.4 - launch draft-prefill prepare before target verify
- 283b177 perf(engine): phase 3.2 - async scheduling scaffold (BatchFuture + result thread)
- 0e617b3 perf(scheduler,executor): phase 3.3 - conservative scheduling + linear-attn KV swap event scaffold
- e4b4c1f fix: fix hang

Conflict resolution: 2f4233f removed a duplicate draft_prefill_prepare launch that async2's MtpExecutor.cc still had after target_verify; the resolution keeps only the new pre-verify launch site (line ~653).

All new code is dead-path unless RTP_LLM_ASYNC_SCHEDULING=1 is exported. The phase 3.1 GPU propose-tokens path is gated by the kPhase31MinBatchForGpu batch-size threshold (issuses/phase31/001).

Refs: mtp_stream_async/design_full_async.md, issuses/phase31/001.

perf(mtp): Stage 3 phase 3.2 lite - stream-async 5-step (squashed)

Squash of 5 commits from async_opt2 rebased on G1 (phase 3.1-3.4 scaffold):
- d41a6dc perf(mtp): phase 3.2 lite step 1 - GenerateStream device tensor fields
- 92fb613 perf(mtp): phase 3.2 lite step 2 - dispatchDecode fork bookkeeping worker
- c193b6d perf(mtp): phase 3.2 lite step 3 - prepareDecode*Input device-resident path
- ef95908 perf(mtp): phase 3.2 lite step 4 - 1-step async buffer + linear-attn swap event
- 8c83f94 perf(mtp): phase 3.2 lite step 5 - env switch + true stream-async

End-to-end behaviour: GenerateStream gains 4 device-resident fields (accept_len_gpu_ / accept_tokens_gpu_ / next_seq_len_gpu_ / propose_tokens_gpu_) with setSpecDecodeDeviceState() plumbing. MtpExecutor::dispatchDecodeAsync forks a spec_bookkeeping_runner_ worker (dedicated CUDA stream + thread) that performs D2H + specUpdate + KV release outside the main thread. The worker waits on rejection_event + draft_event via cudaStreamWaitEvent (no CPU sync on the main stream) and records a swap-done event on a worker-owned torch::Event so the next step's main stream can wait via cudaStreamWaitEvent.

MtpBatchStreamProcessor's prepareDecodeDraftModelInput / prepareOneStepSpecDecodeModelInput gain a device-resident "third branch" that builds next-step combo_tokens / lm_output_indexes / prefix_lengths / input_lengths fully on GPU using the per-stream device tensors attached by dispatchDecodeAsync, bypassing the worker's specUpdate dependency.

MtpExecutor::decodeStep adds a wait_prev_bookkeeping scope at entry (spec_bookkeeping_runner_.sync) to cap outstanding worker tasks at one step (KV reservation bookkeeping safety) and a wait_pending_linear_attn_swaps scope for linear-attention KV swap correctness.

Gated by the RTP_LLM_MTP_STREAM_ASYNC=1 env (default 0). When enabled:
- dispatch_output 9122us -> 66us (-99.3%)
- MTP tok/iter 3.7 preserved (no race)
- step CPU wall ~17ms -> 16.4ms (-3.5%) on H20x2 / Qwen3.5-MoE-FP8

Refs: mtp_stream_async/design.md, mtp_stream_async/perf.md commit5_env1 baseline.

perf(mtp): Pin D2H accept_len/accept_tokens (squashed)

Cherry-pick of 1 commit from async_opt2 rebased on G3 (UDS broadcaster):
- 4d1fd13 perf(mtp): phase 3.2 lite step 6 - Pin D2H accept_len/accept_tokens

MtpBatchStreamProcessor::prepareDecodeSpecUpdateInfo converts the worker thread's D2H of accept_len / accept_tokens from a raw `.cpu()` (staging-buffer path, ~8.5ms wall) to pinned + non_blocking=true + explicit stream synchronization.
The target tensors are allocated with `torch::TensorOptions().pinned_memory(true)` once per stream scope and reused across iterations.

Impact measured on the pin_d2h_v2 timeline (committed verbatim from async_opt2 history):
- cudaMemcpyAsync big D2H: 8630us -> 0 events >1ms (worker-inline memcpy 6-7us)
- aten::to: 8640us -> max 119us (-98.6%)
- worker total wall: 7091us -> 5226us (round 3, -26%)
- wait_prev_bookkeeping: unchanged (~8.6ms, proven to be GPU pipeline depth, not D2H staging; see issuses/phase33/001 for the full analysis)

Still a real, correctness-neutral improvement even though it doesn't shrink wait_prev_bookkeeping: it eliminates the D2H staging-buffer copy, so worker wall time is much tighter.

Refs: mtp_stream_async/perf_data/pin_d2h_v2_20260427_2151, issuses/phase33/001-deferred-wait-target-verify-cudagraph-side-effect.md
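A minimal sketch of the pinned-D2H pattern this commit describes, using the public libtorch C++ API; the surrounding executor and buffer-scope management are omitted, and the function name is illustrative:

```cpp
// Sketch: pinned host target allocated once and reused, non-blocking copy,
// explicit stream sync before the host reads the values. This is what turns
// the raw `.cpu()` staging-buffer path into a plain async cudaMemcpyAsync.
#include <torch/torch.h>
#include <c10/cuda/CUDAStream.h>

torch::Tensor accept_len_host;  // allocated once per stream scope, reused

void pinnedD2H(const torch::Tensor& accept_len_gpu) {
    if (!accept_len_host.defined() || accept_len_host.sizes() != accept_len_gpu.sizes()) {
        accept_len_host = torch::empty(
            accept_len_gpu.sizes(),
            torch::TensorOptions().dtype(accept_len_gpu.dtype()).pinned_memory(true));
    }
    // Enqueues an async D2H memcpy on the current CUDA stream and returns.
    accept_len_host.copy_(accept_len_gpu, /*non_blocking=*/true);
    // Explicit synchronization only when the host actually needs the values.
    c10::cuda::getCurrentCUDAStream().synchronize();
}
```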
- Optimize D2H transfer for accept_len and accept_tokens
- Remove unnecessary stream synchronization; enhance tensor handling
- Device-ify model input indexes (see the sketch below)
- Device-ify attention inputs

Squashed from 4 commits.
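As a sketch of what "device-ify model input indexes" can look like (illustrative names; the actual index builds in this PR differ), last-token indexes can be derived entirely on the GPU from device-resident lengths, with no host loop and no H2D copy:

```cpp
// Illustrative only: build lm_output_indexes-style offsets on device.
#include <torch/torch.h>

torch::Tensor lastTokenIndexes(const torch::Tensor& input_lengths_gpu /* int64 CUDA */) {
    // Exclusive prefix sum gives each sequence's start offset in the packed
    // token buffer; adding (length - 1) selects its last token.
    auto starts = torch::cumsum(input_lengths_gpu, /*dim=*/0) - input_lengths_gpu;
    return starts + input_lengths_gpu - 1;  // stays on GPU, no host sync
}
```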
…st syncs

MTP fuse optimizations:
- Use non-blocking H2D for gathered inputs
- Fuse target verify metadata preparation
- Fuse speculative decode metadata preparation
- Fuse decode TP sync copies
- Fuse post-layer logits allgather

CUDA graph host sync optimizations:
- Optimize prepare attention inputs (fused fill kernel)
- Remove replay prepare host sync
- Skip forward event sync when called inline (event pattern sketched below)

Squashed from 8 commits.
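The host-sync removals above lean on CUDA's event-based stream ordering; a generic sketch of that pattern (CUDA runtime API, not the executor's actual stream and event ownership):

```cpp
// Producer records an event on its stream; the consumer stream waits on it
// with cudaStreamWaitEvent, so ordering is enforced on the GPU and no CPU
// thread blocks in a host sync.
#include <cuda_runtime.h>

void crossStreamHandoff(cudaStream_t producer, cudaStream_t consumer) {
    cudaEvent_t done;
    cudaEventCreateWithFlags(&done, cudaEventDisableTiming);
    // ... producer kernels enqueued on `producer` ...
    cudaEventRecord(done, producer);
    cudaStreamWaitEvent(consumer, done, 0);  // GPU-side wait, not cudaStreamSynchronize
    // ... consumer kernels enqueued on `consumer` run only after `done` fires ...
    cudaEventDestroy(done);  // safe: destruction is deferred until the event completes
}
```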
Add async-friendly MTP device-state framework with optional broadcast-sync
elimination behind RTP_LLM_MTP_DROP_BROAD_SYNC (default OFF).
- N0: Add async device-state env gates (default off)
- N1: Group MTP device-state into struct + epoch guard (sketched after this list)
- N2: Stop bumping host seqLength in dispatchDecodeAsync
- N3: Extract device-state gather into named method
- N4: Wire epoch-guarded clear in spec_bookkeeping worker
- N5: Document stop-word eval contract on MTP async path
- N6: Document TP / linear / multi-step device-state contracts
- N7: Add fallback observability + audit hot-path .cpu()/.item()
- N8: Add RTP_LLM_MTP_DROP_BROAD_SYNC opt-in + main-thread publish of
hidden_states/all_probs to fix duplicate-output race
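A hypothetical sketch of the N1 grouping and the N4 epoch guard; the four tensor fields follow the names used in this PR, while the guard logic and the synchronization of the struct itself are illustrative:

```cpp
#include <torch/torch.h>
#include <cstdint>

// N1: group the per-stream MTP device state behind one struct + epoch.
struct MtpAsyncDeviceState {
    torch::Tensor accept_len_gpu;
    torch::Tensor accept_tokens_gpu;
    torch::Tensor next_seq_len_gpu;
    torch::Tensor propose_tokens_gpu;
    uint64_t      epoch = 0;  // bumped on every main-thread publish
};

// N4: the bookkeeping worker clears only the epoch it was forked for, so a
// newer publish from the main thread is never wiped by a stale worker.
void clearIfEpochMatches(MtpAsyncDeviceState& state, uint64_t worker_epoch) {
    if (state.epoch != worker_epoch) return;  // superseded: no-op
    state = MtpAsyncDeviceState{};
}
```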
N8 race + fix detail
--------------------
When RTP_LLM_MTP_DROP_BROAD_SYNC=1 the per-step broad sync at the top of
decodeStep is removed, so the next step's main thread runs concurrently
with the previous step's bookkeeping worker. The worker writes
sp_output_buffer->{hidden_states, all_probs} inside specUpdate after a
~6.5ms D2H wait, while the next step's main reads them within microseconds
of dispatchDecodeAsync return — racing the worker and silently consuming
the previous-previous step's values. With propose_step=4 this dropped
acceptance from 4.20 to 2.42 tok/iter on Qwen2-MTP, even though the
output text stayed correct.
Fix: extend the epoch-guarded MtpAsyncDeviceState with two new fields and
have the main thread compute and publish them inside dispatchDecodeAsync
before forking the worker:
- last_hidden_states_gpu = draft_all_hidden.narrow(per-stream slice)
.index_select(accept_len - 1)
- draft_all_probs_gpu = draft_all_probs.narrow(per-stream slice).clone()
index_select returns a fresh storage; narrow is a view, so all_probs is
cloned to break the alias to draft_prefill_output (held by the worker
lambda and released independently). All ops stay on the main stream — no
.cpu(), no .item(), no synchronize.
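A sketch of the publish step and the aliasing rules it relies on (libtorch C++; the per-stream slicing arguments and the function shape are illustrative):

```cpp
#include <torch/torch.h>

void publishDeviceState(const torch::Tensor& draft_all_hidden,
                        const torch::Tensor& draft_all_probs,
                        int64_t offset, int64_t len,          // per-stream slice
                        const torch::Tensor& accept_len,
                        torch::Tensor& last_hidden_states_gpu,
                        torch::Tensor& draft_all_probs_gpu) {
    // index_select materializes fresh storage, so no clone is needed here.
    last_hidden_states_gpu = draft_all_hidden.narrow(/*dim=*/0, offset, len)
                                             .index_select(/*dim=*/0, accept_len - 1);
    // narrow alone is a view over draft_all_probs (held by the worker lambda
    // and released independently); clone() breaks that alias.
    draft_all_probs_gpu = draft_all_probs.narrow(/*dim=*/0, offset, len).clone();
    // Everything above enqueues on the main stream: no .cpu(), no .item(),
    // no synchronize.
}
```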
Readers in MtpBatchStreamProcessor (gatherHiddenStates,
update{One,Multi}StepDraftSamplerOutput) now prefer the device-state
field, falling back to sp_output_buffer->{hidden_states, all_probs}
only when device-state is undefined. Fallback is safe because the
worker writes sp_output_buffer BEFORE clearMtpAsyncDeviceState (post
bb2d996's clear-after-dispatch reorder).
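Reader-side, the preference order reduces to a simple check (hypothetical helper; the real readers are gatherHiddenStates and update{One,Multi}StepDraftSamplerOutput):

```cpp
#include <torch/torch.h>

torch::Tensor preferDeviceState(const torch::Tensor& device_state_hidden,
                                const torch::Tensor& sp_output_hidden) {
    if (device_state_hidden.defined()) {
        return device_state_hidden;  // published by the main thread pre-fork
    }
    // Fallback is safe: the worker writes sp_output_buffer BEFORE
    // clearMtpAsyncDeviceState, so an undefined device state implies the
    // buffer is already complete.
    return sp_output_hidden;
}
```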
Verified on Qwen2-MTP, propose_step=4, H20:
baseline DROP=0 accept=4.20 iter=16
fix DROP=1 accept=4.20 iter=16
(per-iter step_output_len byte-identical to baseline;
reported metric 3.94 vs 4.20 is a measurement
artifact of iter_count being bumped by an extra
speculative-only gather() in the async path)
Squashed from 11 commits.
Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
Squashed series 8a74a7e..c335b8a + uncommitted fixes.

- MHA paged-attn device planner (FlashInferMlaAttnParams::fillParamsMhaDevice + mhaPagedAttnPlanKernel): drop the host fillParams loop and the host->device cudaMemcpyAsync from the four MHA prepare paths (prefill paged/non-paged, decode, decode CUDA-graph replay). Fills the same buf_d slices fillParams uses, so the existing FlashInfer _paged_kv_*_buf aliases stay valid.
- The KV kernel block table is now device-resident; drop the per-group _host copies and the spurious _host sentinel branches in the attention bindings.
- Gate the setupKVCacheForAttentionInputs D2H on use_mla so MHA paths skip the unused copy.
- Fixes:
  * fillParamsMhaDevice batch_reuse_info_size envelope: align with fillParams / fillDecodeCudaGraphParams (batch_size * 4) so a later replay through either path doesn't trip forbid_realloc.
  * fillParamsMhaDevice input_token_num_upper envelope: was sized by batch_size, undersizing positions_d / batch_indice_d for prefill (1 request x 7890 tokens narrowed against a 512-element buffer -> 'start (0) + length (7890) exceeds dimension size (512)' in base_rotary_embedding_op._apply_rope). Use the same worst-case envelope style as page_num_upper: batch_size * max_blocks_per_bs * seq_size_per_block (no host sync).
  * MtpBatchStreamProcessor::prepareDecodeDraftModelInput stream-async eligibility: collapse the two-pass defined()-then-select() loop into a single iteration that snapshots getProposeTokensGpu() by value before use (sketched below). With RTP_LLM_MTP_DROP_BROAD_SYNC=1 the bookkeeping worker can call clearMtpAsyncDeviceState between the original loops, leaving the second loop's select() to dereference an undefined tensor (c10::Error 'Expected a proper Tensor but got None for argument #0 self').
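A sketch of the single-pass snapshot fix from the last item above (the stream type and getter are stand-ins for GenerateStream::getProposeTokensGpu; field-level synchronization is elided). Because torch::Tensor is a refcounted handle, a by-value snapshot stays defined and keeps the storage alive even if the worker clears the field concurrently:

```cpp
#include <torch/torch.h>
#include <vector>

struct StreamLike {  // hypothetical stand-in for GenerateStream
    torch::Tensor propose_tokens_gpu_;
    torch::Tensor getProposeTokensGpu() const { return propose_tokens_gpu_; }
};

void collectProposeTokens(const std::vector<StreamLike*>& streams,
                          std::vector<torch::Tensor>& out) {
    for (auto* s : streams) {
        // Single pass: snapshot by value once, then check and use the SAME
        // handle. A second re-read (the old two-pass shape) could observe
        // the worker's clear and hand select() an undefined tensor.
        torch::Tensor snapshot = s->getProposeTokensGpu();
        if (!snapshot.defined()) continue;  // not eligible this step
        out.push_back(snapshot.select(/*dim=*/0, 0));
    }
}
```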
Adds focused re-gathering of kv_cache_kernel_block_id between MTP propose and verify for linear-attention models, then updates the model attention inputs against the fresh page table before the verify forward. Adds a focused updateKVCacheKernelBlockId hook on ModelBase / GraphBase so the re-gather path updates only kv_cache_kernel_block_id-dependent state instead of rebuilding all attention inputs and replaying unrelated CUDA graph mirrors.

Validated in the original commits with Qwen3.5-35B-A3B-FP8 TP=2 MTP propose=4 curl rounds; follow-up hook build validation passed.

Squashed from 2 original commits:
- 7b418f443 feat(mtp): re-gather kv_cache_kernel_block_id between propose and verify
- 028f77dbc refactor(mtp): focused updateKVCacheKernelBlockId hook for re-gather propagation
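The hook's shape, hypothetically (the real ModelBase / GraphBase signatures are not shown in this PR text):

```cpp
#include <torch/torch.h>

struct ModelBase {  // illustrative fragment
    virtual ~ModelBase() = default;
    // Refresh only the state derived from kv_cache_kernel_block_id after the
    // propose->verify re-gather, instead of rebuilding all attention inputs
    // and replaying unrelated CUDA graph mirrors.
    virtual void updateKVCacheKernelBlockId(const torch::Tensor& fresh_block_id) = 0;
};
```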
Collaborator
AI Code Review - PR #972
Status: LGTM
Summary: P0/0 · P1/0 · P2/1 · P3/1; lgtm, ready to ci
Non-blocking suggestions: one P2, one P3
Checklist violations: 5 fail / 67 total (General Principles, RTP-LLM, and Python Static-First checklists)
Added NormalAsyncDeviceState struct to manage state for normal decode async operations, allowing for efficient handling of sampled tokens and sequence lengths. Implemented methods to set and retrieve this state. Updated relevant components to utilize the new state structure, enhancing the async processing capabilities of the NormalExecutor and related classes.
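A hypothetical sketch of that structure, by analogy with the MTP device state (field and method names are illustrative):

```cpp
#include <torch/torch.h>
#include <utility>

// Device-resident results of a normal (non-speculative) decode step.
struct NormalAsyncDeviceState {
    torch::Tensor sampled_tokens_gpu;  // tokens sampled this step
    torch::Tensor next_seq_len_gpu;    // per-stream sequence lengths after the step

    bool defined() const { return sampled_tokens_gpu.defined(); }
};

class NormalExecutorLike {  // stand-in for the NormalExecutor plumbing
public:
    void setNormalAsyncDeviceState(NormalAsyncDeviceState s) { state_ = std::move(s); }
    const NormalAsyncDeviceState& normalAsyncDeviceState() const { return state_; }
private:
    NormalAsyncDeviceState state_;
};
```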
Collaborator
AI Code Review - PR #972
Status: BLOCKING
Summary: P0/0 · P1/1 · P2/1 · P3/1
Blocking issues: one P1
Non-blocking suggestions: one P2, one P3
Checklist violations: 10 fail / 111 total (General Principles, RTP-LLM, and Python Static-First checklists)