
async schedule [2/N]: support async prepare#936

Open
Vinkle-hzt wants to merge 2 commits into alibaba:main from Vinkle-hzt:async2

Conversation

@Vinkle-hzt
Collaborator

@Vinkle-hzt commented Apr 26, 2026

based on #521

@Vinkle-hzt requested a review from LLLLKKKK as a code owner April 26, 2026 11:32
@Vinkle-hzt changed the title from "[2/N] Aysnc scheduler: support async prepare" to "async schedule [2/N]:: support async prepare" Apr 26, 2026
@Vinkle-hzt changed the title from "async schedule [2/N]:: support async prepare" to "async schedule [2/N]: support async prepare" Apr 26, 2026
@LLLLKKKK
Collaborator

🤖 AI Code Review — PR #936

async schedule [2/N]: support async prepare by @Vinkle-hzt

Overview

This PR implements two features: (1) asynchronous preparation of attention inputs in MTP speculative decoding (AsyncRunner); (2) vocab pruning support (d2t/t2d mappings + a new rejection sampling kernel).

Issues

🔴 P0 (blocking)

  1. Test compilation failure: SpeculativeSamplerOutput changed from {vector<Tensor>, vector<int>} to {Tensor, Tensor}, but MtpExecutorTest.cc (L694, L869) and MtpBatchStreamProcessorTest.cc (L156-161) still construct it with the old types and no longer compile.

  2. total_accept_len never assigned: in MtpBatchStreamProcessor::updateDecodePostDraftModelInput the old std::accumulate is commented out with no replacement, so total_accept_len stays 0. Downstream metrics such as execute_token_size, generate_tps, and total_accepted_token_num all report 0.

🟡 P1 (important)

  1. 30+ lines of commented-out old code remain (MtpBatchStreamProcessor.cc:261-296): delete them outright.

  2. The prepared_attention_inputs_ flag is maintained independently in PyWrappedModel and CudaGraphRunner: if canRun() returns different results between prepare and forward, attention inputs may be prepared twice or skipped. Recommend a single source of truth for the flag.

  3. AsyncRunner worker exceptions are not propagated: if task.fn() throws, the process hits std::terminate and the caller never learns of the failure. Recommend catching into an exception_ptr and rethrowing in sync(); a sketch follows this list.

  4. AsyncRunner has no unit tests: a newly added concurrency component deserves dedicated tests.
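
A minimal self-contained sketch of the propagation suggested in item 3, for a single-task runner like the one in this PR (member names such as pending_exception_ are illustrative, not the PR's actual code):

#include <condition_variable>
#include <exception>
#include <functional>
#include <mutex>
#include <thread>
#include <utility>

class AsyncRunnerSketch {
public:
    AsyncRunnerSketch() : worker_([this] { workerLoop(); }) {}

    ~AsyncRunnerSketch() {
        {
            std::lock_guard<std::mutex> lk(mu_);
            stop_ = true;
        }
        cv_task_.notify_one();
        worker_.join();
    }

    void launch(std::function<void()> fn) {
        std::unique_lock<std::mutex> lk(mu_);
        cv_done_.wait(lk, [this] { return task_done_; });  // one task in flight
        rethrowPendingLocked();                            // surface the previous failure
        task_      = std::move(fn);
        task_done_ = false;
        lk.unlock();
        cv_task_.notify_one();
    }

    void sync() {
        std::unique_lock<std::mutex> lk(mu_);
        cv_done_.wait(lk, [this] { return task_done_; });
        rethrowPendingLocked();
    }

private:
    void rethrowPendingLocked() {
        if (pending_exception_) {
            std::rethrow_exception(std::exchange(pending_exception_, nullptr));
        }
    }

    void workerLoop() {
        for (;;) {
            std::function<void()> fn;
            {
                std::unique_lock<std::mutex> lk(mu_);
                cv_task_.wait(lk, [this] { return stop_ || static_cast<bool>(task_); });
                if (stop_) return;
                fn = std::exchange(task_, nullptr);
            }
            try {
                fn();  // may throw: CUDA error, pybind11 exception, ...
            } catch (...) {
                std::lock_guard<std::mutex> lk(mu_);
                pending_exception_ = std::current_exception();
            }
            {
                std::lock_guard<std::mutex> lk(mu_);
                task_done_ = true;  // set unconditionally so waiters always wake
            }
            cv_done_.notify_all();
        }
    }

    std::mutex              mu_;
    std::condition_variable cv_task_;
    std::condition_variable cv_done_;
    std::function<void()>   task_;
    bool                    task_done_ = true;
    bool                    stop_      = false;
    std::exception_ptr      pending_exception_;
    std::thread             worker_;  // declared last so all state exists before the thread starts
};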

🔵 P2 (suggestions)

  1. Vocab pruning and async prepare are two independent features; consider splitting them into separate PRs.
  2. ~60 lines of pure formatting changes in W.h are mixed into the functional change.
  3. The static auto host_tensor_stream in tpSyncModelInputs is shared across engine instances.
  4. In batchSample, tensors such as output_token_ids_d are allocated on every forward; consider a grow-only reuse buffer like draft_probs_padding_buffer_ (see the sketch after this list).
  5. stream_.synchronize() in AsyncRunner::workerLoop limits the overlap benefit.
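
A sketch of the grow-only reuse pattern suggested in item 4, using stock libtorch calls (the helper and buffer names are illustrative):

#include <torch/torch.h>
#include <algorithm>

// Reuses a cached buffer, reallocating only when the request outgrows it.
torch::Tensor growOnly(torch::Tensor& buf, int64_t rows, int64_t cols,
                       const torch::TensorOptions& opts) {
    if (!buf.defined() || buf.size(0) < rows || buf.size(1) != cols) {
        buf = torch::empty({std::max(rows, buf.defined() ? buf.size(0) : int64_t(0)), cols}, opts);
    }
    return buf.narrow(/*dim=*/0, /*start=*/0, /*length=*/rows);  // view, no allocation
}

// e.g. auto output_token_ids_d = growOnly(output_token_ids_buf_, batch_size,
//                                         propose_step_ + 1, opts);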

Strengths

  • AsyncRunner is a compact wrapper with clear thread-synchronization logic
  • The d2t_map padding scheme for vocab pruning is sound, and the grow-only buffer avoids per-forward allocation
  • The rejection sampling kernel supports both CUDA and ROCm

Conclusion

Two P0 blockers (test compilation failure + zeroed metrics); not ready to merge. Re-review after fixes.

@LLLLKKKK
Collaborator

🤖 AI Code Review — PR #936 (incremental v2)

async schedule [2/N]: support async prepare by @Vinkle-hzt

Incremental review (1 new commit)

The new commit 100410be removes the reference to the deleted field load_python_model in GatherBatchScheduler and fixes the FakeFastTopKSampler/FakeSpeculativeSampler constructors. The new commit itself is fine.

Status of previous P0 issues

# | Issue | Status
P0-1 | Test compilation failure: MtpExecutorTest.cc:694/869 and MtpBatchStreamProcessorTest.cc:156-161 still construct SpeculativeSamplerOutput with vector initializers | ❌ Not fixed
P0-2 | total_accept_len never assigned (MtpBatchStreamProcessor.cc); metrics all report 0 | ❌ Not fixed

Conclusion

The 2 P0 blockers remain unfixed; not ready to merge.


Comment thread: rtp_llm/models_py/bindings/core/OpData.h
@LLLLKKKK
Collaborator

🤖 AI Code Review — PR #936

PR Overview

Title: async schedule [2/N]: support async prepare
Author: Vinkle-hzt
Scope: 42 files, +3634/−706
Review mode: full review (force push/rebase detected)

This PR contains two main features: (1) Async Prepare, which makes the attention-input preparation in speculative decoding asynchronous; (2) Vocab Prune, which lets the draft model use a pruned, smaller vocabulary.


Review Comments

🔴 P0 (blocking)

  1. AsyncRunner worker thread exceptions are not propagated
    If task.fn() in AsyncRunner::workerLoop() throws, the exception goes uncaught on the worker thread and triggers std::terminate(). The caller's sync() cannot observe the task failure.
    Suggestion: catch the exception into a std::exception_ptr and rethrow it in sync() and on the next launch().

  2. The forceSpAccept path was deleted with no equivalent handling in the new code
    batchSample() used to have a forceSpAccept() branch that force-accepts all draft tokens. The new code deletes that logic, and the rejection sampling kernel has no equivalent.
    Suggestion: handle forceSpAccept before the kernel call, or force all tokens to be accepted via uniform_samples=0.

🟡 P1 (important)

  1. prepared_attention_inputs_ not reset on exception paths: if forward() throws, the flag is never reset and the next call uses stale attention_inputs_. Recommend an RAII guard.

  2. AsyncRunner has no unit tests: the new concurrency component involves thread synchronization and CUDA stream management but has no dedicated tests.

  3. The tpSyncModelInputs change has no multi-GPU test: the broadcast strategy changed from one merged send to separate sends with a dedicated stream, which needs TP>1 validation.

  4. The model_input_copy shallow copy may share underlying tensor storage: tensor fields that the async lambda's model_input_copy does not reassign share storage with the main thread; confirm the worker thread never mutates these shared tensors.

  5. static host_tensor_stream shared across instances: the static stream in tpSyncModelInputs is shared by all engine instances, so concurrent multi-instance use can corrupt data.

  6. tpSyncModelInputs in decodeStep replaced by a partial broadcast: only 3 tensors are broadcast, while the original tpSyncModelInputs synchronized more fields; confirm the remaining fields are synchronized some other way.

🔵 P2 (suggestions)

  • The CUDA and ROCm rejectionSampling/mappingDraft2Target implementations are almost entirely duplicated; consider extracting a shared template
  • Pure formatting changes in W.h are mixed into the functional change in the same PR
  • Consider splitting into two independent PRs (async prepare + vocab prune)
  • Commits are not atomic: commit 1's message does not match its actual contents
  • ~30 lines of commented-out old code in MtpBatchStreamProcessor are left uncleaned
  • Several tensors in batchSample() are allocated on every forward; consider preallocation

Strengths

  • The rejection sampling CUDA kernel is well implemented, with thorough test coverage
  • AsyncRunner is a clean wrapper with a single responsibility
  • draft_probs_padding_buffer_ uses grow-only preallocation
  • The symmetric ROCm implementation is complete

Overall Assessment

P0 issues remain (missing exception propagation + lost forceSpAccept path); not ready to merge. Fix the P0s first, then proceed to CI validation.

@LLLLKKKK
Collaborator

🤖 Code Review — PR #936

async schedule [2/N]: support async prepare by @Vinkle-hzt
45 files, +3853/−725. Reviewed the full diff.


🔴 P0 — combo_tokens now includes padding tokens passed to the model

In MtpBatchStreamProcessor.cc, updateDecodePostDraftModelInput was rewritten:

int total_tokens         = (propose_step_ + 1) * batch_size;
model_input.combo_tokens = speculative_sampler_output.accept_tokens.reshape({(int64_t)total_tokens});

The old code built combo_tokens from only the accepted tokens (variable length per stream). The new code passes ALL (propose_step_+1)*batch_size tokens including the -1 padding slots (reset to 0 by index_put_({output_token_ids_d == -1}, 0) in batchSample). Token 0 is a valid vocabulary token. The model now processes a fixed-size batch regardless of actual accept lengths, producing incorrect hidden states for padding positions and corrupting last_hidden_states used in the next draft step.
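
One possible shape-level fix, sketched on the assumption that downstream consumers can still take a variable-length combo_tokens (as the old code produced); tensor names follow the snippet above:

// accept_tokens: [batch_size, propose_step_ + 1], accept_len: [batch_size]
auto steps = torch::arange(propose_step_ + 1, accept_len.options());
auto keep  = steps.unsqueeze(0) < accept_len.unsqueeze(1);      // [batch, steps] bool mask
model_input.combo_tokens = accept_tokens.masked_select(keep);   // only accepted tokens, 1-D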

🔴 P0 — vocab_prune is a class variable, not an instance variable

In rtp_llm/models/qwen_v2.py:

class QwenV2MTPWeight(QWenV2Weight):
    vocab_prune = False   # class-level — shared across ALL instances

_process_meta sets self.vocab_prune = True which creates an instance attribute shadowing the class attribute. This is fragile and non-idiomatic. The correct pattern is self.vocab_prune = False in __init__. More critically, if multiple QwenV2MTPWeight instances are created in the same process (e.g., TP shards), the class attribute is never reset between instances, which can cause unexpected behavior if instance creation order matters.

🟡 P1 — prepared_attention_inputs_ flag has no memory ordering guarantee

PyWrappedModel::prepareAttentionInputs() sets prepared_attention_inputs_ = true on the AsyncRunner worker thread. PyWrappedModel::forward() reads it on the main thread. The ordering is safe only because AsyncRunner::sync() provides a happens-before edge via cv_done_.wait(). But prepared_attention_inputs_ is a plain bool — no std::atomic, no fence. If the compiler reorders the store past the condition variable signal, the main thread could read a stale value. This should be std::atomic<bool> or the flag should be set inside the mutex before cv_done_.notify_one().
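
A sketch of the atomic variant (illustrative names; the alternative of setting the flag under the mutex before cv_done_.notify_one() works equally well):

#include <atomic>

struct PreparedFlag {
    // Worker side: release makes all prior writes (attention_inputs_,
    // graph_state_) visible to whoever acquires the flag.
    void publish() { flag_.store(true, std::memory_order_release); }

    // Main-thread side: acquire pairs with the release above; exchange
    // also clears the flag so a stale value cannot be consumed twice.
    bool consume() { return flag_.exchange(false, std::memory_order_acq_rel); }

    std::atomic<bool> flag_{false};
};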

🟡 P1 — Static host_tensor_stream in tpSyncModelInputs is process-wide

// rtp_llm/cpp/models/ModelTypes.cc
static auto host_tensor_stream = cuda_graph::graphGetStreamFromPool(true);

The comment acknowledges multi-engine unsafety. The deeper risk: tpSyncModelInputs is called from TP worker threads (one per rank). All ranks share the same static CUDA stream. If two TP ranks call tpSyncModelInputs concurrently, they both push work onto the same stream, serializing them unintentionally, and host_tensor_stream.synchronize() on one thread blocks while the other's work is still in flight. This stream should be per-engine-instance, not static.
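
A sketch of the per-engine alternative, using the stock ATen stream pool (the project's cuda_graph::graphGetStreamFromPool wrapper would slot in the same way; the class name is illustrative):

#include <c10/cuda/CUDAStream.h>

// One stream per engine instance; TP ranks owned by different engines no
// longer contend on a single process-wide stream.
class EngineStreams {
public:
    c10::cuda::CUDAStream& hostTensorStream() { return host_tensor_stream_; }

private:
    c10::cuda::CUDAStream host_tensor_stream_ =
        c10::cuda::getStreamFromPool(/*isHighPriority=*/true);
};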

🟡 P1 — ROCm invokeRejectionSampling forward declaration is at global scope

In CudaSampleOp.cc ROCm path, after } // namespace rtp_llm closes the namespace:

template<typename DType, typename IdType>
hipError_t invokeRejectionSampling(...);   // global namespace, not rtp_llm::

The CUDA path declares the same function inside namespace rtp_llm. If the ROCm implementation is in rtp_llm::, the call site ::invokeRejectionSampling(...) will fail to link. This namespace inconsistency needs to be verified and made consistent.

🔵 P2 — #define private public in test

rtp_llm/cpp/normal_engine/test/AsyncRunnerTest.cc uses #define private public. This is undefined behavior (ODR violation). The test doesn't actually access any private members — all cases use only the public launch()/sync() interface. Remove the #define.

🔵 P2 — t2d_map loaded but unused at runtime

Weights::t2d_map is loaded in WeightsConverter.cc with the comment "Reserved for future use. Loaded but not yet used at runtime." For large models this wastes GPU memory. Either use it or gate the load behind a flag, similar to how vocab_prune gates d2t_map loading in Python.

🔵 P2 — Missing newline at end of new files

mapping.cu, mapping.h, sampling.h, sampling.cu all end with \ No newline at end of file. This violates POSIX and will trigger pre-commit/clang-format warnings.

🔵 P2 — draft_probs_padding_buffer_ grow-only strategy can waste memory

The reallocation logic in SpeculativeSampler::batchSample grows the buffer but never shrinks it. If batch size spikes temporarily, the buffer stays large for the lifetime of the sampler. Consider documenting the expected max batch size or adding a shrink policy.

🟢 Strengths

  • AsyncRunner design is clean: single-task queue, RAII destructor join, at::ThreadLocalState propagation for correct PyTorch TLS on the worker thread — this is the right approach.
  • PreparedFlagGuard RAII in both CudaGraphRunner::forward and PyWrappedModel::forward correctly handles exception paths.
  • The CRITICAL ORDERING comment explaining why fusedCopy must precede graph_runner_->prepareAttentionInputs is excellent documentation of a subtle ordering constraint.
  • The GIL acquisition in prepareAttentionInputs is well-documented with the exact crash mechanism.
  • Replacing vector<Tensor> with batched tensors in SpeculativeSamplerOutput is the right direction for GPU-resident rejection sampling.
  • Deleting rocm/speculative_sampling/sampling.cu (464 lines) and replacing with the unified CUDA kernel is a good cleanup.

Summary: Two P0 correctness issues need resolution before merge — padding tokens being passed as real tokens in updateDecodePostDraftModelInput, and the class-vs-instance variable for vocab_prune. The static shared stream in tpSyncModelInputs and the ROCm namespace mismatch are P1 safety issues. The async prepare architecture itself is sound and well-structured.

🤖 Generated by RTP-LLM Code Review Bot

@Vinkle-hzt force-pushed the async2 branch 3 times, most recently from c44c17f to d0dcfbc on April 27, 2026 10:04
@LLLLKKKK
Collaborator

🤖 Code Review — PR #936

async schedule [2/N]: support async prepare by @Vinkle-hzt
48 files, +3925/−766. Reviewed the full diff.


🔴 P0 — Sampler.cc: non_blocking=true contradicts the ROCm safety comment

rtp_llm/cpp/models/Sampler.cc line 50:

// Use blocking transfer: on ROCm, hipMemcpyAsync from pageable memory is truly async
// and can cause memory access faults if a kernel reads the buffer before transfer completes.
-    auto inputs_token_ids_cuda = inputs.token_ids.to(torch::kCUDA);
+    auto inputs_token_ids_cuda = inputs.token_ids.to(torch::kCUDA, true);

The comment was written specifically to justify blocking transfer on ROCm. Adding true (non_blocking) directly contradicts it and reintroduces the memory access fault risk on ROCm. If this is intentional (e.g. the caller now guarantees the tensor is already pinned), the comment must be updated and the guarantee documented. Otherwise revert to blocking.


🔴 P0 — AsyncRunner: exception in worker thread causes permanent deadlock

rtp_llm/cpp/normal_engine/AsyncRunner.cc workerLoop() has no try/catch:

task.fn();
stream_.synchronize();
event_.record(stream_);
// task_done_ set to true only after this block

If task.fn() throws (e.g. CUDA error, pybind11 exception), the exception is silently swallowed by std::thread, task_done_ is never set to true, and any subsequent call to sync() or launch() will block forever on cv_done_.wait(lk, [this] { return task_done_; }). The engine hangs with no error message.

Fix: wrap the task body in try/catch, store the exception, and rethrow in sync():

try {
    task.fn();
    stream_.synchronize();
    event_.record(stream_);
} catch (...) {
    pending_exception_ = std::current_exception();
}
// task_done_ is still set to true after this block, so waiters wake even on failure.
// Then in sync(): if (pending_exception_) std::rethrow_exception(pending_exception_);

🟡 P1 — Partial TP broadcast in decodeStep missing execSyncCommunication

rtp_llm/cpp/normal_engine/speculative/MtpExecutor.cc:

if (parallelism_config_.tp_size > 1) {
    execBroadcast({{model_input.combo_tokens}, 0});
    execBroadcast({{model_input.last_hidden_states}, 0});
    execBroadcast({{model_input.lm_output_indexes}, 0});
}

The original tpSyncModelInputs always called execSyncCommunication(false) after each broadcast group and cudaSyncAndCheck() to ensure all ranks are synchronized before proceeding. These three bare execBroadcast calls have no barrier. Non-root TP ranks may proceed to the draft model forward before the broadcast completes, reading stale combo_tokens / last_hidden_states. Add execSyncCommunication(false) + cudaSyncAndCheck() after the broadcast block.
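
A sketch of the suggested fix, reusing the calls named above (call signatures as they appear in this diff):

if (parallelism_config_.tp_size > 1) {
    execBroadcast({{model_input.combo_tokens}, 0});
    execBroadcast({{model_input.last_hidden_states}, 0});
    execBroadcast({{model_input.lm_output_indexes}, 0});
    execSyncCommunication(false);  // flush the broadcast group on every rank
    cudaSyncAndCheck();            // barrier before any rank touches the data
}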


🟡 P1 — input_lengths not updated after rejection sampling

updateDecodePostDraftModelInput previously set model_input.input_lengths[i] = accept_lens[i] per batch item. The new implementation removes this entirely. The draft model forward now receives input_lengths from the pre-rejection state (all set to propose_step_+1 from the target verify prepare copy). This is intentional per the test changes (expect {5, 5} instead of actual accept lengths), but it means the draft model always processes propose_step_+1 tokens per sequence regardless of how many were accepted. If the draft model uses input_lengths for causal masking or KV cache writes, this will produce incorrect hidden states for the padding positions. Please confirm the draft model is robust to this and add a comment explaining the invariant.


🟡 P1 — Static host_tensor_stream in tpSyncModelInputs is process-global

rtp_llm/cpp/models/ModelTypes.cc:

// NOTE: This static stream is shared across all engine instances in the process.
static auto host_tensor_stream = cuda_graph::graphGetStreamFromPool(true);

The comment acknowledges the risk but defers the fix. In the test suite, MtpExecutorTest creates multiple engine instances and calls tpSyncModelInputs from different test cases in the same process. Concurrent use of a single stream from multiple engine instances is a data race. This should be fixed before merging, not deferred — either pass the stream as a parameter or make it a member of the engine.


🟡 P1 — total_accept_len out-parameter is never set in new implementation

The old updateDecodePostDraftModelInput set total_accept_len via std::accumulate. The new implementation comments it out but the parameter is still passed by reference and initialized to 0 in decodeStep. While total_accept_len doesn't appear to be used after the call in the current diff, it remains a dangling out-parameter that will silently return 0 to any future caller. Either remove the parameter from the signature or set it correctly (e.g. total_accept_len = batch_size * (propose_step_ + 1)).


🔵 P2 — Large commented-out code blocks should be removed

rtp_llm/cpp/normal_engine/speculative/MtpBatchStreamProcessor.cc has ~20 lines of commented-out old implementation left in updateDecodePostDraftModelInput. Dead code in a hot path is noise. Remove it; git history preserves the old version.


🔵 P2 — t2d_map loaded but never used at runtime

rtp_llm/cpp/models/models_weight/Weights.h:

// Reserved for future target-to-draft mapping (e.g. logit projection). Loaded but not yet used at runtime.
torch::Tensor t2d_map;

Loading a potentially large int64 tensor into GPU memory for a feature that isn't implemented yet wastes VRAM. Either defer loading until the feature is ready, or at minimum add a RTP_LLM_LOG_WARNING when it's present in the checkpoint so operators know it's being ignored.


🔵 P2 — Missing newline at end of new files

rtp_llm/models_py/bindings/common/kernels/vocab_prune/mapping.cu, mapping.h, and MtpExecutor.h all end without a trailing newline (\ No newline at end of file). This violates POSIX and triggers compiler warnings on some toolchains.


🔵 P2 — AsyncRunner only supports one in-flight task; no true pipelining

launch() blocks until the previous task completes (cv_done_.wait). This means the async overlap only works if the main thread has enough work to do between launch() and sync(). Consider documenting the expected overlap window (e.g. "draft model decode takes ~Xms, prepare takes ~Yms") so reviewers can verify the design achieves its latency goal.


🟢 Strengths

  • The PreparedFlagGuard RAII pattern in both CudaGraphRunner::forward and PyWrappedModel::forward is clean and correctly handles the exception path.
  • The GIL acquisition comment in CudaGraphRunner::prepareAttentionInputs is thorough and explains exactly why it's needed.
  • The fusedCopy ordering fix (flush H2D copies before graph_runner_->prepareAttentionInputs) is well-documented with the root cause.
  • ExpertBalancer world_size guard (if (world_size > 1)) correctly avoids NCCL calls in single-GPU setups.
  • AsyncRunnerTest covers the key scenarios (basic, sequential, order, CUDA tensor work, destructor join, stress).
  • The vocab pruning kernel (mapping_draft2target_kernel) handles boundary conditions (negative token IDs, out-of-range IDs) correctly.
  • Replacing std::vector<torch::Tensor> accept_tokens with a single batched tensor is the right direction for GPU-side rejection sampling.

Summary: Two P0s need fixing before merge — the ROCm non_blocking regression in Sampler.cc and the AsyncRunner exception→deadlock. The P1 TP broadcast missing sync is a correctness risk under multi-GPU. The static stream and input_lengths issues need clarification. The overall async-prepare architecture is sound and the code quality is high.

🤖 Generated by RTP-LLM Code Review Bot

@LLLLKKKK
Collaborator

🤖 Code Review — PR #936

async schedule [2/N]: support async prepare by @Vinkle-hzt
48 files, +3925/−766. Reviewed the full diff.


🔴 P0 — AsyncRunner: no exception handling → process crash

rtp_llm/cpp/normal_engine/AsyncRunner.cc workerLoop() has no try/catch. If prepareAttentionInputs throws (CUDA error, Python exception, OOM), the exception propagates out of the thread function, invoking std::terminate() and crashing the entire serving process. Additionally, task_done_ is never set to true on the exception path, so any concurrent sync() call would deadlock before the crash.

Fix: wrap the task body in try/catch, store the exception via std::exception_ptr, set task_done_ = true unconditionally, and rethrow in sync().

🔴 P0 — total_accept_len never set → speculative decoding metrics always 0

rtp_llm/cpp/normal_engine/speculative/MtpBatchStreamProcessor.cc removes:

-    total_accept_len  = std::accumulate(accept_lens.begin(), accept_lens.end(), 0);

But total_accept_len (initialized to 0 at MtpExecutor.cc:530) is still consumed at lines 707, 714, 715, 717:

executor_collector.execute_token_size += total_accept_len;   // always 0
tps_collector.generate_tps = total_accept_len;               // always 0
sp_engine_collector.total_accepted_token_num = total_accept_len; // always 0

All speculative decoding throughput metrics will be permanently zeroed. The new updateDecodePostDraftModelInput must compute and assign total_accept_len, e.g.:

total_accept_len = speculative_sampler_output.accept_len.sum().item<int64_t>();

🟡 P1 — Sampler.cc non_blocking=true: unsafe for non-pinned callers on ROCm

The old comment explicitly warned:

// Use blocking transfer: on ROCm, hipMemcpyAsync from pageable memory is truly async
// and can cause memory access faults if a kernel reads the buffer before transfer completes.

The new code switches to inputs.token_ids.to(torch::kCUDA, true) with the assumption that all callers use pinned memory. But Sampler::forward() is called from multiple paths (normal engine, MTP executor, unit tests). Any caller that passes a non-pinned CPU tensor on ROCm will silently produce memory access faults. Either assert inputs.token_ids.is_pinned() or keep the blocking transfer.
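
A defensive sketch combining the assertion idea with a fallback (only Tensor::is_pinned()/is_cuda()/to() are stock libtorch; the surrounding names follow the snippet above):

torch::Tensor inputs_token_ids_cuda;
if (inputs.token_ids.is_cuda() || inputs.token_ids.is_pinned()) {
    // pinned (or already resident): the async copy is safe on CUDA and ROCm
    inputs_token_ids_cuda = inputs.token_ids.to(torch::kCUDA, /*non_blocking=*/true);
} else {
    // pageable memory: fall back to a blocking copy rather than fault on ROCm
    inputs_token_ids_cuda = inputs.token_ids.to(torch::kCUDA);
}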

🟡 P1 — TP broadcast after rejection sampling: missing execSyncCommunication

rtp_llm/cpp/normal_engine/speculative/MtpExecutor.cc new partial broadcast:

execBroadcast({{model_input.combo_tokens}, 0});
execBroadcast({{model_input.last_hidden_states}, 0});
execBroadcast({{model_input.lm_output_indexes}, 0});

The old tpSyncModelInputs called execSyncCommunication(false) after each broadcast to ensure completion before proceeding. The new code omits this. On multi-GPU TP setups, non-root ranks may read stale data before the broadcast completes.

🟡 P1 — prepared_attention_inputs_ is a plain bool shared between threads

rtp_llm/cpp/models/PyWrappedModel.h adds bool prepared_attention_inputs_ as a plain member. prepareAttentionInputs() writes it from AsyncRunner's worker thread, while forward() reads and writes it from the engine main thread. Without std::atomic<bool> or explicit synchronization, this is a data race (undefined behavior under the C++ memory model). The AsyncRunner::sync() call provides a happens-before edge for the CUDA stream, but not for the CPU-side flag.


🔵 P2 — cout in ExpertBalancer constructor

rtp_llm/cpp/models/eplb/ExpertBalancer.cc:166:

cout << "ExpertBalancer constructed with " << log_exp_num << " logical experts" << endl;

Should use RTP_LLM_LOG_INFO.
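
A drop-in sketch, assuming the macro takes printf-style arguments (an unverified assumption about the project's logging API):

RTP_LLM_LOG_INFO("ExpertBalancer constructed with %d logical experts", static_cast<int>(log_exp_num));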

🔵 P2 — Large commented-out code blocks

rtp_llm/cpp/normal_engine/speculative/MtpBatchStreamProcessor.cc leaves ~30 lines of commented-out old implementation. Dead code should be removed, not commented out.

🔵 P2 — t2d_map loaded but never used at runtime

rtp_llm/cpp/models/models_weight/Weights.h:

// Reserved for future target-to-draft mapping. Loaded but not yet used at runtime.
torch::Tensor t2d_map;

Loading unused weights wastes GPU memory. Either use it or don't load it yet.

🔵 P2 — Static stream in tpSyncModelInputs is process-global

rtp_llm/cpp/models/ModelTypes.cc:

static auto host_tensor_stream = cuda_graph::graphGetStreamFromPool(true);

The comment acknowledges this breaks with multiple engine instances in the same process. This is a latent bug for test environments and future multi-engine deployments.


🟢 Strengths

  • The async prepare architecture is well-motivated and the ordering constraints (fusedCopy before graph_runner->prepareAttentionInputs) are clearly documented in comments.
  • GIL acquisition in CudaGraphRunner::prepareInputs is correctly identified and fixed.
  • RAII PreparedFlagGuard pattern in both CudaGraphRunner::forward and PyWrappedModel::forward is clean and exception-safe.
  • EPLB single-node fix (world_size guard for collective ops) is correct and well-scoped.
  • The new rejection sampling kernel with per-stream do_sample control is a meaningful improvement over the old chain sampling approach.
  • AsyncRunner test coverage is thorough (7 test cases including stress and destructor join).
  • Vocab pruning (d2t_map) integration is clean end-to-end from weight loading through kernel invocation.

Summary: 2 P0 blockers (AsyncRunner crash on exception, total_accept_len metrics zeroed), 3 P1 issues (ROCm non_blocking safety, TP broadcast missing sync, prepared_attention_inputs_ data race). The async prepare concept is sound but needs these fixes before merge.

🤖 Generated by RTP-LLM Code Review Bot

@LLLLKKKK
Collaborator

@LLLLKKKK left a comment

Code Review — PR #936

Summary: Adds async attention-input preparation to the MTP speculative decoding pipeline, overlapping CPU-side KV cache setup with GPU compute. Also introduces vocab pruning (draft-to-target token mapping), a new rejection sampling kernel replacing the old chain speculative sampling, and several supporting changes (EPLB world_size guard, TP broadcast optimization, Sampler non-blocking H2D).

P0 — Must fix

  1. Race on d2d_copies_ between prepareAttentionInputs and forward in PyWrappedModel (rtp_llm/cpp/models/PyWrappedModel.cc). prepareAttentionInputs() calls d2d_copies_.clear() then fusedCopy(d2d_copies_). But d2d_copies_ is a member field also used by tensorHoldHostAndToCuda() called from forward(). When prepareAttentionInputs runs on the AsyncRunner worker thread while forward() runs on the main thread, both can touch d2d_copies_ concurrently. The comment "No fusedCopy here" in forward() is the only guard — this is fragile. Recommend either (a) giving prepareAttentionInputs its own local FusedD2DCopyParams, or (b) adding an explicit assertion that d2d_copies_ is exclusively owned by prepareAttentionInputs and must not be touched in forward.

  2. attention_inputs_ and graph_state_ written by async thread, read by main thread (PyWrappedModel.cc:317-333, PyWrappedModel.h:102-104). The acquire/release pair on the atomic bool is technically correct for the current code path where sync() is called before forward() in MtpExecutor::decodeStep. However, this invariant is not enforced at the PyWrappedModel level — if anyone calls forward() without prior sync(), the behavior is undefined. Consider adding an assertion or documenting this contract explicitly.

P1 — Should fix

  1. force_mask[idx] = true is a CPU scalar write to a GPU tensor (SpeculativeSampler.cc:1557). The force_mask tensor is created on target_device (CUDA), but the loop does scalar CPU writes to a CUDA tensor, triggering an implicit sync per element. Build the mask on CPU first, then move it to CUDA in one copy (similar to how do_sample is handled nearby); see the sketch after this list.

  2. Partial TP broadcast in decodeStep may miss fields (MtpExecutor.cc:696-700). The new code broadcasts only combo_tokens, last_hidden_states, and lm_output_indexes after rejection sampling. input_lengths is no longer explicitly updated per-batch in updateDecodePostDraftModelInput. Verify that the draft prefill model does not depend on per-sequence input_lengths being updated after rejection.

  3. model_input_copy shallow-copies tensor fields (MtpExecutor.cc:597-614). The async thread and main thread share underlying tensor storage for fields like kv_cache_block_id. This appears safe because prepareAttentionInputs only reads these fields and draftModelDecode creates new tensors rather than modifying in-place — but this is a subtle invariant worth documenting.

  4. collect_metrics_stream_ synchronization (MtpExecutor.cc:775-781). .item<int>() inside a StreamGuard triggers device sync on the metrics stream. If on the hot path, this adds latency. Consider making this truly async or moving off the critical path.

  5. Missing newline at end of file in mapping.cu, mapping.h, sampling.cu, sampling.h. -Wnewline-eof will warn.

  6. pinned_check_remaining_ decrement moved to prepareAttentionInputs (PyWrappedModel.cc:300-302). Placement is confusing since the pinned check was originally a forward-count guard. Consider keeping the decrement in forward() for clarity.
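
A sketch for P1 item 1 above, building the mask on pinned host memory and issuing one async copy (needsForceAccept is a hypothetical stand-in for the per-stream condition):

auto force_mask_h = torch::zeros(
    {batch_size}, torch::dtype(torch::kBool).pinned_memory(true));
auto* mask_ptr = force_mask_h.data_ptr<bool>();
for (int64_t idx = 0; idx < batch_size; ++idx) {
    mask_ptr[idx] = needsForceAccept(idx);  // plain CPU writes, no kernel launches
}
// single async H2D copy instead of batch_size implicit syncs
auto force_mask = force_mask_h.to(target_device, /*non_blocking=*/true);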

P2 — Nits

  1. vec_dtypes.cuh is 1522 lines of vendored FlashInfer code. Consider sharing with the existing 3rdparty/flashinfer dependency instead of duplicating.

  2. W.h has 34 lines of whitespace-only reformatting — adds noise to diff and blame. Consider separating formatting from functional changes.

  3. SpeculativeSamplingParams changed from reference members to value members (OpData.h:361-389). Good change, but the removed constructor relies on aggregate initialization — fine for C++17 but worth noting.

  4. output_token_ids_d created with .requires_grad(false) (SpeculativeSampler.cc:1474-1476). Integer tensors never have gradients — redundant for kInt32.

  5. ExpertBalancer world_size_ default of 1 means old call sites would silently get single-rank behavior. Verify all call sites are updated.

  6. createMinFakeDecodeStream adds a vocab_size parameter that shadows the member vocab_size_. Consider naming it draft_vocab_size for consistency.

Architecture assessment

The async prepare pattern is well-structured: AsyncRunner is a clean single-task worker with proper exception propagation, RAII shutdown, and CUDA stream isolation. The split of prepareInputs into prepareInputData + prepareAttentionInputs is a sensible decomposition. The main risk area is the implicit contract between PyWrappedModel::prepareAttentionInputs (async) and forward (main thread) sharing mutable member state — this works today but is fragile under future modifications. Recommend hardening the ownership boundaries before merge.

@Vinkle-hzt force-pushed the async2 branch 2 times, most recently from 1b2d3d2 to 9b30851 on April 28, 2026 09:03
@LLLLKKKK
Collaborator

AI Code Review - PR #936

Status: BLOCKING

Summary: P0/0 · P1/1 · P2/2 · P3/1

Blocking Issues

P1

  • lm_output_indexes in tp_sync_post_rejection sits on different devices on rank0 vs non-rank0 @ rtp_llm/cpp/normal_engine/speculative/MtpExecutor.cc:723
    • Suggestion: allocate lm_output_indexes on CUDA on non-rank0 ranks too (e.g. torch::empty({batch_size}, torch::dtype(torch::kInt32).device(torch::kCUDA))) to match rank0; or move rank0's lm_output_indexes to CPU via .to(torch::kCPU) before broadcasting. Aligning on CUDA is preferred, avoiding an extra D2H/H2D round trip.

Non-blocking Suggestions

P2

  • force_mask is written element-by-element into a CUDA tensor, causing batch_size kernel launches and implicit syncs @ rtp_llm/cpp/normal_engine/speculative/SpeculativeSampler.cc:147
    • Suggestion: build it on CPU first: auto force_mask_h = torch::zeros({batch_size}, torch::kBool); write force_mask_h.data_ptr<bool>()[idx] = ... inside the loop; finally auto force_mask = force_mask_h.to(target_device, /*non_blocking=*/true). has_force can still be maintained in the CPU loop.
  • Sampler::forward switching to non_blocking H2D relies on all callers passing pinned tensors, with no defensive check @ rtp_llm/cpp/models/Sampler.cc:50
    • Suggestion: add RTP_LLM_CHECK(inputs.token_ids.is_pinned() || inputs.token_ids.is_cuda()) at the entry of Sampler.cc to make the constraint explicit, or fall back to blocking H2D on the exceptional path. The comment should also spell out the pageable+async risk on ROCm.

P3

  • Several new/modified .cu/.h files lack a trailing newline @ rtp_llm/models_py/bindings/common/kernels/vocab_prune/mapping.cu:48
    • Suggestion: add a trailing newline to all new/modified files; the pre-commit hook can fix them in one pass.
Review Checklist: 5 pass / 4 fail

General Principles Checklist

Failed

  • [FAIL] 6.1 Architecture: state invariants and cross-layer calls; the async path's state, device, and lifetime stay self-consistent. Linked issue: lm_output_indexes in tp_sync_post_rejection sits on different devices on rank0 vs non-rank0
  • [FAIL] 6.1 Quality: no obvious redundant allocation or implicit sync on the hot path. Linked issue: force_mask is written element-by-element into a CUDA tensor, causing batch_size kernel launches and implicit syncs

Passed

  • [PASS] 6.1 Software Engineering: SRP/abstraction boundaries; new cross-module behavior has clear ownership, with no new branches scattered through central logic
  • [PASS] 6.1 Tests: new public logic/kernels have dedicated unit tests covering the key scenarios

RTP-LLM Checklist

Failed

  • [FAIL] F Cross-platform: ROCm/CUDA sync-semantics changes must be equivalent across platforms or have a fallback. Linked issue: Sampler::forward switching to non_blocking H2D relies on all callers passing pinned tensors, with no defensive check
  • [FAIL] I Code quality: new files follow the project's clang-format and trailing-newline conventions. Linked issue: several new/modified .cu/.h files lack a trailing newline

Passed

  • [PASS] C Concurrency: cross-thread shared state uses atomics with release/acquire ordering, with fallbacks on exception paths
  • [PASS] B Correctness/Logic: speculative decoding and vocab pruning sizes/conventions are consistent across sampler/executor/fake stream

Python Static-First Checklist

Passed

  • [PASS] P.A Static correctness: names, fields, and serialization on new/modified Python paths match the C++ side

Strengths

  • AsyncRunner's design is clean: a single-producer/single-consumer model, at::ThreadLocalState/at::ThreadLocalStateGuard correctly propagating PyTorch thread-local state, dual condition variables cv_done_/cv_task_ working with task_done_ and pending_exception_ for clear exception propagation, a destructor that joins the thread, and a fairly thorough GoogleTest suite (LaunchAndSync/Sequential/ExecutionOrder/CudaTensorWork/Destructor/Rethrows*/Stress).
  • Both PyWrappedModel::forward and CudaGraphRunner::forward use an RAII guard to reset prepared_attention_inputs_ on scope exit, so a flag left over on an exception path cannot make the next forward misuse stale attention_inputs_/graph_state_.
  • KVCacheMemoryConnector::asyncWrite moves resource_copy.reset() out of the if (success) block, fixing a leak where block refs were not released on the failure path, and explicitly resets task_copy_plan inside the thread-pool task to release block refs early.
  • ExpertBalancer short-circuits execAllReduce/execBroadcast when world_size <= 1, avoiding wasted NCCL calls on a single GPU and sidestepping the repeated-summation risk described in the 'x = ReduceSum(x) overflows to zero under tp8' comment.
  • NormalEngine::mayAddFakeStream now uses the propose model's own vocab_size (mtp_vocab_size) to create the SP buffer for the MTP fake decode stream, consistent with draft_vocab_size_ in MtpExecutor, fixing the size mismatch for fake streams on vocab-pruned models.
  • CudaGraphRunner::prepareInputs explicitly takes py::gil_scoped_acquire before calling attn_pyobj.attr("prepare_cuda_graph")(...), preventing a PyTuple_New segfault when entering Python from the AsyncRunner worker thread, with the root cause documented in a comment.

@LLLLKKKK
Collaborator

AI Code Review - PR #936

Status: BLOCKING

Summary: P0/0 · P1/2 · P2/3 · P3/2

Blocking Issues

P1

  • fake decode stream accept_tokens shape is inconsistent with the later reshape @ rtp_llm/cpp/normal_engine/speculative/MtpExecutor.cc:642
    • Suggestion: the fake_stream branch should construct accept_tokens=zeros({1, propose_step_+1}) and accept_len=full({1}, propose_step_+1) directly (or shape them by total_tokens) so the subsequent reshape and lm_output_indexes computation stay consistent; see the sketch after this list.
  • do_sample goes through a non-pinned non_blocking H2D copy, which is unsafe on ROCm @ rtp_llm/cpp/normal_engine/speculative/SpeculativeSampler.cc:62
    • Suggestion: construct do_sample with an explicit .pin_memory() (or pin it after filling), matching how NormalSamplerInputGatherer handles the other sampler tensors; or do an explicit stream sync after the .to(...).
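
A minimal sketch of the suggested fake-stream construction (the dtype/device options are assumptions; only the shapes come from the suggestion above):

auto opts = torch::dtype(torch::kInt32).device(torch::kCUDA);
// batch_size == 1 for the fake stream; shapes honor the reshape contract
// total_tokens = (propose_step_ + 1) * batch_size.
auto accept_tokens = torch::zeros({1, propose_step_ + 1}, opts);
auto accept_len    = torch::full({1}, propose_step_ + 1, opts);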

Non-blocking Suggestions

P2

  • The new CudaSpeculativeSamplingTest.cc is not registered in any BUILD file, so CI will not compile or run it @ rtp_llm/models_py/bindings/cuda/test/CudaSpeculativeSamplingTest.cc:1
    • Suggestion: add a cc_test target in rtp_llm/models_py/bindings/cuda/test/BUILD (modeled on the async_runner_test this PR adds in rtp_llm/cpp/normal_engine/test/BUILD), depending on :kernels_speculative_sampling and :vocab_prune_kernels, with exec_properties specifying a GPU.
  • SpeculativeSampler::batchSample crashes when the vocab sizes differ but d2t_map was never loaded @ rtp_llm/cpp/normal_engine/speculative/SpeculativeSampler.cc:108
    • Suggestion: before the vocab-mismatch branch, add RTP_LLM_CHECK_WITH_INFO(d2t_map_.defined() && d2t_map_.numel()>0, "d2t_map missing while draft/target vocab differ"); and emit a more prominent warning or startup-time check when the d2t_map size printed in the MtpExecutor constructor is already 0.
  • The thread_local static stream in tpSyncModelInputs may leak CUDA streams in long-lived thread pools @ rtp_llm/cpp/models/ModelTypes.cc:14
    • Suggestion: document that the set of calling threads is finite and fixed (name executor/AsyncRunner explicitly), or switch to a set of named streams shared per ParallelismConfig and injected externally (like collect_metrics_stream_), instead of relying on implicit thread_local lifetime.

P3

  • Several new .cu/.h/.cc files lack a trailing newline @ rtp_llm/models_py/bindings/common/kernels/vocab_prune/mapping.cu:48
    • Suggestion: add trailing newlines to keep the repo style uniform (pre-commit normally handles this; the files likely skipped the hook).
  • vec_dtypes.cuh contains a misspelled #pragma directive @ rtp_llm/models_py/bindings/cuda/kernels/speculative_sampling/vec_dtypes.cuh:1
    • Suggestion: correct every '#pragma unoll' to '#pragma unroll' (a historical typo inherited from upstream FlashInfer; worth an upstream patch while at it).
Review Checklist: 8 pass / 5 fail

General Principles Checklist

Failed

  • [FAIL] 6.1 Tests: new logic has targeted test coverage (C++ UT/integration). Linked issue: the new CudaSpeculativeSamplingTest.cc is not registered in any BUILD file, so CI will not compile or run it

Passed

  • [PASS] 6.1 Software Engineering: SRP/responsibility split; CudaGraphRunner's prepareInputs is split into prepareInputData / prepareAttentionInputs, and PyWrappedModel separates prepare from forward
  • [PASS] 6.1 Architecture: new concepts sit at the appropriate layer without leaking internal details
  • [PASS] 6.1 Quality: the commits do not mix in large unrelated formatting changes

RTP-LLM Checklist

Failed

  • [FAIL] B Correctness: speculative sampler / kernel output shapes must match downstream consumers. Linked issue: fake decode stream accept_tokens shape is inconsistent with the later reshape
  • [FAIL] F Cross-Platform: async H2D copies on the ROCm path must use pinned memory. Linked issue: do_sample goes through a non-pinned non_blocking H2D copy, which is unsafe on ROCm
  • [FAIL] H Tests: new GPU tests must be registered in BUILD with exec_properties set. Linked issue: the new CudaSpeculativeSamplingTest.cc is not registered in any BUILD file, so CI will not compile or run it

Passed

  • [PASS] C Concurrency: the GIL is acquired before cross-thread pybind calls
  • [PASS] D Performance: the hot path avoids reallocating GPU buffers on every forward
  • [PASS] E Distributed: TP/EP communication branches are correct on both single-GPU and multi-GPU paths

Python Static-First Checklist

Failed

  • [FAIL] P.B Python control flow and exceptions: conditionally registered weights need complete missing-value checks in the C++ consumer. Linked issue: SpeculativeSampler::batchSample crashes when the vocab sizes differ but d2t_map was never loaded

Passed

  • [PASS] P.A Python static correctness: new/modified type annotations, attribute initialization, and inheritance overrides honor parent-class contracts
  • [PASS] P.E Python/C++ interface alignment: Python-side constants/weight keys stay consistent with their C++ consumers

Strengths

  • The AsyncRunner abstraction is compact, and AsyncRunnerTest.cc covers the typical scenarios (launch/sync, exception rethrow, destructor join, sync without launch); the lock and condition-variable usage is correct.
  • Both PyWrappedModel::forward and CudaGraphRunner::forward reset prepared_attention_inputs_ via the RAII PreparedFlagGuard, which handles exception paths gracefully.
  • Comments clearly state the key ordering constraints (fusedCopy must precede graph_runner_->prepareAttentionInputs; prepare_cuda_graph must acquire the GIL) and the motivation (the async-prepare split broke the original ordering), reducing the risk of future misedits.
  • The ExpertBalancer fast path that skips allreduce/broadcast at world_size==1 is a clear single-GPU improvement, and world_size is passed consistently up the call chain.

@LLLLKKKK
Collaborator

LLLLKKKK commented May 1, 2026

AI Code Review - PR #936

Status: BLOCKING

Summary: P0/0 · P1/1 · P2/0 · P3/0

Blocking Issues

P1

  • CudaSpeculativeSamplingTest.cc is not registered in any BUILD file, so the test is never compiled or run @ rtp_llm/models_py/bindings/cuda/test/CudaSpeculativeSamplingTest.cc:1
    • Suggestion: add rtp_llm/models_py/bindings/cuda/test/BUILD with a cc_test_wrapper rule modeled on rtp_llm/models_py/bindings/cuda/ops/tests/BUILD (srcs pointing at CudaSpeculativeSamplingTest.cc, deps including the cc_library that owns sampling.h/mapping.h plus device_test_utils, exec_properties set to a suitable GPU tag such as H20), and confirm that bazel test resolves the target.

Checklist Violations (2 fail / 60 total)

General Principles Checklist

  • [6.1] Tests — new logic has focused unit tests + related integration/smoke tests → issue: CudaSpeculativeSamplingTest.cc is not registered in any BUILD file, so the test is never compiled or run
    The GPU unit-test file CudaSpeculativeSamplingTest.cc for the rejection_sampling/d2t mapping kernels introduced in the previous round is still not wired into any BUILD file, and this incremental commit does not close the gap; the fake-stream shape and do_sample pinned-memory fixes also lack new unit tests to reproduce and guard the original bugs

RTP-LLM Checklist

  • [H] Tests & CI — test coverage is sufficient: equivalent coverage for large refactors, end-to-end tests for new features → issue: CudaSpeculativeSamplingTest.cc is not registered in any BUILD file, so the test is never compiled or run
    The GPU unit-test file for the rejection_sampling kernel + d2t mapping is not registered in any BUILD, which amounts to having no end-to-end coverage; this incremental commit does not close the gap either

Strengths

  • The fake stream path now shapes accept_len/accept_tokens as {1}/{1, propose_step_+1} instead of {1}/{1,1}, matching the total_tokens=(propose_step_+1)*batch_size reshape contract in updateDecodePostDraftModelInput and fixing the guaranteed reshape crash when propose_step_>0
  • The do_sample CPU tensor in SpeculativeSampler.batchSample now sets pinned_memory(true), so the subsequent .to(target_device, true) is truly asynchronous, eliminating the implicit H2D sync on ROCm
  • AsyncRunner now propagates exceptions via std::exception_ptr (workerLoop catches; sync/launch call rethrowPendingExceptionIfAny), closing out the worker-thread std::terminate risk raised in the previous review round

@LLLLKKKK
Collaborator

LLLLKKKK commented May 1, 2026

AI Code Review - PR #936

Status: LGTM

Summary: P0/0 · P1/0 · P2/1 · P3/2

lgtm ready to ci

Non-blocking Suggestions

P2

  • The new BUILD file does not set TEST_USING_DEVICE, inconsistent with async_runner_test in the same PR @ rtp_llm/models_py/bindings/cuda/test/BUILD:13
    • Suggestion: align with async_runner_test in this PR and add env = {'TEST_USING_DEVICE': 'CUDA'}, so the test cannot silently fall back to CPU if device_test_utils later introduces env-driven initialization.

P3

  • The forceSpAccept path builds its mask on GPU via element-wise index_put, triggering multiple kernel launches @ rtp_llm/cpp/normal_engine/speculative/SpeculativeSampler.cc:159
    • Suggestion: fill a pinned CPU bool tensor first, then issue a single .to(target_device, /*non_blocking=*/true) after the loop, matching the do_sample code above.
  • Several new files lack a trailing newline @ rtp_llm/models_py/bindings/common/kernels/vocab_prune/mapping.cu:48
    • Suggestion: rely on pre-commit/clang-format or add the newlines manually; the POSIX text-file convention avoids git diff noise.

Checklist Violations (4 fail / 111 total)

General Principles Checklist

  • [6.1] Tests — distributed/cross-platform changes have matching coverage → checklist-only
    The ROCm rejection_sampling_kernel and mappingDraft2Target implementations are new but have no ROCm-platform unit tests; the tpSyncModelInputs change to host_tensor_stream plus the split broadcasts has no multi-GPU integration test. A pre-existing gap in this PR, not introduced by this increment.
  • [6.1] Quality — logic changes not mixed with unrelated formatting → checklist-only
    rtp_llm/cpp/models/models_weight/W.h mixes large whitespace-alignment adjustments with the functional fields (multi_tokens_predict_d2t/t2d_map); formatting noise already noted in the previous round, with nothing new added by this increment.

RTP-LLM Checklist

  • [D] Performance — CPU scalar writes into a CUDA tensor trigger implicit syncs → issue: the forceSpAccept path builds its mask on GPU via element-wise index_put, triggering multiple kernel launches
    In SpeculativeSampler.cc, force_mask is constructed directly on CUDA and written per index via force_mask[idx] = true, each write launching an index_put kernel; a performance sub-optimization.
  • [H] Tests & CI — Bazel genrule glob scope is not overly broad → checklist-only
    kernels_speculative_sampling uses glob(["speculative_sampling/*.cu", "speculative_sampling/*.h", "speculative_sampling/*.cuh"]); everything currently in that directory belongs to this kernel, so it is acceptable, but any future non-sampling files added there will be swept in automatically.

Strengths

  • This increment directly addresses the previous round's P1: cuda_speculative_sampling_test is now wired up as //rtp_llm/models_py/bindings/cuda/test:cuda_speculative_sampling_test, so CI can actually run the new rejection sampling/d2t mapping kernel unit tests on H20.
  • AsyncRunner exceptions are explicitly rethrown via pending_exception_ in sync and on the next launch, with RethrowsWorkerExceptionFrom{Sync,NextLaunch} test cases covering both.
  • Both PyWrappedModel and CudaGraphRunner manage prepared_attention_inputs_ with an RAII guard, so a flag leaked on an exception path can no longer make the next forward reuse stale attention_inputs_.
  • rejection_sampling_kernel guards against a draft_probs out-of-bounds load at pos==num_speculative_tokens via if (pos != num_speculative_tokens); the bonus slot falls back to sampling from pure target_probs.
  • ExpertBalancer skips allreduce/broadcast when world_size==1, removing redundant collectives on a single GPU.

Vinkle-hzt added 2 commits May 6, 2026 11:06
Add async prepare plumbing for speculative decoding and MTP execution, including AsyncRunner coverage and CUDA graph integration.

Fold in review fixes for tensor allocation, memory copy plan lifetime, GPU broadcast sync removal, and speculative sampling CUDA test registration.

Squashed from 8 original commits:

- 5746930 feat: support async prepare
- e5db705 chore: fix review issues
- 391d083 fix: release memory copy plan before marking done
- af05149 chore: rm broadcast sync for gpu
- f46f46d refactor: optimize tensor handling in MtpExecutor and AsyncRunner
- 0ccdfcc fix: update tensor allocation in MtpExecutor for CUDA compatibility
- eb7b3e8 fix: address speculative decode review issues
- a473e18 test: register speculative sampling cuda test
@LLLLKKKK
Collaborator

LLLLKKKK commented May 6, 2026

AI Code Review - PR #936

Status: LGTM

Summary: P0/0 · P1/0 · P2/5 · P3/0

lgtm ready to ci

Non-blocking Suggestions

P2

  • The shallow-copy comment on model_input in MtpExecutor is incomplete @ rtp_llm/cpp/normal_engine/speculative/MtpExecutor.cc:597
    • Suggestion: extend the comment to list every field the worker reads and the reasoning for why each is not modified in place between launch and sync; or .clone() the fields that remain shared to decouple the worker from the main thread.
  • SpeculativeSampler::batchSample allocates fresh GPU tensors on every forward @ rtp_llm/cpp/normal_engine/speculative/SpeculativeSampler.cc:1
    • Suggestion: hoist the batch-size-bounded intermediate tensors into grow-only reuse buffers, in the same style as draft_probs_padding_buffer_.
  • W.h mixes large pure-formatting alignment changes with functional changes in the same PR @ rtp_llm/cpp/models/models_weight/W.h:1
    • Suggestion: split the pure alignment changes into a separate commit or PR, apart from the new vocab-prune fields.
  • The PR bundles two independent features, async prepare and vocab prune @ rtp_llm/cpp/normal_engine/AsyncRunner.h:1
    • Suggestion: split into two PRs (one for async prepare, one for vocab prune) to allow independent bisect and rollback.
  • Keeping input_lengths at propose_step+1 relies on token 0 being a safe pad @ rtp_llm/cpp/normal_engine/speculative/MtpBatchStreamProcessor.cc:255
    • Suggestion: use the model's actual pad/safe token instead of hard-coded 0, or add an assertion that token 0 is safe in the vocabulary; and document in a comment why lm_output_indexes masks out the influence of the padding positions.

Checklist Violations (6 fail / 88 total)

General Principles Checklist

  • [6.1] Quality — logic changes not mixed with unrelated formatting → issue: W.h mixes large pure-formatting alignment changes with functional changes in the same PR
    Large pure-alignment changes in W.h mixed into a functional PR
  • [6.1] Quality — mega-PRs split into independent changes → issue: the PR bundles two independent features, async prepare and vocab prune
    async prepare and vocab prune are not split
  • [6.1] Quality — commits are atomic and messages match behavior → issue: the PR bundles two independent features, async prepare and vocab prune
    The commit 'feat: support vocab prune' contains AsyncRunner changes

RTP-LLM Checklist

  • [B] Correctness & logic — in-place modification of shared objects requires a shallow copy → issue: the shallow-copy comment on model_input in MtpExecutor is incomplete
    The model_input shallow-copy comment is incomplete
  • [B] Correctness & logic — bypass/shortcut paths include the newly added transformation steps → issue: keeping input_lengths at propose_step+1 relies on token 0 being a safe pad
    input_lengths stays at propose_step+1, relying on token-0 padding
  • [D] Performance — per-forward memory allocation on the hot path → issue: SpeculativeSampler::batchSample allocates fresh GPU tensors on every forward
    uniform_samples_d and friends in batchSample are re-created on every call

Strengths

  • AsyncRunner's workerLoop captures exceptions in a std::exception_ptr and rethrows them in launch/sync, so a worker exception no longer triggers std::terminate
  • CudaGraphRunner::forward uses the RAII PreparedFlagGuard to guarantee prepared_attention_inputs_ is reset on exception paths
  • forceSpAccept is now handled equivalently as a post-rejection-sampling override (force_mask + torch::where), preserving the semantics
  • AsyncRunner ships 6 unit tests covering launch/sync, ordering, CUDA tensors, destructor join, and sync without launch
  • ROCm has symmetric implementations of rejectionSampling and mappingDraft2Target, keeping the cross-platform cleanup consistent
  • The rejection_sampling_kernel tests cover the all-accept / immediate-reject / partial-accept / batch=0 edge cases
  • After rejection, only the 3 tensors that actually change are broadcast instead of a full tpSyncModelInputs, with a clear explanatory comment
  • prepareAttentionInputs correctly takes py::gil_scoped_acquire before being invoked from the AsyncRunner worker

@wht21
Collaborator

wht21 commented May 6, 2026

internal source has been updated, please review the changes!

