Skip to content

feat(p2p): 实现PD分离模式下的P2P KV Cache传输#948

Open
ZhihanYan wants to merge 1 commit intomainfrom
feature/p2p-connector-0428
Open

feat(p2p): 实现PD分离模式下的P2P KV Cache传输#948
ZhihanYan wants to merge 1 commit intomainfrom
feature/p2p-connector-0428

Conversation

@ZhihanYan
Copy link
Copy Markdown
Collaborator

完成P2PConnector实现,支持Prefill与Decode节点间的KV Cache高效传输:

- 新增TransferService proto定义,支持RDMA/TCP传输
- 实现DecodeRpcServerNew2:协调异步prefill调用并执行decode
- 实现PrefillRpcServerNew2:处理prefill请求和P2P缓存加载
- 新增PrefillServerCaller组件,支持异步prefill RPC调用与连接池
- 启用P2PConnector初始化,暴露loadCacheSync接口

@ZhihanYan ZhihanYan requested a review from LLLLKKKK as a code owner April 28, 2026 12:53
@LLLLKKKK
Copy link
Copy Markdown
Collaborator

AI Code Review - PR #948

Status: BLOCKING

Summary: P0/1 · P1/1 · P2/3 · P3/1

Blocking Issues

P0

  • RemoteRpcServiceImpl::stop() 空指针崩溃 @ rtp_llm/cpp/model_rpc/RemoteRpcServiceImpl.h:100
    • 建议:将 stop() 改为对每个 server 指针做 null 检查后再调用 stop(),例如:if (prefill_server_) prefill_server_->stop(); if (decode_server_) decode_server_->stop(); if (decode_server_new2_) decode_server_new2_->stop(); if (prefill_server_new2_) prefill_server_new2_->stop();

P1

  • side_channel_data_map_ 无清理机制导致内存泄漏 @ rtp_llm/cpp/cache/connector/p2p/P2PConnectorResourceStore.cc:199
    • 建议:在 checkTimeout 中同时清理 side_channel_data_map_ 中超过一定时间未被消费的条目,或在 waitAndFillResponse 超时/取消路径中主动调用 consumeSideChannelData 清理。

Non-blocking Suggestions

P2

  • writeCacheToConnector 逻辑在两个 ExecOps.cc 中完全重复 @ rtp_llm/models_py/bindings/core/ExecOps.cc:138
    • 建议:将 writeCacheToConnector 提取到一个共享的 cc_library(如 connector 目录下),两个 ExecOps.cc 都调用同一份实现,避免后续修改时遗漏同步。
  • PrefillServerCallerContext 析构函数中 TryCancel + Shutdown 顺序可能导致 gRPC 警告 @ rtp_llm/cpp/model_rpc/PrefillServerCallerContext.cc:14
    • 建议:在 Shutdown 后 drain completion queue(循环 AsyncNext 直到返回 SHUTDOWN),确保所有 pending tag 被消费。或在析构前确保 checkDone() 已将状态推进到 finished_=true。
  • waitAndFillResponse 中 10ms 轮询增加 CPU 开销 @ rtp_llm/cpp/cache/connector/p2p/P2PConnector.cc:205
    • 建议:改为同时 wait 在 store 级别的 side_channel_cv_ 上(notify_all 已在 notifySideChannelReady 中调用),或使用更长的轮询间隔(如 100ms),减少无效唤醒。

P3

  • 多个新文件缺少末尾换行符 @ rtp_llm/cpp/model_rpc/DecodeRpcServerNew2.h:31
    • 建议:在文件末尾添加换行符,符合 POSIX 文本文件规范。
Review Checklist: 4 pass / 4 fail

General Principles Checklist

Failed

  • [FAIL] 6.1 Architecture: 状态不变量:create/update/failure/retry/rollback 路径保持有效 Linked issue: side_channel_data_map_ 无清理机制导致内存泄漏

Passed

  • [PASS] 6.1 Software Engineering: SRP:每个模块/类有单一变更理由
  • [PASS] 6.1 Tests: 新逻辑有对应的单元测试和集成/smoke 测试
  • [PASS] 6.1 Quality: 逻辑变更不与无关格式化混合

RTP-LLM Checklist

