
[Cherry-Pick] Fix PD interaction and error response#7539

Closed
juncaipeng wants to merge 1 commit into PaddlePaddle:release/2.6 from juncaipeng:release/2.6-pd-refine

Conversation

@juncaipeng
Collaborator

Motivation

💡 If this PR is a Cherry Pick, the PR title needs to follow the format by adding the [Cherry-Pick] label at the very beginning and appending the original PR ID at the end. For example, [Cherry-Pick][CI] Add check trigger and logic(#5191)


Modifications

Usage or Command

Accuracy Tests

Checklist

  • Add at least one tag in the PR title.
    • Tag list: [[FDConfig],[APIServer],[Engine], [Scheduler], [PD Disaggregation], [Executor], [Graph Optimization], [Speculative Decoding], [RL], [Models], [Quantization], [Loader], [OP], [KVCache], [DataProcessor], [BugFix], [Docs], [CI], [Optimization], [Feature], [Benchmark], [Others], [XPU], [HPU], [GCU], [DCU], [Iluvatar], [Metax]]
    • You can add new tags based on the PR content, but the semantics must be clear.
  • Format your code; run pre-commit before committing.
  • Add unit tests. If there are none, please explain why in this PR.
  • Provide accuracy results.
  • If the current PR is submitting to the release branch, make sure the PR has been submitted to the develop branch, then cherry-pick it to the release branch with the [Cherry-Pick] PR tag.

Copilot AI review requested due to automatic review settings April 21, 2026 11:39
@paddle-bot

paddle-bot Bot commented Apr 21, 2026

Thanks for your contribution!

Contributor

Copilot AI left a comment


Pull request overview

This PR hardens exception handling and error responses in Splitwise (P/D) mode: when a retryable PD-side error occurs (e.g. preemption or resource exhaustion), the error message is propagated, finish_reason is tagged, and the Router can retry scheduling according to policy.

Changes:

  • Router: adds retry scheduling for decode-preemption scenarios, triggered by the pd_reschedule finish_reason
  • Engine/Processor: unifies and strengthens PD-related error codes and messages (PD Error: ...), and lets upper layers skip token decoding for error responses
  • Scheduler/PD communication: adjusts decode-side preallocation and task-enqueue timing and the cache_sync send-grouping logic, and adds several metrics fields

Reviewed changes

Copilot reviewed 14 out of 14 changed files in this pull request and generated 6 comments.

Show a summary per file

| File | Description |
| --- | --- |
| tests/v1/test_resource_manager_v1.py | Adapts to decode-side preallocation/idx behavior changes |
| tests/splitwise/test_splitwise_connector.py | Adjusts cache_sync test inputs; the test's semantics/coverage still need to be aligned with the implementation |
| tests/router/test_router.py | Adapts to the new RouterArgs parameters |
| tests/output/test_token_processor.py | Adapts to the error-code and error-field (error_msg) changes |
| tests/input/test_ernie_vl_processor.py | Adapts to the skip-decoding logic for error responses (outputs must be provided) |
| fastdeploy/splitwise/splitwise_connector.py | cache_sync now aggregates sends by addr (including the error_msg case) |
| fastdeploy/router/router.py | Adds preemption retry and pd_reschedule detection in splitwise mode |
| fastdeploy/output/token_processor.py | Updates the error code/message for prefill failures; batch output processing skips empty task slots |
| fastdeploy/input/base_processor.py | Error responses (outputs=None or error_code!=200) pass through directly, avoiding decode errors |
| fastdeploy/entrypoints/openai/serving_chat.py | Fills in outputs/finished for error responses and maps PD Error -> pd_reschedule |
| fastdeploy/entrypoints/openai/protocol.py | Extends the finish_reason Literal with pd_reschedule (but "abort" is still missing) |
| fastdeploy/engine/sched/resource_manager_v1.py | Adjusts the timing of decode preallocation and tasks_list writes; adds prompt_token_ids_len metrics |
| fastdeploy/engine/request.py | RequestMetrics gains a prompt_token_ids_len field |
| fastdeploy/engine/common_engine.py | PD-related error messages gain a "PD Error" prefix and refined error codes |

Comment on lines 195 to 211
def test_send_cache_info_to_prefill_groups_by_addr_and_skips_error():
    connector = _build_connector()
    connector._send_message = Mock()
    # Add mock resource_manager with waiting_abort_req_id_set
    connector.resource_manager = Mock()
    connector.resource_manager.waiting_abort_req_id_set = set()

    tasks = [
        DummyTask(
            request_id="req-1",
            disaggregate_info={
                "prefill_ip": "10.0.0.1",
                "prefill_connector_port": 9001,
                "block_tables": [1, 2, 3],
            },
        ),
        DummyTask(
            request_id="req-err",
            disaggregate_info={
                "prefill_ip": "10.0.0.2",
                "prefill_connector_port": 9002,
                "block_tables": [9],
            },
            error_msg="failed",
        ),
    ]

Copilot AI Apr 21, 2026


This test is still named skips_error, but the current tasks list no longer contains an error task, so the test no longer covers how tasks with error_msg are handled or whether they are sent, and it is inconsistent with the implementation (send_cache_info_to_prefill now sends error_msg to prefill as well). Suggestion: either update the test name and assertions to cover grouping and send content for the error task, or add the error task back and assert its send behavior.

Copilot generated this review using guidance from repository custom instructions.
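The by-addr aggregation this test exercises can be sketched independently of the connector internals. The dict-based tasks and the function name below are stand-ins for DummyTask and the real send_cache_info_to_prefill, not the actual FastDeploy API:

```python
from collections import defaultdict

def group_cache_sync_by_addr(tasks):
    # Group cache-sync payloads by prefill address so each address
    # receives one batched message; tasks carrying error_msg are
    # included, matching the behavior the review describes.
    grouped = defaultdict(list)
    for task in tasks:
        info = task["disaggregate_info"]
        addr = (info["prefill_ip"], info["prefill_connector_port"])
        payload = {"request_id": task["request_id"], "block_tables": info["block_tables"]}
        if task.get("error_msg"):
            payload["error_msg"] = task["error_msg"]
        grouped[addr].append(payload)
    return grouped

tasks = [
    {"request_id": "req-1",
     "disaggregate_info": {"prefill_ip": "10.0.0.1", "prefill_connector_port": 9001,
                           "block_tables": [1, 2, 3]}},
    {"request_id": "req-err",
     "disaggregate_info": {"prefill_ip": "10.0.0.2", "prefill_connector_port": 9002,
                           "block_tables": [9]},
     "error_msg": "failed"},
]
grouped = group_cache_sync_by_addr(tasks)
print(len(grouped))  # 2 distinct prefill addresses; the error task is still sent
```

An assertion-based test of this grouping (one message per address, error_msg preserved) would cover exactly the behavior the comment says the current test misses.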
draft_logprobs: Optional[LogProbs] = None
prompt_logprobs: Optional[PromptLogprobs] = None
finish_reason: Optional[Literal["stop", "length", "tool_calls", "recover_stop"]]
finish_reason: Optional[Literal["stop", "length", "tool_calls", "recover_stop", "pd_reschedule"]]

Copilot AI Apr 21, 2026


The finish_reason Literal in protocol still does not include "abort", but serving_chat/serving_completion set finish_reason="abort" when they detect "Aborted", which triggers a Pydantic validation failure and makes the endpoint return 500. Suggest adding "abort" to the Literal (or switching to an Enum/str and constraining the values at the upper layer).

Suggested change
finish_reason: Optional[Literal["stop", "length", "tool_calls", "recover_stop", "pd_reschedule"]]
finish_reason: Optional[Literal["stop", "length", "tool_calls", "recover_stop", "pd_reschedule", "abort"]]
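A stdlib-only illustration of why the missing Literal member matters; validate_finish_reason below stands in for Pydantic's validation, and FinishReason is a local alias rather than the protocol module's type:

```python
from typing import Literal, Optional, get_args

# Local alias mirroring the protocol field; "abort" deliberately absent,
# as in the diff above.
FinishReason = Optional[Literal["stop", "length", "tool_calls", "recover_stop", "pd_reschedule"]]

def validate_finish_reason(value):
    # Stand-in for Pydantic's Literal check: reject values outside the alias.
    literal_type = get_args(FinishReason)[0]  # the Literal[...] inside Optional
    allowed = get_args(literal_type)
    if value is not None and value not in allowed:
        raise ValueError(f"finish_reason={value!r} is not one of {allowed}")
    return value

validate_finish_reason("pd_reschedule")  # accepted
try:
    validate_finish_reason("abort")      # rejected, mirroring the 500 described above
except ValueError as exc:
    print(exc)
```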

draft_logprobs: Optional[LogProbs] = None
prompt_logprobs: Optional[PromptLogprobs] = None
finish_reason: Optional[Literal["stop", "length", "tool_calls", "recover_stop"]] = None
finish_reason: Optional[Literal["stop", "length", "tool_calls", "recover_stop", "pd_reschedule"]] = None

Copilot AI Apr 21, 2026


The Literal for ChatCompletionResponseStreamChoice.finish_reason does not include "abort", but the server sets finish_reason="abort" in the "Aborted" case, causing response-model validation to fail. Suggest adding "abort" to the Literal (consistent with ChatCompletionResponseChoice).

prompt_logprobs: Optional[PromptLogprobs] = None
reasoning_content: Optional[str] = None
finish_reason: Optional[Literal["stop", "length", "tool_calls", "recover_stop"]] = None
finish_reason: Optional[Literal["stop", "length", "tool_calls", "recover_stop", "pd_reschedule"]] = None

Copilot AI Apr 21, 2026


The Literal for CompletionResponseChoice.finish_reason does not include "abort", but serving_completion sets finish_reason="abort" in the "Aborted" case, which may cause a Pydantic validation failure. Suggest adding "abort" to the Literal (or unifying on an Enum).

Suggested change
finish_reason: Optional[Literal["stop", "length", "tool_calls", "recover_stop", "pd_reschedule"]] = None
finish_reason: Optional[Literal["stop", "length", "tool_calls", "recover_stop", "pd_reschedule", "abort"]] = None

completion_tokens: Optional[str] = None
reasoning_content: Optional[str] = None
finish_reason: Optional[Literal["stop", "length", "tool_calls", "recover_stop"]] = None
finish_reason: Optional[Literal["stop", "length", "tool_calls", "recover_stop", "pd_reschedule"]] = None

Copilot AI Apr 21, 2026


The Literal for CompletionResponseStreamChoice.finish_reason does not include "abort", but serving_completion sets finish_reason="abort", so both the streaming and non-streaming completion endpoints fail to construct a response in the abort case. Suggest adding "abort" to the Literal.

Comment on lines 1441 to 1445
allocated_position = self.get_available_position()
request.idx = allocated_position
self.tasks_list[request.idx] = request
self.stop_flags[request.idx] = False
self.requests[request.request_id] = request
self.req_dict[request.request_id] = allocated_position

Copilot AI Apr 21, 2026


preallocate_resource_in_d no longer writes self.tasks_list[request.idx], yet the request has already been allocated GPU blocks and stop_flags[idx]=False; the blocks_used statistics in update_metrics/_recycle_resources iterate tasks_list and will therefore miss these blocks, so metrics such as available_gpu_block_num are overestimated during the window between preallocation and receipt of the first token. Suggest either basing the metric on the cache_manager's free/used blocks, or counting preallocated requests separately (while keeping token_processor from mishandling them).

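The accounting gap described above can be illustrated with a stdlib-only sketch; blocks_used_from_tasks and blocks_used_from_cache_manager are hypothetical names, not FastDeploy functions:

```python
def blocks_used_from_tasks(tasks_list):
    # Metric as the review describes it: only requests present in
    # tasks_list are counted, so blocks preallocated in D before the
    # first token arrives are invisible (block_tables is a stand-in field).
    return sum(len(t["block_tables"]) for t in tasks_list if t is not None)

def blocks_used_from_cache_manager(total_blocks, free_blocks):
    # Alternative the review suggests: derive usage from the cache
    # manager's free list, which already reflects preallocated blocks.
    return total_blocks - free_blocks

# One scheduled request holds 2 blocks; a preallocated request (not yet
# written into tasks_list) holds 3 more, all drawn from a pool of 100.
tasks_list = [{"block_tables": [0, 1]}, None, None]
print(blocks_used_from_tasks(tasks_list))       # 2 -> under-counts
print(blocks_used_from_cache_manager(100, 95))  # 5 -> includes preallocation
```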
PaddlePaddle-bot

This comment was marked as outdated.

@codecov-commenter

codecov-commenter commented Apr 21, 2026

Codecov Report

❌ Patch coverage is 34.78261% with 60 lines in your changes missing coverage. Please review.
⚠️ Please upload report for BASE (release/2.6@9c91ecb). Learn more about missing BASE report.

| Files with missing lines | Patch % | Lines |
| --- | --- | --- |
| fastdeploy/router/router.py | 22.41% | 44 Missing and 1 partial ⚠️ |
| fastdeploy/entrypoints/openai/serving_chat.py | 0.00% | 7 Missing and 2 partials ⚠️ |
| fastdeploy/input/base_processor.py | 28.57% | 3 Missing and 2 partials ⚠️ |
| fastdeploy/splitwise/splitwise_connector.py | 75.00% | 0 Missing and 1 partial ⚠️ |
Additional details and impacted files
@@              Coverage Diff               @@
##             release/2.6    #7539   +/-   ##
==============================================
  Coverage               ?   73.17%           
==============================================
  Files                  ?      376           
  Lines                  ?    53131           
  Branches               ?     8307           
==============================================
  Hits                   ?    38878           
  Misses                 ?    11517           
  Partials               ?     2736           
| Flag | Coverage Δ |
| --- | --- |
| GPU | 73.17% <34.78%> (?) |


☔ View full report in Codecov by Sentry.


@PaddlePaddle-bot PaddlePaddle-bot left a comment


🤖 AI Code Review | 2026-04-22 18:11:55

📋 Review Summary

PR overview: fixes interaction-logic issues in the PD (Prefill-Decode) disaggregation scenario, including improved error responses, a Router-level preemption-retry mechanism, and guards against accessing an uninitialized tasks_list.
Scope: engine/entrypoints/input/output/router/splitwise/
Affected tags: [PD Disaggregation] [Engine] [APIServer]


📝 PR Convention Check

The PR title is missing an official tag, and the Motivation/Modifications descriptions are empty.

Suggested title (copy-paste ready):

[Cherry-Pick][PD Disaggregation][BugFix] Fix PD interaction and error response

Suggested description (Motivation/Modifications sections, copy-paste ready):

## Motivation

Fix several interaction defects in PD disaggregation mode:
1. When the prefill side fails to request resources from decode, the error message is unclear and lacks a unified prefix
2. When the decode side runs out of resources and preempts, no structured `pd_reschedule` finish_reason is returned
3. The Router has no automatic retry mechanism for non-streaming requests after preemption
4. `tasks_list` is assigned too early in the `preallocate_resource_in_d` phase, so a request may be scheduled before it is ready

## Modifications

- `common_engine.py`: unify the error-log format, add a `PD Error:` prefix, distinguish prefill-side from decode-side errors; change the decode preemption error code from 500 to 502
- `resource_manager_v1.py`: move the `tasks_list` assignment to the `add_prefilled_request` phase to avoid processing requests that are not fully ready; add a `tasks_list[i] is None` check in `_process_batch_output`
- `router.py`: add `preempt_retry_count` (default 3) and `preempt_retry_exclude_decode` parameters, enabling automatic retries for non-streaming requests after preemption, optionally excluding the previous decode node
- `serving_chat.py`: keep already-generated token text in error responses and return `finish_reason=pd_reschedule`
- `protocol.py`: add `pd_reschedule` to the `finish_reason` Literal in four places
- `base_processor.py`: `process_response_dict` returns early when `outputs=None` or `error_code!=200`, keeping error responses out of the decode path
- `token_processor.py`: change the error code from 400 to 501 and rename `error_message` to `error_msg` to match the protocol
- `splitwise_connector.py`: simplify the `send_cache_info_to_prefill` logic, remove the `waiting_abort_req_id_set` branch, and always send messages (including error info) back to prefill
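The retry policy described for router.py can be sketched as follows; schedule_with_preempt_retry and fake_send are hypothetical names, and the real Router's node selection and request plumbing are more involved:

```python
import random

def schedule_with_preempt_retry(send_request, decode_nodes,
                                preempt_retry_count=3,
                                preempt_retry_exclude_decode=True):
    # Resend a non-streaming request while the response finishes with
    # pd_reschedule, optionally excluding the decode node that was just
    # preempted; fall back to all nodes once every node is excluded.
    excluded = set()
    response = None
    for _ in range(preempt_retry_count + 1):
        candidates = [n for n in decode_nodes if n not in excluded] or decode_nodes
        node = random.choice(candidates)
        response = send_request(node)
        if response.get("finish_reason") != "pd_reschedule":
            return response
        if preempt_retry_exclude_decode:
            excluded.add(node)
    return response  # retries exhausted; surface the last error response

# Fake backend: the first attempt is preempted, the retry succeeds.
calls = []
def fake_send(node):
    calls.append(node)
    return {"finish_reason": "pd_reschedule"} if len(calls) == 1 else {"finish_reason": "stop"}

print(schedule_with_preempt_retry(fake_send, ["d0", "d1"]))  # {'finish_reason': 'stop'}
```

With preempt_retry_exclude_decode enabled, the second attempt deterministically avoids the node that was just preempted.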

问题

级别 文件 概述
🔴 Bug serving_chat.py:628 completion_tokens 被赋值为字符串而非 token 数量(整数)
🟡 建议 router.py:264 流式请求直接 return,绕过重试循环,preempt_retry_count 对流式无效

总体评价

本 PR 修复了 PD 分离模式的多个关键问题,tasks_list 提前赋值的并发安全修复、_process_batch_output 的空值守卫均属有价值的改进,Router 重试机制设计合理。需关注 serving_chat.pycompletion_tokens 的类型错误,该字段被错误地设置为解码后的文本字符串而非整数计数,可能影响下游统计或计费逻辑。

text = ""
data["outputs"] = {
    "text": text,
    "completion_tokens": text,


🔴 Bug The completion_tokens field is assigned the string text, but it should hold the token count (an integer).

Downstream consumers of completion_tokens (usage statistics, billing, rate limiting) would receive the wrong type, potentially causing a TypeError or silently incorrect results.

Suggested fix:

"completion_tokens": len(completion_token_ids[idx]),
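A minimal sketch of the corrected payload; build_error_outputs is a hypothetical helper, and only the field shapes follow the diff above:

```python
def build_error_outputs(text, completion_token_ids, idx):
    # Field names mirror the diff above: completion_tokens must carry
    # the token count (an int), never the decoded text string.
    return {
        "text": text,
        "completion_tokens": len(completion_token_ids[idx]),
    }

outputs = build_error_outputs("partial answer", [[101, 102, 103]], 0)
print(outputs["completion_tokens"])  # 3, an int usable by usage/billing code
```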

)

async def _generate(
if request_data.get("stream", False):


🟡 Suggestion Streaming requests return directly inside the retry loop and never trigger the preemption-retry logic.

In the current implementation, preempt_retry_count and preempt_retry_exclude_decode have no effect on streaming requests: regardless of configuration, a streaming request always returns on the first attempt and never checks pd_reschedule or retries.

Suggest stating explicitly in the function docstring or CLI help that preemption retry is not supported in streaming mode, so users don't misconfigure it with false expectations.
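A minimal sketch of that control flow, with hypothetical helper names (generate, run_once, run_with_retry); only the branch shape follows the diff above:

```python
import warnings

def generate(request_data, run_once, run_with_retry):
    # Streaming requests return on the first attempt, so the
    # preemption-retry loop only ever applies to non-streaming requests.
    if request_data.get("stream", False):
        warnings.warn("preemption retry is not supported in streaming mode")
        return run_once(request_data)       # early return: never retried
    return run_with_retry(request_data)     # non-streaming path may retry

print(generate({"stream": False}, lambda r: "once", lambda r: "retried"))  # retried
```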

@juncaipeng juncaipeng closed this Apr 24, 2026
