[Cherry-Pick] Fix PD interaction and error response #7539
juncaipeng wants to merge 1 commit into PaddlePaddle:release/2.6
Conversation
Thanks for your contribution!
Pull request overview
This PR hardens the error handling and error responses in Splitwise (P/D disaggregation) mode: when a retryable error occurs on the PD side (e.g. preemption or insufficient resources), it propagates the error message, tags the finish_reason, and lets the Router retry scheduling according to policy.
Changes:
- Router: adds retry scheduling for the decode preemption scenario, using the `pd_reschedule` finish_reason to decide whether to retry
- Engine/Processor: unifies and strengthens PD-related error codes and messages (`PD Error: ...`), and lets upper layers skip token decoding for error responses
- Scheduler/PD communication: adjusts when decode-side preallocation and task enqueueing happen, groups cache_sync sends, and adds several metrics fields
Reviewed changes
Copilot reviewed 14 out of 14 changed files in this pull request and generated 6 comments.
| File | Description |
|---|---|
| tests/v1/test_resource_manager_v1.py | Adapts to the decode-side preallocation/idx behavior change |
| tests/splitwise/test_splitwise_connector.py | Adjusts cache_sync test inputs; the test's semantics/coverage still need to be realigned with the implementation |
| tests/router/test_router.py | Adapts to the new RouterArgs parameters |
| tests/output/test_token_processor.py | Adapts to the error-code and error-field (error_msg) changes |
| tests/input/test_ernie_vl_processor.py | Adapts to the skip-decoding-on-error logic (outputs must be provided) |
| fastdeploy/splitwise/splitwise_connector.py | cache_sync now aggregates sends per addr (including the error_msg case) |
| fastdeploy/router/router.py | Adds preemption retry and pd_reschedule detection for splitwise mode |
| fastdeploy/output/token_processor.py | Updates error code/message for prefill failures; batch output processing skips empty task slots |
| fastdeploy/input/base_processor.py | Error responses (outputs=None or error_code!=200) are passed through directly to avoid decoding errors |
| fastdeploy/entrypoints/openai/serving_chat.py | Error responses fill in outputs/finished and map PD Error -> pd_reschedule |
| fastdeploy/entrypoints/openai/protocol.py | Extends the finish_reason Literal with pd_reschedule (but "abort" is still missing) |
| fastdeploy/engine/sched/resource_manager_v1.py | Adjusts when decode preallocation and tasks_list writes happen; adds prompt_token_ids_len metrics |
| fastdeploy/engine/request.py | Adds a prompt_token_ids_len field to RequestMetrics |
| fastdeploy/engine/common_engine.py | Prefixes PD-related error messages with PD Error and refines error codes |
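The error pass-through in fastdeploy/input/base_processor.py listed above can be sketched as follows. This is a hypothetical simplification, not the actual `process_response_dict` implementation; the stand-in decode step only illustrates where the normal path would run:

```python
def process_response_dict(data: dict) -> dict:
    """Pass error responses through untouched; only healthy responses
    (outputs present, error_code == 200) enter the decode path."""
    if data.get("outputs") is None or data.get("error_code", 200) != 200:
        return data  # error response: skip token decoding entirely
    # Normal path: decode token ids into text (simplified stand-in).
    data["outputs"]["text"] = "".join(map(str, data["outputs"].get("token_ids", [])))
    return data
```

The guard matters because an error response carries no `token_ids`; without the early return, the decode path would raise on the missing payload instead of surfacing the original error.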
```python
def test_send_cache_info_to_prefill_groups_by_addr_and_skips_error():
    connector = _build_connector()
    connector._send_message = Mock()
    # Add mock resource_manager with waiting_abort_req_id_set
    connector.resource_manager = Mock()
    connector.resource_manager.waiting_abort_req_id_set = set()

    tasks = [
        DummyTask(
            request_id="req-1",
            disaggregate_info={
                "prefill_ip": "10.0.0.1",
                "prefill_connector_port": 9001,
                "block_tables": [1, 2, 3],
            },
        ),
        DummyTask(
            request_id="req-err",
            disaggregate_info={
                "prefill_ip": "10.0.0.2",
                "prefill_connector_port": 9002,
                "block_tables": [9],
            },
            error_msg="failed",
        ),
    ]
```
The test is still named skips_error, but the tasks list no longer contains an error task, so the test no longer covers how an error_msg task is handled or whether it is sent, and it disagrees with the implementation (send_cache_info_to_prefill now also sends error_msg to prefill). Suggestion: either update the test name and assertions to cover the error task's grouping and send payload, or add the error task back and assert its send behavior.
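The per-addr grouping that the test should exercise can be sketched as a small helper. This is a hypothetical illustration using plain dicts, not the actual `send_cache_info_to_prefill` logic; the field names mirror the `DummyTask` fields in the snippet above:

```python
from collections import defaultdict

def group_tasks_by_prefill_addr(tasks):
    """Group cache_sync payloads by prefill address so each addr receives
    one aggregated message; error tasks are included, with their error_msg
    carried in the payload rather than being skipped."""
    groups = defaultdict(list)
    for task in tasks:
        info = task["disaggregate_info"]
        addr = f"{info['prefill_ip']}:{info['prefill_connector_port']}"
        payload = {"request_id": task["request_id"]}
        if task.get("error_msg"):
            payload["error_msg"] = task["error_msg"]
        groups[addr].append(payload)
    return dict(groups)
```

A test along these lines could then assert both the grouping keys and that the error task's message survives into its group's payload.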
```diff
  draft_logprobs: Optional[LogProbs] = None
  prompt_logprobs: Optional[PromptLogprobs] = None
- finish_reason: Optional[Literal["stop", "length", "tool_calls", "recover_stop"]]
+ finish_reason: Optional[Literal["stop", "length", "tool_calls", "recover_stop", "pd_reschedule"]]
```
The finish_reason Literal in protocol.py still does not include "abort", but serving_chat/serving_completion set finish_reason="abort" when they detect "Aborted", which triggers a Pydantic validation failure and makes the endpoint return 500. Suggestion: add "abort" to the Literal (or switch to an Enum/str and constrain the values in the upper layer).
```diff
- finish_reason: Optional[Literal["stop", "length", "tool_calls", "recover_stop", "pd_reschedule"]]
+ finish_reason: Optional[Literal["stop", "length", "tool_calls", "recover_stop", "pd_reschedule", "abort"]]
```
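Why the missing "abort" causes a 500: Pydantic validates a Literal field by membership in the listed values, so any finish_reason outside the list fails model construction. A minimal dependency-free sketch of that check (`validate_finish_reason` is a hypothetical stand-in for Pydantic's validator, using the widened Literal proposed above):

```python
from typing import Literal, Optional, get_args

# The widened Literal proposed in the review, with "abort" included.
FinishReason = Optional[
    Literal["stop", "length", "tool_calls", "recover_stop", "pd_reschedule", "abort"]
]

def validate_finish_reason(value):
    """Emulate Pydantic's Literal validation: accept None or a listed string."""
    if value is None:
        return value
    literal_type = get_args(FinishReason)[0]  # unwrap Optional
    allowed = get_args(literal_type)
    if value not in allowed:
        raise ValueError(f"finish_reason {value!r} is not one of {allowed}")
    return value
```

With the narrower Literal currently in protocol.py, the same check rejects "abort" at response-construction time, which is what surfaces as the 500.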
```diff
  draft_logprobs: Optional[LogProbs] = None
  prompt_logprobs: Optional[PromptLogprobs] = None
- finish_reason: Optional[Literal["stop", "length", "tool_calls", "recover_stop"]] = None
+ finish_reason: Optional[Literal["stop", "length", "tool_calls", "recover_stop", "pd_reschedule"]] = None
```
ChatCompletionResponseStreamChoice.finish_reason's Literal does not include "abort", but the server sets finish_reason="abort" in the "Aborted" scenario, causing response-model validation to fail. Suggestion: add "abort" to the Literal (consistent with ChatCompletionResponseChoice).
```diff
  prompt_logprobs: Optional[PromptLogprobs] = None
  reasoning_content: Optional[str] = None
- finish_reason: Optional[Literal["stop", "length", "tool_calls", "recover_stop"]] = None
+ finish_reason: Optional[Literal["stop", "length", "tool_calls", "recover_stop", "pd_reschedule"]] = None
```
CompletionResponseChoice.finish_reason's Literal does not include "abort", but serving_completion sets finish_reason="abort" in the "Aborted" scenario, which can fail Pydantic validation. Suggestion: add "abort" to the Literal (or unify on an Enum).

```diff
- finish_reason: Optional[Literal["stop", "length", "tool_calls", "recover_stop", "pd_reschedule"]] = None
+ finish_reason: Optional[Literal["stop", "length", "tool_calls", "recover_stop", "pd_reschedule", "abort"]] = None
```
```diff
  completion_tokens: Optional[str] = None
  reasoning_content: Optional[str] = None
- finish_reason: Optional[Literal["stop", "length", "tool_calls", "recover_stop"]] = None
+ finish_reason: Optional[Literal["stop", "length", "tool_calls", "recover_stop", "pd_reschedule"]] = None
```
CompletionResponseStreamChoice.finish_reason's Literal does not include "abort", but serving_completion sets finish_reason="abort", so both the streaming and non-streaming completion endpoints would fail to construct a response in the abort scenario. Suggestion: add "abort" to the Literal.
```python
allocated_position = self.get_available_position()
request.idx = allocated_position
self.tasks_list[request.idx] = request
self.stop_flags[request.idx] = False
self.requests[request.request_id] = request
self.req_dict[request.request_id] = allocated_position
```
preallocate_resource_in_d no longer writes self.tasks_list[request.idx], but the request has already been allocated GPU blocks and stop_flags[idx]=False; update_metrics/_recycle_resources later count blocks_used by scanning tasks_list, so those blocks are missed, and metrics such as available_gpu_block_num are overestimated during the window between preallocation and receipt of the first token. Suggestion: base the metrics on the cache_manager's free/used blocks, or count preallocated requests separately (while keeping token_processor from mis-handling them).
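The cache-manager-based accounting suggested above can be sketched as follows. The function is a hypothetical illustration of the idea, not FastDeploy's actual metrics code: it only assumes access to the cache manager's free-block list, not to tasks_list:

```python
def gpu_block_metrics(total_gpu_blocks, free_block_list):
    """Derive block metrics from the cache manager's free list rather than
    by scanning tasks_list: a block counts as used the moment it leaves the
    free list, which covers the window where a preallocated request holds
    blocks but its tasks_list slot is still None."""
    blocks_used = total_gpu_blocks - len(free_block_list)
    return {
        "blocks_used": blocks_used,
        "available_gpu_block_num": total_gpu_blocks - blocks_used,
    }
```

Because the free list shrinks at allocation time, this view cannot overestimate availability during the preallocate-to-first-token window, which is exactly the gap the comment describes.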
Codecov Report

❌ Patch coverage is … Additional details and impacted files:

```
@@            Coverage Diff             @@
##           release/2.6    #7539   +/- ##
==============================================
  Coverage             ?   73.17%
==============================================
  Files                ?      376
  Lines                ?    53131
  Branches             ?     8307
==============================================
  Hits                 ?    38878
  Misses               ?    11517
  Partials             ?     2736
==============================================
```

Flags with carried forward coverage won't be shown. View full report in Codecov by Sentry.
7affbb5 to ee9dd04 (Compare)

PaddlePaddle-bot left a comment

🤖 AI Code Review | 2026-04-22 18:11:55
📋 Review Summary
PR overview: fixes interaction-logic issues in the PD (Prefill-Decode) disaggregation scenario, including improved error responses, a Router-level preemption retry mechanism, and a guard against accessing an uninitialized tasks_list slot.
Scope: engine/, entrypoints/, input/, output/, router/, splitwise/
Impact tags: [PD Disaggregation] [Engine] [APIServer]
📝 PR Convention Check
The PR title is missing the official tags, and the Motivation/Modifications description is not filled in.
Suggested title (copy-paste ready):
[Cherry-Pick][PD Disaggregation][BugFix] Fix PD interaction and error response
Suggested description (Motivation/Modifications sections, copy-paste ready):
## Motivation
Fix several interaction defects in PD disaggregation mode:
1. When the prefill side fails to obtain resources from the decode side, the error message is unclear and lacks a uniform prefix
2. When the decode side preempts due to insufficient resources, no structured `pd_reschedule` finish_reason is returned
3. The Router has no automatic retry after preemption for non-streaming requests
4. `tasks_list` is assigned too early in `preallocate_resource_in_d`, so a request may be scheduled before it is ready
## Modifications
- `common_engine.py`: unify the error log format, add a `PD Error:` prefix, and distinguish prefill-side from decode-side errors; decode preemption error code changed from 500 to 502
- `resource_manager_v1.py`: move the `tasks_list` assignment to `add_prefilled_request` so requests are not processed before they are fully ready; add a `tasks_list[i] is None` guard in `_process_batch_output`
- `router.py`: add `preempt_retry_count` (default 3) and `preempt_retry_exclude_decode` parameters, so non-streaming requests are retried automatically after preemption, optionally excluding the previous decode node
- `serving_chat.py`: error responses keep the already-generated token text and return `finish_reason=pd_reschedule`
- `protocol.py`: add `pd_reschedule` to the `finish_reason` Literal in four places
- `base_processor.py`: `process_response_dict` returns early for `outputs=None` or `error_code!=200`, keeping error responses out of the decode path
- `token_processor.py`: error code changed from 400 to 501; field renamed from `error_message` to `error_msg` to match the protocol
- `splitwise_connector.py`: simplify `send_cache_info_to_prefill`, remove the `waiting_abort_req_id_set` branch, and always send messages (including error info) back to prefill

Issues
| Level | File | Summary |
|---|---|---|
| 🔴 Bug | serving_chat.py:628 | completion_tokens is assigned a string instead of the token count (an integer) |
| 🟡 Suggestion | router.py:264 | Streaming requests return directly, bypassing the retry loop; preempt_retry_count has no effect on streaming |

Overall assessment

This PR fixes several key issues in PD disaggregation mode: the concurrency-safety fix for the early tasks_list assignment and the null guard in _process_batch_output are both valuable improvements, and the Router retry mechanism is soundly designed. The completion_tokens type error in serving_chat.py needs attention: the field is incorrectly set to the decoded text string instead of an integer count, which can break downstream statistics or billing logic.
```python
text = ""
data["outputs"] = {
    "text": text,
    "completion_tokens": text,
```

🔴 Bug: the completion_tokens field is assigned the string text, but it should hold the token count (an integer).
Downstream consumers of completion_tokens (usage statistics, billing, rate limiting) would receive the wrong type, which can raise a TypeError or silently compute wrong values.
Suggested fix:

```python
"completion_tokens": len(completion_token_ids[idx]),
```
```python
async def _generate(
    if request_data.get("stream", False):
```
🟡 Suggestion: streaming requests return directly inside the retry loop, so the preemption retry logic never triggers.
In the current implementation, preempt_retry_count and preempt_retry_exclude_decode have no effect at all on streaming requests: regardless of the configured retry count, a streaming request always returns on the first attempt and never checks pd_reschedule.
Suggestion: state explicitly in the function docstring or CLI help that preemption retry is not supported in streaming mode, so users do not misconfigure it with wrong expectations.
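The asymmetry described above can be sketched as a small retry wrapper. This is a hypothetical illustration of the control flow, not the actual `_generate` implementation; `send_fn` stands in for the Router's dispatch call:

```python
def dispatch_with_preempt_retry(send_fn, request_data, preempt_retry_count=3):
    """Retry a non-streaming request while the backend answers with
    finish_reason == "pd_reschedule"; streaming requests return on the
    first attempt and are never retried."""
    if request_data.get("stream", False):
        return send_fn(request_data)  # streaming: no retry support
    resp = send_fn(request_data)
    for _ in range(preempt_retry_count):
        if resp.get("finish_reason") != "pd_reschedule":
            break
        resp = send_fn(request_data)  # preempted: reschedule and retry
    return resp
```

The early `return` in the streaming branch is what makes the retry parameters dead configuration for streaming callers, which is why documenting the limitation (or lifting it) matters.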
Motivation

Modifications

Usage or Command

Accuracy Tests

Checklist
- PR title tags: [FDConfig], [APIServer], [Engine], [Scheduler], [PD Disaggregation], [Executor], [Graph Optimization], [Speculative Decoding], [RL], [Models], [Quantization], [Loader], [OP], [KVCache], [DataProcessor], [BugFix], [Docs], [CI], [Optimization], [Feature], [Benchmark], [Others], [XPU], [HPU], [GCU], [DCU], [Iluvatar], [Metax]
- Run `pre-commit` before commit.
- If submitting to a `release` branch, make sure the PR has been submitted to the `develop` branch first, then cherry-pick it to the `release` branch with the `[Cherry-Pick]` PR tag.