Failed

  • [FAIL] B 正确性与逻辑: 并发路径下资源生命周期正确(锁、CV、引用计数) Linked issue: RemoteRpcServiceImpl::stop() 空指针崩溃
  • [FAIL] D 性能: 热路径无不必要的轮询或分配 Linked issue: waitAndFillResponse 中 10ms 轮询增加 CPU 开销
  • [FAIL] I 代码质量: 无大段重复代码 Linked issue: writeCacheToConnector 逻辑在两个 ExecOps.cc 中完全重复

Passed

  • [PASS] A 兼容性与配置: 新增配置/参数有默认值,不破坏现有部署

Strengths

  • 引入 AsyncEvent 抽象接口解耦了 connector 层对 c10::Event/torch::Event 的直接依赖,使 P2P 模块可独立于 PyTorch 运行时测试
  • side-channel 独立 map 设计解决了 resource entry 被 steal 后 notify 丢失的竞态问题,思路清晰
  • PrefillServerCaller 的 tp_size 缓存设计合理,避免每次请求都发起 GetPeerInfo RPC
  • 测试覆盖较好:StoreWaitContextTest 新增了 EventReady/EventNotReady/NullEvent/Timeout 四种场景

@ZhihanYan ZhihanYan force-pushed the feature/p2p-connector-0428 branch from 6de2f72 to 9044036 Compare April 29, 2026 03:55
@LLLLKKKK
Copy link
Copy Markdown
Collaborator

AI Code Review - PR #948

Status: BLOCKING

Summary: P0/0 · P1/2 · P2/4 · P3/1

Blocking Issues

P1

  • PrefillServerCaller::getPrefillTpSize 探测 deadline 与注释相反,使用 std::max 导致阻塞整段生成预算 @ rtp_llm/cpp/model_rpc/PrefillServerCaller.cc:1597
    • 建议:将 std::max 改为 std::min(并设一个安全下限,例如 max(min(request_timeout_ms, kPeerInfoProbeMaxMs), 50)),保证探测在不可达 prefill 时快速失败而不是占用全部生成超时。
  • P2PConnectorResourceStore::side_channel_data_map_ 缺少过期清理,长时间运行存在内存泄漏 @ rtp_llm/cpp/cache/connector/p2p/P2PConnectorResourceStore.cc:243
    • 建议:为 side_channel_data_map_ 引入 TTL/与 resource_map_ 同步过期清理,或在 checkTimeout/stealResourceEntryLocked 中按 unique_key 同步删除遗留条目;同时上报 metrics 便于线上发现堆积。

Non-blocking Suggestions

P2

  • ~PrefillServerCallerContext Shutdown CompletionQueue 未排空残留 tag,违反 gRPC 契约 @ rtp_llm/cpp/model_rpc/PrefillServerCallerContext.cc:1697
    • 建议:在 Shutdown 后用 while (cq->Next(&tag,&ok)) 排空全部事件再析构;或者改用 Drain 模式,确保所有 outstanding tag 被回收。
  • LocalRpcServer::ExecuteFunction 错误信息缺失上下文,运维难定位 @ rtp_llm/cpp/model_rpc/LocalRpcServer.cc:1282
    • 建议:保留关键标识字段(例如 request type、unique_key、request_id)拼到日志/error_msg,避免 DebugString 体积过大但保留关键诊断信息。
  • transfer/proto/BUILD default_visibility 指向不存在的包路径 @ rtp_llm/cpp/cache/connector/p2p/transfer/proto/BUILD:779
    • 建议:将路径修正为 //rtp_llm/cpp/cache/connector/p2p:subpackages(或与同目录其它 BUILD 一致的可见性)。
  • DecodeRpcServerNew2::GenerateStreamCall 在请求处理路径打印 'init failed' 误导日志 @ rtp_llm/cpp/model_rpc/DecodeRpcServerNew2.cc:1161
    • 建议:将日志改为反映请求级别的失败信息(例如 'request [%ld] missing prefill ip/port' 或 'request [%ld] failed to fetch prefill tp_size'),与 init 阶段的日志区分。

