async schedule [3/N]: support async scheduler #972

Open

Vinkle-hzt wants to merge 11 commits into main from feat/async3

Conversation

@Vinkle-hzt
Collaborator

No description provided.

Vinkle-hzt and others added 10 commits May 6, 2026 11:06
Add async prepare plumbing for speculative decoding and MTP execution, including AsyncRunner coverage and CUDA graph integration.

Fold in review fixes for tensor allocation, memory copy plan lifetime, GPU broadcast sync removal, and speculative sampling CUDA test registration.

Squashed from 8 original commits:

- 5746930 feat: support async prepare
- e5db705 chore: fix review issues
- 391d083 fix: release memory copy plan before marking done
- af05149 chore: rm broadcast sync for gpu
- f46f46d refactor: optimize tensor handling in MtpExecutor and AsyncRunner
- 0ccdfcc fix: update tensor allocation in MtpExecutor for CUDA compatibility
- eb7b3e8 fix: address speculative decode review issues
- a473e18 test: register speculative sampling cuda test
- Added CpuTpBroadcaster class to facilitate CPU tensor broadcasting using Unix Domain Sockets, avoiding NCCL's cudaDeviceSynchronize stalls (see the sketch after this list).
- Introduced methods for initialization and broadcasting, ensuring thread safety and proper handling of intra-node tensor communication.
- Updated relevant build files to include the new broadcaster and modified execBroadcastCpu to utilize it for CPU tensors.
- Integrated initialization in Python to ensure proper setup across TP ranks.
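
A minimal sketch of the UDS broadcast idea from the bullets above, assuming illustrative names (CpuTpBroadcasterSketch, a single peer, a fixed socket path) rather than the actual class; the real broadcaster also handles thread safety and multi-rank fan-out:

```cpp
// Sketch only: rank 0 pushes raw CPU tensor bytes to a peer over a Unix Domain
// Socket, so intra-node CPU broadcasts never go through NCCL (and never hit
// its cudaDeviceSynchronize). Single-peer layout and names are illustrative.
#include <sys/socket.h>
#include <sys/un.h>
#include <unistd.h>
#include <cstring>
#include <stdexcept>
#include <string>
#include <torch/torch.h>

class CpuTpBroadcasterSketch {
public:
    // Rank 0 listens on a well-known socket path; the other rank connects.
    void init(int rank, const std::string& path) {
        rank_ = rank;
        int sock = ::socket(AF_UNIX, SOCK_STREAM, 0);
        sockaddr_un addr{};
        addr.sun_family = AF_UNIX;
        std::strncpy(addr.sun_path, path.c_str(), sizeof(addr.sun_path) - 1);
        if (rank_ == 0) {
            ::unlink(path.c_str());
            ::bind(sock, reinterpret_cast<sockaddr*>(&addr), sizeof(addr));
            ::listen(sock, 1);
            peer_fd_ = ::accept(sock, nullptr, nullptr);
        } else {
            ::connect(sock, reinterpret_cast<sockaddr*>(&addr), sizeof(addr));
            peer_fd_ = sock;
        }
    }

    // Broadcast a contiguous CPU tensor: rank 0 writes its bytes, the peer
    // reads them into the same-shaped tensor in place.
    void broadcast(torch::Tensor& t) {
        TORCH_CHECK(t.device().is_cpu() && t.is_contiguous());
        char*  data = static_cast<char*>(t.data_ptr());
        size_t left = t.nbytes();
        while (left > 0) {
            ssize_t n = (rank_ == 0) ? ::write(peer_fd_, data, left)
                                     : ::read(peer_fd_, data, left);
            if (n <= 0) throw std::runtime_error("uds broadcast failed");
            data += n;
            left -= static_cast<size_t>(n);
        }
    }

private:
    int rank_    = -1;
    int peer_fd_ = -1;
};
```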
perf(mtp,engine): Stage 3 phase 3.1-3.4 + fix hang (squashed)

Squash of 5 commits from async_opt2 rebased on origin/async2:
  a49e858 perf(mtp): phase 3.1 - GPU-resident propose tokens for decode prepare
  2f4233f perf(mtp): phase 3.4 - launch draft-prefill prepare before target verify
  283b177 perf(engine): phase 3.2 - async scheduling scaffold (BatchFuture + result thread)
  0e617b3 perf(scheduler,executor): phase 3.3 - conservative scheduling + linear-attn KV swap event scaffold
  e4b4c1f fix: fix hang

Conflict resolution: 2f4233f removed a duplicate draft_prefill_prepare
launch that async2's MtpExecutor.cc still had after target_verify; the
resolution keeps only the new pre-verify launch site (line ~653).

All new code is dead-path unless RTP_LLM_ASYNC_SCHEDULING=1 is exported.
Phase 3.1 GPU propose-tokens path is gated by kPhase31MinBatchForGpu
batch-size threshold (issuses/phase31/001).

Refs: mtp_stream_async/design_full_async.md, issuses/phase31/001.

perf(mtp): Stage 3 phase 3.2 lite - stream-async 5-step (squashed)

Squash of 5 commits from async_opt2 rebased on G1 (phase 3.1-3.4 scaffold):
  d41a6dc perf(mtp): phase 3.2 lite step 1 - GenerateStream device tensor fields
  92fb613 perf(mtp): phase 3.2 lite step 2 - dispatchDecode fork bookkeeping worker
  c193b6d perf(mtp): phase 3.2 lite step 3 - prepareDecode*Input device-resident path
  ef95908 perf(mtp): phase 3.2 lite step 4 - 1-step async buffer + linear-attn swap event
  8c83f94 perf(mtp): phase 3.2 lite step 5 - env switch + true stream-async

End-to-end behaviour:

GenerateStream gains 4 device-resident fields (accept_len_gpu_ /
accept_tokens_gpu_ / next_seq_len_gpu_ / propose_tokens_gpu_) with
setSpecDecodeDeviceState() plumbing.

MtpExecutor::dispatchDecodeAsync forks a spec_bookkeeping_runner_ worker
(dedicated CUDA stream + thread) that performs D2H + specUpdate + KV
release outside the main thread. The worker waits on rejection_event +
draft_event via cudaStreamWaitEvent (no CPU sync on main stream) and
records a swap-done event on a worker-owned torch::Event so the next
step's main stream can wait via cudaStreamWaitEvent.
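
A hedged sketch of that event discipline, using the ATen CUDA event wrapper for brevity; the stream and event names are illustrative, not the actual member variables:

```cpp
#include <ATen/cuda/CUDAEvent.h>
#include <c10/cuda/CUDAGuard.h>
#include <c10/cuda/CUDAStream.h>

// Illustrative wiring of one bookkeeping step on a dedicated worker stream.
void bookkeepingStepSketch(at::cuda::CUDAEvent& rejection_event,
                           at::cuda::CUDAEvent& draft_event,
                           at::cuda::CUDAEvent& swap_done_event,
                           c10::cuda::CUDAStream worker_stream) {
    // Device-side waits (cudaStreamWaitEvent underneath): the worker stream
    // stalls until the main stream reaches both events; the CPU never blocks.
    rejection_event.block(worker_stream);
    draft_event.block(worker_stream);

    {
        c10::cuda::CUDAStreamGuard guard(worker_stream);
        // ... D2H copies, specUpdate, and KV release would be enqueued here ...
    }

    // Publish completion so the next step's main stream can wait on it with
    // another device-side wait instead of a host synchronization.
    swap_done_event.record(worker_stream);
}
```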

MtpBatchStreamProcessor's prepareDecodeDraftModelInput /
prepareOneStepSpecDecodeModelInput gain a device-resident "third branch"
that builds next-step combo_tokens / lm_output_indexes / prefix_lengths
/ input_lengths fully on GPU using the per-stream device tensors
attached by dispatchDecodeAsync, bypassing the worker's specUpdate
dependency.

MtpExecutor::decodeStep adds a wait_prev_bookkeeping scope at entry
(spec_bookkeeping_runner_.sync) to cap outstanding worker tasks at one
step (KV reservation bookkeeping safety) and a wait_pending_linear_attn_swaps
scope for linear-attention KV swap correctness.

Gated by RTP_LLM_MTP_STREAM_ASYNC=1 env (default 0). When enabled:
- dispatch_output 9122us -> 66us (-99.3%)
- MTP tok/iter 3.7 preserved (no race)
- step CPU wall ~17ms -> 16.4ms (-3.5%) on H20x2 / Qwen3.5-MoE-FP8

Refs: mtp_stream_async/design.md, mtp_stream_async/perf.md
commit5_env1 baseline.

perf(mtp): Pin D2H accept_len/accept_tokens (squashed)

Cherry-pick of 1 commit from async_opt2 rebased on G3 (UDS broadcaster):
  4d1fd13 perf(mtp): phase 3.2 lite step 6 - Pin D2H accept_len/accept_tokens

MtpBatchStreamProcessor::prepareDecodeSpecUpdateInfo converts the worker
thread's D2H of accept_len / accept_tokens from raw `.cpu()` (staging
buffer path, ~8.5ms wall) to pinned + non_blocking=true + explicit
stream synchronization. The target tensors are allocated with
`torch::TensorOptions().pinned_memory(true)` once per stream scope and
reused across iterations.
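
A minimal sketch of the pinned, non-blocking D2H pattern described above (illustrative names; the real code lives in prepareDecodeSpecUpdateInfo): allocate the pinned host targets once, reuse them every iteration, and synchronize only the copy stream before the host reads them.

```cpp
#include <ATen/cuda/CUDAContext.h>
#include <torch/torch.h>

// Illustrative pinned-D2H helper: avoids the implicit staging-buffer path of a
// plain `.cpu()` by copying into reusable pinned host tensors.
struct PinnedSpecBuffersSketch {
    torch::Tensor accept_len_cpu;
    torch::Tensor accept_tokens_cpu;

    void copyFrom(const torch::Tensor& accept_len_gpu,
                  const torch::Tensor& accept_tokens_gpu) {
        if (!accept_len_cpu.defined()) {
            // Allocate once per stream scope, then reuse across iterations.
            accept_len_cpu = torch::empty(
                accept_len_gpu.sizes(),
                torch::TensorOptions().dtype(accept_len_gpu.dtype())
                                      .device(torch::kCPU).pinned_memory(true));
            accept_tokens_cpu = torch::empty(
                accept_tokens_gpu.sizes(),
                torch::TensorOptions().dtype(accept_tokens_gpu.dtype())
                                      .device(torch::kCPU).pinned_memory(true));
        }
        accept_len_cpu.copy_(accept_len_gpu, /*non_blocking=*/true);
        accept_tokens_cpu.copy_(accept_tokens_gpu, /*non_blocking=*/true);
        // Explicitly synchronize only the current (worker) stream so the host
        // can safely read the pinned buffers afterwards.
        at::cuda::getCurrentCUDAStream().synchronize();
    }
};
```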

Impact measured on pin_d2h_v2 timeline (committed verbatim from
async_opt2 history):
  - cudaMemcpyAsync big D2H: 8630us -> 0 events >1ms (worker-inline
    memcpy 6-7us)
  - aten::to: 8640us -> max 119us (-98.6%)
  - worker total wall: 7091us -> 5226us (round 3, -26%)
  - wait_prev_bookkeeping: unchanged (~8.6ms, proven to be GPU
    pipeline depth not D2H staging — see
    issuses/phase33/001 for the full analysis)

Still a real correctness-neutral improvement even though it doesn't
shrink wait_prev_bookkeeping — eliminates the D2H staging buffer copy
so worker wall time is much tighter.

Refs: mtp_stream_async/perf_data/pin_d2h_v2_20260427_2151,
issuses/phase33/001-deferred-wait-target-verify-cudagraph-side-effect.md
- Optimize D2H transfer for accept_len and accept_tokens
- Remove unnecessary stream synchronization, enhance tensor handling
- Device-ify model input indexes
- Device-ify attention inputs

Squashed from 4 commits.
…st syncs

MTP fuse optimizations:
- Use nonblocking H2D for gathered inputs
- Fuse target verify metadata preparation
- Fuse speculative decode metadata preparation
- Fuse decode TP sync copies
- Fuse post-layer logits allgather

CUDA graph host sync optimizations:
- Optimize prepare attention inputs (fused fill kernel)
- Remove replay prepare host sync
- Skip forward event sync when called inline

Squashed from 8 commits.
Add async-friendly MTP device-state framework with optional broadcast-sync
elimination behind RTP_LLM_MTP_DROP_BROAD_SYNC (default OFF).

- N0: Add async device-state env gates (default off)
- N1: Group MTP device-state into struct + epoch guard (see the sketch after this list)
- N2: Stop bumping host seqLength in dispatchDecodeAsync
- N3: Extract device-state gather into named method
- N4: Wire epoch-guarded clear in spec_bookkeeping worker
- N5: Document stop-word eval contract on MTP async path
- N6: Document TP / linear / multi-step device-state contracts
- N7: Add fallback observability + audit hot-path .cpu()/.item()
- N8: Add RTP_LLM_MTP_DROP_BROAD_SYNC opt-in + main-thread publish of
      hidden_states/all_probs to fix duplicate-output race
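
A rough sketch of what the epoch-guarded device-state bundle from N1/N4 could look like; field and method names are assumptions based on this description, not the actual struct:

```cpp
#include <cstdint>
#include <mutex>
#include <utility>
#include <torch/torch.h>

// Assumed shape of the device-state bundle; the real struct lives in the MTP
// executor and carries more fields.
struct MtpAsyncDeviceStateSketch {
    torch::Tensor accept_len_gpu;
    torch::Tensor accept_tokens_gpu;
    torch::Tensor last_hidden_states_gpu;
    torch::Tensor draft_all_probs_gpu;
    uint64_t      epoch = 0;
    std::mutex    mu;

    // Main thread: publish fresh state and advance the epoch.
    uint64_t publish(torch::Tensor accept_len, torch::Tensor accept_tokens) {
        std::lock_guard<std::mutex> lock(mu);
        accept_len_gpu    = std::move(accept_len);
        accept_tokens_gpu = std::move(accept_tokens);
        return ++epoch;
    }

    // Bookkeeping worker: clear only if no newer publish happened since this
    // worker captured `seen_epoch`, so a stale worker cannot wipe new state.
    void clearIfEpoch(uint64_t seen_epoch) {
        std::lock_guard<std::mutex> lock(mu);
        if (epoch != seen_epoch) return;
        accept_len_gpu         = torch::Tensor();
        accept_tokens_gpu      = torch::Tensor();
        last_hidden_states_gpu = torch::Tensor();
        draft_all_probs_gpu    = torch::Tensor();
    }
};
```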

N8 race + fix detail
--------------------
When RTP_LLM_MTP_DROP_BROAD_SYNC=1 the per-step broad sync at the top of
decodeStep is removed, so the next step's main thread runs concurrently
with the previous step's bookkeeping worker. The worker writes
sp_output_buffer->{hidden_states, all_probs} inside specUpdate after a
~6.5ms D2H wait, while the next step's main reads them within microseconds
of dispatchDecodeAsync return — racing the worker and silently consuming
the previous-previous step's values. With propose_step=4 this dropped
acceptance from 4.20 to 2.42 tok/iter on Qwen2-MTP, even though the
output text stayed correct.

Fix: extend the epoch-guarded MtpAsyncDeviceState with two new fields and
have the main thread compute and publish them inside dispatchDecodeAsync
before forking the worker:

  - last_hidden_states_gpu = draft_all_hidden.narrow(per-stream slice)
                                              .index_select(accept_len - 1)
  - draft_all_probs_gpu    = draft_all_probs.narrow(per-stream slice).clone()

index_select returns a fresh storage; narrow is a view, so all_probs is
cloned to break the alias to draft_prefill_output (held by the worker
lambda and released independently). All ops stay on the main stream — no
.cpu(), no .item(), no synchronize.
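
A small sketch of that aliasing point with illustrative names: narrow alone returns a view of the original storage, index_select materializes fresh storage, so only the narrowed all_probs slice needs the clone.

```cpp
#include <torch/torch.h>

// Illustrative: publish per-stream slices for the next step without aliasing
// the draft-prefill output that the worker lambda releases independently.
void publishDeviceStateSketch(const torch::Tensor& draft_all_hidden,  // [total_tokens, hidden]
                              const torch::Tensor& draft_all_probs,   // [total_tokens, vocab]
                              int64_t stream_offset,
                              int64_t stream_len,
                              const torch::Tensor& accept_len_gpu,    // 1-element int tensor on GPU
                              torch::Tensor& last_hidden_states_gpu,
                              torch::Tensor& draft_all_probs_gpu) {
    // index_select gathers into fresh storage, so no clone is needed here.
    auto hidden_slice = draft_all_hidden.narrow(/*dim=*/0, stream_offset, stream_len);
    last_hidden_states_gpu = hidden_slice.index_select(/*dim=*/0, accept_len_gpu - 1);

    // narrow alone is a view over draft_all_probs' storage; clone to detach it
    // from the buffer the bookkeeping worker still owns.
    draft_all_probs_gpu = draft_all_probs.narrow(/*dim=*/0, stream_offset, stream_len).clone();

    // Everything stays on the main stream: no .cpu(), no .item(), no synchronize.
}
```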

Readers in MtpBatchStreamProcessor (gatherHiddenStates,
update{One,Multi}StepDraftSamplerOutput) now prefer the device-state
field, falling back to sp_output_buffer->{hidden_states, all_probs}
only when device-state is undefined. Fallback is safe because the
worker writes sp_output_buffer BEFORE clearMtpAsyncDeviceState (post
bb2d996's clear-after-dispatch reorder).

Verified on Qwen2-MTP, propose_step=4, H20:
  baseline DROP=0     accept=4.20  iter=16
  fix      DROP=1     accept=4.20  iter=16
                      (per-iter step_output_len byte-identical to baseline;
                       reported metric 3.94 vs 4.20 is a measurement
                       artifact of iter_count being bumped by an extra
                       speculative-only gather() in the async path)

Squashed from 11 commits.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
Squashed series 8a74a7e..c335b8a + uncommitted fixes.

- MHA paged-attn device planner (FlashInferMlaAttnParams::fillParamsMhaDevice
  + mhaPagedAttnPlanKernel): drop the host fillParams loop and host->device
  cudaMemcpyAsync from the four MHA prepare paths (prefill paged/non-paged,
  decode, decode CUDA-graph replay). Fills the same buf_d slices fillParams
  uses so existing FlashInfer _paged_kv_*_buf aliases stay valid.
- KV kernel block table is now device-resident; drop the per-group _host
  copies and the spurious _host sentinel branches in the attention bindings.
- Gate setupKVCacheForAttentionInputs D2H on use_mla so MHA paths skip the
  unused copy.
- Fixes:
  * fillParamsMhaDevice batch_reuse_info_size envelope: align with
    fillParams / fillDecodeCudaGraphParams (batch_size * 4) so a later
    replay through either path doesn't trip forbid_realloc.
  * fillParamsMhaDevice input_token_num_upper envelope: was sized by
    batch_size, undersizing positions_d / batch_indice_d for prefill (1
    request x 7890 tokens narrowed against a 512-element buffer ->
    'start (0) + length (7890) exceeds dimension size (512)' in
    base_rotary_embedding_op._apply_rope). Use the same worst-case envelope
    style as page_num_upper: batch_size * max_blocks_per_bs *
    seq_size_per_block (no host sync).
  * MtpBatchStreamProcessor::prepareDecodeDraftModelInput stream-async
    eligibility: collapse the two-pass defined()-then-select() loop into a
    single iteration that snapshots getProposeTokensGpu() by value before
    use. With RTP_LLM_MTP_DROP_BROAD_SYNC=1 the bookkeeping worker can call
    clearMtpAsyncDeviceState between the original loops, leaving the second
    loop's select() to dereference an undefined tensor (c10::Error
    'Expected a proper Tensor but got None for argument #0 self').
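
A sketch of the single-pass, snapshot-by-value shape of that last fix (stand-in stream type and accessor; the real loop builds more of the model input):

```cpp
#include <torch/torch.h>
#include <vector>

// Stand-in for the real GenerateStream; only the accessor matters here.
struct StreamSketch {
    torch::Tensor propose_tokens_gpu;
    torch::Tensor getProposeTokensGpu() const { return propose_tokens_gpu; }
};

// Illustrative fix for the check-then-use race: snapshot each stream's tensor
// handle once, then test defined() and use that same snapshot, so a concurrent
// clearMtpAsyncDeviceState between the two cannot leave select() operating on
// an undefined tensor.
bool collectProposeTokensSketch(const std::vector<StreamSketch>& streams,
                                std::vector<torch::Tensor>&      out_slices) {
    for (const auto& stream : streams) {
        torch::Tensor propose = stream.getProposeTokensGpu();  // snapshot by value
        if (!propose.defined()) {
            return false;  // not eligible for the stream-async path
        }
        out_slices.push_back(propose.select(/*dim=*/0, 0));
    }
    return true;
}
```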
Adds focused re-gathering of kv_cache_kernel_block_id between MTP
propose and verify for linear-attention models, then updates the model
attention inputs against the fresh page table before the verify forward.

Adds a focused updateKVCacheKernelBlockId hook on ModelBase / GraphBase
so the re-gather path updates only kv_cache_kernel_block_id-dependent
state instead of rebuilding all attention inputs and replaying unrelated
CUDA graph mirrors.

Validated in original commits with Qwen3.5-35B-A3B-FP8 TP=2 MTP
propose=4 curl rounds; follow-up hook build validation passed.

Squashed from 2 original commits:
- 7b418f443 feat(mtp): re-gather kv_cache_kernel_block_id between propose and verify
- 028f77dbc refactor(mtp): focused updateKVCacheKernelBlockId hook for re-gather propagation
Vinkle-hzt requested a review from LLLLKKKK as a code owner on May 7, 2026 09:37
@LLLLKKKK
Collaborator

LLLLKKKK commented May 7, 2026

AI Code Review - PR #972

Status: LGTM

Summary: P0/0 · P1/0 · P2/1 · P3/1

lgtm ready to ci

Non-blocking Suggestions

P2

  • A GPU fast-path threshold of -1 actually forces the small-batch path on @ rtp_llm/cpp/normal_engine/speculative/MtpBatchStreamProcessor.cc:347
    • Suggestion: change the check to kPhase31MinBatchForGpu >= 0 && batch_size >= kPhase31MinBatchForGpu, or set the default threshold to an evaluated positive value (see the sketch after these suggestions).

P3

  • The prepareDecodeDraftModelInput fallback log will flood the hot path @ rtp_llm/cpp/normal_engine/speculative/MtpBatchStreamProcessor.cc:304
    • Suggestion: downgrade to DEBUG, or reuse the existing fallback counter for rate-limited logging that emits only the first few occurrences plus periodic samples.
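
A minimal sketch of the suggested guard, assuming kPhase31MinBatchForGpu keeps its documented "negative value disables" contract:

```cpp
// Assumed constant for illustration; any negative value should disable the path.
constexpr int kPhase31MinBatchForGpu = -1;

bool useGpuProposeTokensPath(int batch_size) {
    // A bare `batch_size >= kPhase31MinBatchForGpu` is always true for -1 and
    // silently forces the GPU fast path for every batch; requiring a
    // non-negative threshold restores the documented semantics.
    return kPhase31MinBatchForGpu >= 0 && batch_size >= kPhase31MinBatchForGpu;
}
```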

Checklist Violations (5 fail / 67 total)

General Principles Checklist

  • [6.1] Architecture — Observability: logs/metrics/timeouts must be actionable, not noise → issue: prepareDecodeDraftModelInput fallback log floods the hot path
    prepareDecodeDraftModelInput emits a warning directly whenever stream_async_eligible=false, with no rate limiting.
  • [6.1] Quality — PR description explains motivation and design → checklist-only
    The local review packet contains only the title and diff, without the full PR body; this gap alone is not enough to pin down a concrete code defect.
  • [6.1] Quality — No per-forward debug logging / noisy hot-path output → issue: prepareDecodeDraftModelInput fallback log floods the hot path
    The prepareDecodeDraftModelInput fallback warning sits on the decode hot path.

RTP-LLM Checklist

  • [D] Performance — Hard-coded capacity values → issue: GPU fast-path threshold of -1 forces the small-batch path on
    kPhase31MinBatchForGpu=-1 combined with the >= check violates the "negative value disables" comment.

Python Static-First Checklist

  • [P.H] Type annotations — Any must carry a comment explaining why → checklist-only
    _process_meta uses Any to match the parent class's meta_dicts shape, but the newly added signature gives no reason.

Strengths

  • The async worker captures exceptions via AsyncRunner and rethrows them at the sync stage, so a thread exception cannot terminate the process outright.
  • The new device-resident MTP state and CUDA graph prepare paths come with multiple unit tests, covering state epochs and the basic behaviour of the speculative sampling kernel.

Added a NormalAsyncDeviceState struct to hold the state of normal (non-speculative) async decode steps, covering sampled tokens and sequence lengths, with methods to set and retrieve it. Updated the NormalExecutor and related classes to use the new structure, extending their async processing path.
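
A rough sketch of how such a state holder might look; field and method names are guesses based on this description, not the actual code:

```cpp
#include <utility>
#include <torch/torch.h>

// Assumed shape only: device-resident results of a normal decode step, kept on
// GPU so the next step's prepare can consume them without waiting for the
// bookkeeping D2H.
struct NormalAsyncDeviceStateSketch {
    torch::Tensor sampled_tokens_gpu;  // [batch] next tokens, device-resident
    torch::Tensor seq_lengths_gpu;     // [batch] updated sequence lengths

    void set(torch::Tensor tokens, torch::Tensor seq_lens) {
        sampled_tokens_gpu = std::move(tokens);
        seq_lengths_gpu    = std::move(seq_lens);
    }

    bool defined() const {
        return sampled_tokens_gpu.defined() && seq_lengths_gpu.defined();
    }

    void clear() {
        sampled_tokens_gpu = torch::Tensor();
        seq_lengths_gpu    = torch::Tensor();
    }
};
```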
@LLLLKKKK
Collaborator

LLLLKKKK commented May 7, 2026

AI Code Review - PR #972

Status: BLOCKING

Summary: P0/0 · P1/1 · P2/1 · P3/1

Blocking Issues

P1

  • Under prefix cache, context_total_kv_length is truncated to the input token count @ rtp_llm/cpp/models/PyWrappedModel.cc:193
    • Suggestion: keep the KV total-length computation that includes the prefix; sync the CPU scalar only when prefix_lengths is nonzero, or let the fused_rope path consume the device-side cu_kv_seqlens values.
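
A hedged sketch of the suggested computation with illustrative names (the actual values live in PyWrappedModel's prepared inputs): the total KV length counts cached prefix tokens as well as the freshly fed input tokens.

```cpp
#include <cstdint>
#include <vector>

// Illustrative: under prefix cache, a request's KV span is prefix + new input,
// so the total must not collapse to the input token count alone.
int64_t computeContextTotalKvLengthSketch(const std::vector<int32_t>& input_lengths,
                                          const std::vector<int32_t>& prefix_lengths) {
    int64_t total = 0;
    for (size_t i = 0; i < input_lengths.size(); ++i) {
        int64_t prefix = (i < prefix_lengths.size()) ? prefix_lengths[i] : 0;
        total += static_cast<int64_t>(input_lengths[i]) + prefix;
    }
    return total;
}
```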

Non-blocking Suggestions

P2

  • A GPU fast-path threshold of -1 does not disable the small-batch path @ rtp_llm/cpp/normal_engine/speculative/MtpBatchStreamProcessor.cc:353
    • Suggestion: change the check to kPhase31MinBatchForGpu >= 0 && batch_size >= kPhase31MinBatchForGpu, or set the default threshold to a validated positive value.

P3

  • The stream_async_eligible fallback on the decode hot path is not rate-limited @ rtp_llm/cpp/normal_engine/speculative/MtpBatchStreamProcessor.cc:309
    • Suggestion: downgrade to DEBUG/INFO and hook into the shouldLogFallback counter for rate limiting, emitting only the first occurrence plus periodic samples.

Checklist Violations (10 fail / 111 total)

General Principles Checklist

  • [6.1] Architecture — State invariants: create/update/failure/retry/rollback paths stay valid → issue: context_total_kv_length is truncated to the input token count under prefix cache
    context_total_kv_length omits prefix_lengths, breaking the prefix cache total-KV-length invariant.
  • [6.1] Quality — No per-forward debug logging / noisy hot-path output → issue: stream_async_eligible fallback on the decode hot path is not rate-limited
    The decode fallback warning sits on the hot path and is not rate-limited.
  • [6.1] Tests — Boundary-case coverage (empty, single element, maximum) → issue: context_total_kv_length is truncated to the input token count under prefix cache
    No regression coverage was found for context prefill with prefix_lengths > 0.

RTP-LLM Checklist

  • [B] Correctness & logic — Logic errors, off-by-one, null/zero checks → issue: context_total_kv_length is truncated to the input token count under prefix cache
    context_total_kv_length degrades from the total KV length to the input token count.
  • [B] Correctness & logic — Boundary cases (empty input, single element, maximum) → issue: context_total_kv_length is truncated to the input token count under prefix cache
    Context prefill with prefix_lengths > 0 is not covered.
  • [D] Performance — Communication / buffer cost assessment → issue: GPU fast-path threshold of -1 does not disable the small-batch path
    With the threshold unable to disable the path, small batches also pay the extra GPU/NCCL overhead.
  • [D] Performance — Hard-coded capacity values → issue: GPU fast-path threshold of -1 does not disable the small-batch path
    kPhase31MinBatchForGpu=-1 combined with the >= check violates the negative-value-disables semantics.
  • [H] Tests & CI — Sufficient test coverage: equivalence coverage for large refactors, end-to-end tests for new features → issue: context_total_kv_length is truncated to the input token count under prefix cache
    The new device metadata path lacks a prefix cache regression.

Python Static-First Checklist

  • [P.A] Static structure & type discipline — No getattr/setattr access with literal names → checklist-only
    The getattr in py_flashinfer_mha/xqa exists for staged compatibility with the newly added pybind interface; not treated as a standalone defect for now.
  • [P.H] Type annotations — Any must carry a comment explaining why → checklist-only
    QwenV2MTPWeight inherits the parent class's meta_dicts: Any signature, which is an existing interface shape.

Strengths

  • AsyncRunner captures worker-thread exceptions and rethrows them on the sync/launch side, so a background-thread exception cannot terminate the process outright (a minimal sketch follows below).
  • MTP device-state uses an epoch to guard against stale clears, and GenerateStream unit tests were added alongside it.
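
A minimal sketch of that exception hand-off (the real AsyncRunner also owns a dedicated CUDA stream and task queue; names here are illustrative):

```cpp
#include <exception>
#include <functional>
#include <thread>
#include <utility>

// Illustrative: run a task on a worker thread, capture anything it throws, and
// surface it to the caller at sync() instead of crashing the background thread.
class AsyncRunnerSketch {
public:
    void launch(std::function<void()> task) {
        error_  = nullptr;
        worker_ = std::thread([this, task = std::move(task)] {
            try {
                task();
            } catch (...) {
                error_ = std::current_exception();  // captured, not fatal to the process
            }
        });
    }

    // Join the worker and rethrow whatever it captured on the caller's thread.
    void sync() {
        if (worker_.joinable()) {
            worker_.join();
        }
        if (error_) {
            std::exception_ptr e = error_;
            error_ = nullptr;
            std::rethrow_exception(e);
        }
    }

private:
    std::thread        worker_;
    std::exception_ptr error_;
};
```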