P3

  • 多个新增 BUILD/.h/.cc 文件缺少 EOF 换行 @ rtp_llm/cpp/cache/connector/p2p/transfer/proto/BUILD:792
    • 建议:为上述文件追加结尾换行,避免 git diff/部分工具误报。
Review Checklist: 1 pass / 7 fail

General Principles Checklist

Failed

  • [FAIL] 6.1 Software Engineering: 模块职责单一、错误语义明确 Linked issue: PrefillServerCaller::getPrefillTpSize 探测 deadline 与注释相反,使用 std::max 导致阻塞整段生成预算
  • [FAIL] 6.1 Architecture: 创建/更新/失败/清理路径的状态不变量保持有效 Linked issue: P2PConnectorResourceStore::side_channel_data_map_ 缺少过期清理,长时间运行存在内存泄漏
  • [FAIL] 6.1 Quality: 日志/错误信息可观测、有定位价值 Linked issue: LocalRpcServer::ExecuteFunction 错误信息缺失上下文,运维难定位

Passed

  • [PASS] 6.1 Tests: 新增逻辑有针对性的单测/集成测试覆盖

RTP-LLM Checklist

Failed

  • [FAIL] A 兼容性与配置: 新增 proto/BUILD 的可见性、字段约束符合现有规范 Linked issue: transfer/proto/BUILD default_visibility 指向不存在的包路径
  • [FAIL] C 并发: 跨线程共享资源在生命周期/取消路径下不泄漏不悬挂 Linked issue: ~PrefillServerCallerContext Shutdown CompletionQueue 未排空残留 tag,违反 gRPC 契约
  • [FAIL] E 分布式: PD 分离/远端依赖具备超时与快速失败语义 Linked issue: PrefillServerCaller::getPrefillTpSize 探测 deadline 与注释相反,使用 std::max 导致阻塞整段生成预算
  • [FAIL] I 代码质量: 新增文件遵循仓库格式规范 Linked issue: 多个新增 BUILD/.h/.cc 文件缺少 EOF 换行

Strengths

  • P2PConnectorResourceStore 引入独立的 side_channel_data_map_ 解决了 entry 已被 steal 后再 notify 的竞态,思路合理,并补齐了 waitAndFillResponse 端的 double-check
  • 新增 AsyncEvent 接口将 connector 等待逻辑与 c10::Event 解耦,方便后续在不同设备/平台复用,并且补齐了 P2PConnectorWorkerTest/StoreWaitContextTest 的 ready/not-ready/timeout 用例
  • RemoteRpcServiceImpl::stop() 对四类 server 都加了非空判断,避免了 PD 入口切换后未初始化指针被解引用的隐患

@LLLLKKKK
Copy link
Copy Markdown
Collaborator

AI Code Review - PR #948

Status: BLOCKING

Summary: P0/0 · P1/2 · P2/5 · P3/1

Blocking Issues

P1

  • getPrefillTpSize 的 deadline 仍方向写反,使用 std::max 不会限制探测时长 @ rtp_llm/cpp/model_rpc/PrefillServerCaller.cc:1809
    • 建议:改为 std::min(request_timeout_ms, kPeerInfoProbeMaxMs) 并在过小时设置一个安全下限(例如 std::clamp(request_timeout_ms, 50, kPeerInfoProbeMaxMs)),让 GetPeerInfo 的探测总耗时被 kPeerInfoProbeMaxMs 真正封顶。
  • PrefillServerCallerContext 析构未排空 CompletionQueue 即 Shutdown,违反 gRPC 契约 @ rtp_llm/cpp/model_rpc/PrefillServerCallerContext.cc:1909
    • 建议:析构时 Shutdown 之后用 while(completion_queue_->Next(&tag,&ok)) {} 把残留事件吃掉;或者把 Read/Finish 的 tag 状态机管完整,确保不会出现析构时还有未完成的异步操作。

Non-blocking Suggestions

P2

  • WriteCacheStoreOp 异步 lambda 捕获裸 connector_coordinator 指针,存在生命周期风险 @ rtp_llm/models_py/bindings/common/WriteCacheStoreOp.cc:2449
    • 建议:把 connector_coordinator 改为 weak_ptr/shared_ptr 持有;或在 KVCacheManager 析构前显式 join/drain async_writer,并在文档/注释里固化“async_writer 必须先于 cache_manager 销毁”这一不变量并加 RTP_LLM_CHECK。
  • ExecuteFunction 失败日志被精简到无可观测信息 @ rtp_llm/cpp/model_rpc/LocalRpcServer.cc:1494
    • 建议:至少把 request_id、unique_key 以及 request_case (mem_request/p2p_request 等枚举) 写到日志和 error_msg 里;DebugString 太大可以截断或只输出关键字段,但不能完全丢弃。
  • writeCacheToConnector 在 convertToGlobalLayerId 失败时仅 WARNING 后静默返回 @ rtp_llm/models_py/bindings/core/ExecOps.cc:2596
    • 建议:把这一类失败上抛/记录到 metrics/请求级 error_info;或者在 KVCacheManager 初始化阶段就把所有 (model_id, layer_id) 的全局映射先校验一遍,运行期不允许失败。同时 cache_keys 转换失败的 batch 也应该上报,而不是 continue 跳过。
  • DecodeRpcServerNew2::GenerateStreamCall 内的错误日志写成 "init failed" 误导排查 @ rtp_llm/cpp/model_rpc/DecodeRpcServerNew2.cc:1372
    • 建议:改成 "decode rpc server new2 generate failed: prefill addr unavailable, request_id=%ld" / "... prefill_tp_size unavailable" 之类,把 request_id、prefill_addr 一起带上。
  • DecodeRpcServerNew2 init 中 connector_coordinator 取出后没有任何使用 @ rtp_llm/cpp/model_rpc/DecodeRpcServerNew2.cc:1306
    • 建议:如果只是想保证 P2P 已初始化,加注释说明;如果原本计划把 coordinator 注入 prefill_server_caller_ 或在 GenerateStreamCall 里使用,请补齐;否则删除这段 dead check 避免误导。

P3

  • 多个新增文件缺少行尾换行 @ rtp_llm/cpp/cache/connector/p2p/transfer/proto/BUILD:1004
    • 建议:在三个新文件末尾补一行换行,符合仓库其余 BUILD/.h 的惯例。
Review Checklist: 5 pass / 3 fail

General Principles Checklist

Failed

  • [FAIL] 6.1 Architecture: 跨层资源生命周期/失败/回滚路径不变量保留 Linked issue: WriteCacheStoreOp 异步 lambda 捕获裸 connector_coordinator 指针,存在生命周期风险
  • [FAIL] 6.1 Quality: 提交信息/可观测性与排查友好 Linked issue: ExecuteFunction 失败日志被精简到无可观测信息

Passed

  • [PASS] 6.1 Software Engineering: 模块/接口职责清晰,新增抽象具备明确替换点
  • [PASS] 6.1 Tests: 新逻辑有针对性的边界单测/集成测试

RTP-LLM Checklist

Failed

  • [FAIL] H 测试与 CI: 新 server / caller 路径有针对失败/超时/cancel 的覆盖 Linked issue: PrefillServerCallerContext 析构未排空 CompletionQueue 即 Shutdown,违反 gRPC 契约

Passed

  • [PASS] B 正确性与逻辑: PD 分离/connector wait 路径在 entry 被 steal、超时、cancel 等极端情况下行为正确
  • [PASS] C 并发: 新增/修改的锁与 condition_variable 不引入死锁、漏唤醒
  • [PASS] A 配置 / 兼容性: 新接口/字段对老路径与上下游兼容

Strengths

  • side_channel_data_map_ 现在跟踪独立 deadline,并在 checkTimeout 中按 expired_keys 与 deadline 双路径清理,修复了上一轮 review 指出的内存堆积问题。
  • 引入 rtp_llm::AsyncEvent 抽象 + TorchAsyncEvent 适配,把 connector 侧 wait/check 与 c10::Event/torch::Event 解耦,便于跨平台和单测注入 MockDeviceEvent。
  • 针对 entry 在 notify 之前被 steal 的竞态,waitAndFillResponse 增加 consumeSideChannelData 的 pre-wait 与唤醒后双重检查,并在所有终止路径调用 eraseSideChannelData,路径覆盖完整。
  • 新增 SideChannelData_ExpiredEntryIsAutoRemoved / LateNotificationAfterDeadlineIsDropped / EventReady / EventNotReady / NullEvent / Timeout 等单测,覆盖了 side-channel 和 StoreWaitContext 关键边界。

@ZhihanYan ZhihanYan force-pushed the feature/p2p-connector-0428 branch from 6a3fcaa to 4c15c3d Compare April 29, 2026 09:04
完成P2PConnector实现,支持Prefill与Decode节点间的KV Cache高效传输:

    - 新增TransferService proto定义,支持RDMA/TCP传输
    - 实现DecodeRpcServerNew2:协调异步prefill调用并执行decode
    - 实现PrefillRpcServerNew2:处理prefill请求和P2P缓存加载
    - 新增PrefillServerCaller组件,支持异步prefill RPC调用与连接池
    - 启用P2PConnector初始化,暴露loadCacheSync接口
@ZhihanYan ZhihanYan force-pushed the feature/p2p-connector-0428 branch from 4c15c3d to 72c905d Compare April 29, 2026 09:05
@LLLLKKKK
Copy link
Copy Markdown
Collaborator

AI Code Review - PR #948

Status: BLOCKING

Summary: P0/0 · P1/2 · P2/5 · P3/2

Blocking Issues

P1

  • getPrefillTpSize 用 std::max 反向放大了探测 deadline,与文档承诺相反 @ rtp_llm/cpp/model_rpc/PrefillServerCaller.cc:148
    • 建议:改为 std::min(request_timeout_ms, kPeerInfoProbeMaxMs),并对极小 timeout 单独保底(例如再 max 一个 50ms 下限),以兑现 'bounded min/max' 的契约;同时补一条 prefill 不可达场景的端到端超时上限测试。
  • PrefillServerCallerContext 析构未排空 CompletionQueue,违反 gRPC 销毁契约 @ rtp_llm/cpp/model_rpc/PrefillServerCallerContext.cc:18
    • 建议:在析构里 Shutdown 之后加一个 void* tag; bool ok; while (completion_queue_->Next(&tag, &ok)) {} 把队列彻底排空再让其析构;或者把 drain 责任搬到 cancel/checkDone 流程的同一线程,明确 finished_ 真正等价于 CQ 已 drain。

Non-blocking Suggestions

P2

  • WriteCacheStoreOp 异步 lambda 持有 IKVCacheConnectorCoordinator 裸指针,存在生命周期悬挂风险 @ rtp_llm/models_py/bindings/common/WriteCacheStoreOp.cc:23
    • 建议:把 connector_coordinator 改为按 std::shared_ptr<IKVCacheConnectorCoordinator> 捕获,或者把 cache_manager_ 的 shared_ptr 一并捕获以延长 coordinator 生命周期;同时在 PyCacheStoreInputs 中考虑直接存 shared_ptr 而不是裸指针。
  • writeCacheToConnector 在 convertToGlobalLayerId 失败时仅打印日志静默吞掉写入 @ rtp_llm/models_py/bindings/core/ExecOps.cc:179
    • 建议:至少把 layer_id 和当时 batch_id 一并打出,并通过 metric/error 通道把 P2P 写失败上报;考虑通过返回值或 collector 让 caller 感知失败以便走超时之外的快速错误路径。
  • DecodeRpcServerNew2::GenerateStreamCall 用 'init failed' 文案打印运行期错误,日志严重误导 @ rtp_llm/cpp/model_rpc/DecodeRpcServerNew2.cc:90
    • 建议:去掉 'init failed' 字样,改成 'GenerateStreamCall: missing prefill role_addr' / 'GenerateStreamCall: prefill_tp_size unavailable, prefill=ip:port, request_id=...',并带上 request_id 与 prefill 地址,方便定位。
  • callPrefill 每次请求 detach 一个 1ms 轮询线程,QPS 高时线程膨胀 @ rtp_llm/cpp/model_rpc/PrefillServerCaller.cc:55
    • 建议:改为共享 drain 线程池/单 poller 或直接用 grpc Async API 的 thread pool,让 PrefillServerCallerContext 注册到中央事件循环;单请求最多分摊到 1 个 callback。
  • applyP2PSideChannelToStream 改用 updateWithoutLock 但调用链未明确持锁,存在数据竞争隐患 @ rtp_llm/cpp/engine_base/stream/StreamCacheResource.cc:137
    • 建议:在 applyP2PSideChannelToStream 入口显式 std::lock_guard lock(*stream->mutex()) 或在头注释里写清楚 'caller must hold stream mutex',并补一条单元测试验证并发 update 不会触发 TSAN race。

P3

  • 新增 BUILD 的 default_visibility 路径写错,应为 cache/connector/p2p 而不是 cache/p2p_connector @ rtp_llm/cpp/cache/connector/p2p/transfer/proto/BUILD:1
    • 建议:改成 ["//rtp_llm/cpp/cache/connector/p2p:__subpackages__"] 或直接删掉 default_visibility(每个 target 已显式声明)。同时给文件加上结尾换行。
  • 多个新文件缺少结尾换行(POSIX/clang-format 风格统一性) @ rtp_llm/cpp/model_rpc/DecodeRpcServerNew2.h:31
    • 建议:统一补上结尾换行,避免 grep/diff/clang-format 噪声。
Review Checklist: 1 pass / 9 fail

General Principles Checklist

Failed

  • [FAIL] 6.1 Software Engineering: 公开 API 行为与文档承诺一致(契约一致性) Linked issue: getPrefillTpSize 用 std::max 反向放大了探测 deadline,与文档承诺相反
  • [FAIL] 6.1 Architecture: 异步任务跨线程持有的对象需保证生命周期;优先用 shared_ptr/owner 引用而非裸指针 Linked issue: WriteCacheStoreOp 异步 lambda 持有 IKVCacheConnectorCoordinator 裸指针,存在生命周期悬挂风险
  • [FAIL] 6.1 Tests: 失败/超时/竞争路径有针对性单测或集成测试 Linked issue: getPrefillTpSize 用 std::max 反向放大了探测 deadline,与文档承诺相反
  • [FAIL] 6.1 Quality: 日志/错误信息能准确反映现场,避免误导 Linked issue: DecodeRpcServerNew2::GenerateStreamCall 用 'init failed' 文案打印运行期错误,日志严重误导

RTP-LLM Checklist

Failed

  • [FAIL] C 并发与锁: 跨线程访问的 stream 字段必须明确锁契约,避免无锁写入 Linked issue: applyP2PSideChannelToStream 改用 updateWithoutLock 但调用链未明确持锁,存在数据竞争隐患
  • [FAIL] B 正确性与逻辑: 失败路径要可观测,不能静默吞掉错误 Linked issue: writeCacheToConnector 在 convertToGlobalLayerId 失败时仅打印日志静默吞掉写入
  • [FAIL] D 性能: 每请求线程/per-request busy-poll 在高 QPS 下应避免 Linked issue: callPrefill 每次请求 detach 一个 1ms 轮询线程,QPS 高时线程膨胀
  • [FAIL] G 跨语言/框架陷阱: gRPC CompletionQueue 销毁前必须 drain 完所有 tag Linked issue: PrefillServerCallerContext 析构未排空 CompletionQueue,违反 gRPC 销毁契约
  • [FAIL] I 代码质量: BUILD 文件路径与可见性正确,新文件统一规范 Linked issue: 新增 BUILD 的 default_visibility 路径写错,应为 cache/connector/p2p 而不是 cache/p2p_connector

Passed

  • [PASS] H 测试与 CI: 新引入的关键路径有单元测试覆盖

Strengths

  • side_channel_data_map_ 把 side-channel 数据从 resource entry 解耦,修掉了 stolen-before-notify 竞态,并补上 consumeSideChannelData 双重检查
  • AsyncEvent 抽象把 c10::Event 与连接器解耦,让 P2P 路径与具体设备/torch 版本解绑,新增的 MockDeviceEvent 单测覆盖 ready/not-ready/timeout/null 四种典型路径
  • NormalGenerateStream::updateOutput 把 P2P side-channel 通知挪到 finished_ 赋值之前,修复了 PD-sep prefill 单 token 完成时第一 token 通知丢失的问题
  • pollStreamOutput 把 waitForRemoteGenerate 收紧到 role_type==PREFILL,避免 decode 侧错误进入等待

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants