15 changes: 11 additions & 4 deletions fastdeploy/engine/common_engine.py
@@ -959,14 +959,18 @@ def _fetch_request():
status, msg = self.split_connector.check_decode_allocated(task)
task.metrics.ask_decode_resource_finish_time = time.time()
if not status:
self.llm_logger.error(f"{task.request_id} prefill failed with msg:{msg}.")
error_msg = (
f"PD Error: prefill failed to apply for resource from decode, "
f"req: {task.request_id}, msg:{msg}."
)
self.llm_logger.error(error_msg)
self.scheduler.put_results(
[
RequestOutput(
request_id=task.request_id,
finished=True,
error_code=500,
error_msg=msg,
error_msg=error_msg,
)
]
)
@@ -1072,14 +1076,17 @@ def _fetch_request():
if self.cfg.scheduler_config.splitwise_role == "decode":
for task in tasks:
if task.task_type == RequestType.PREEMPTED:
msg = f"{task.request_id} decode not enough blocks, need to be rescheduled."
msg = (
f"PD Error: decode does not have enough blocks for "
f"preallocated request. req:{task.request_id} "
)
self.llm_logger.error(msg)
self.scheduler.put_results(
[
RequestOutput(
request_id=task.request_id,
finished=True,
error_code=500,
error_code=502,
error_msg=msg,
)
]
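As a reading aid, the PD (prefill/decode) disaggregation error codes touched by this change are summarized below. The dict is only an illustration collected from the hunks above and the token_processor.py change further down; it is not a structure that exists in FastDeploy.

# Summary only; this mapping does not exist in the codebase.
PD_ERROR_CODES = {
    500: "prefill failed to apply for resource from decode",
    501: "prefill failed to send cache to decode (set in token_processor.py)",
    502: "decode does not have enough blocks for a preallocated request",
}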
2 changes: 2 additions & 0 deletions fastdeploy/engine/request.py
@@ -205,6 +205,7 @@ def __init__(
self.metrics = RequestMetrics()
else:
self.metrics = metrics
self.metrics.prompt_token_ids_len = self.prompt_token_ids_len
# from ChatCompletionRequest or CompletionRequest
self.user = user
self.metadata = metadata
@@ -872,6 +873,7 @@ class RequestMetrics:
speculate_metrics: Optional[SpeculateMetrics] = None

# cache related
prompt_token_ids_len: Optional[int] = None
gpu_cache_token_num: Optional[int] = 0
cpu_cache_token_num: Optional[int] = 0
storage_cache_token_num: Optional[int] = 0
4 changes: 3 additions & 1 deletion fastdeploy/engine/sched/resource_manager_v1.py
@@ -1309,6 +1309,7 @@ def get_prefix_cached_blocks(self, request: Request):
request.metrics.storage_cache_token_num = metrics["storage_match_token_num"]
request.metrics.cpu_cache_prepare_time = metrics["cpu_cache_prepare_time"]
request.metrics.storage_cache_prepare_time = metrics["storage_cache_prepare_time"]
request.metrics.prompt_token_ids_len = request.prompt_token_ids_len

main_process_metrics.prefix_cache_token_num.inc(request.num_computed_tokens)
main_process_metrics.prefix_gpu_cache_token_num.inc(request.metrics.gpu_cache_token_num)
@@ -1439,7 +1440,6 @@ def preallocate_resource_in_d(self, request: Request):
request.disaggregate_info["block_tables"] = request.block_tables
allocated_position = self.get_available_position()
request.idx = allocated_position
self.tasks_list[request.idx] = request
self.stop_flags[request.idx] = False
self.requests[request.request_id] = request
self.req_dict[request.request_id] = allocated_position
Comment on lines 1441 to 1445

Copilot AI (Apr 21, 2026):

preallocate_resource_in_d no longer writes self.tasks_list[request.idx], yet the request has already been allocated GPU blocks and stop_flags[idx] is set to False. update_metrics/_recycle_resources later compute blocks_used from tasks_list, so those blocks are missed and metrics such as available_gpu_block_num are overestimated during the window between preallocation and the arrival of the first token. Suggestion: derive the block metrics from the cache_manager's free/used block counts, or account for preallocated requests separately (while making sure token_processor does not process them by mistake).
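A minimal sketch of the reviewer's first suggestion, deriving block usage from the cache manager instead of tasks_list. The attribute names cache_manager, num_gpu_blocks, and gpu_free_block_list are assumptions for illustration, not the verified FastDeploy API.

# Hypothetical sketch: count used blocks from the cache manager so that blocks
# handed out by preallocate_resource_in_d are included even before the request
# is written into tasks_list by add_prefilled_request.
def used_gpu_block_num(resource_manager) -> int:
    cache = resource_manager.cache_manager        # assumed attribute
    total = cache.num_gpu_blocks                  # assumed attribute
    free = len(cache.gpu_free_block_list)         # assumed attribute
    return total - free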
@@ -1483,6 +1483,8 @@ def add_prefilled_request(self, request_output: RequestOutput):
request.metrics = copy.deepcopy(request_output.metrics)
request.metrics.decode_inference_start_time = time.time()
request.metrics.update_decoder_start_time()

self.tasks_list[request.idx] = request
self.running.append(request)

def _free_blocks(self, request: Request):
8 changes: 4 additions & 4 deletions fastdeploy/entrypoints/openai/protocol.py
@@ -268,7 +268,7 @@ class ChatCompletionResponseChoice(BaseModel):
logprobs: Optional[LogProbs] = None
draft_logprobs: Optional[LogProbs] = None
prompt_logprobs: Optional[PromptLogprobs] = None
finish_reason: Optional[Literal["stop", "length", "tool_calls", "recover_stop", "abort"]]
finish_reason: Optional[Literal["stop", "length", "tool_calls", "recover_stop", "abort", "pd_reschedule"]]
speculate_metrics: Optional[SpeculateMetrics] = None


@@ -333,7 +333,7 @@ class ChatCompletionResponseStreamChoice(BaseModel):
logprobs: Optional[LogProbs] = None
draft_logprobs: Optional[LogProbs] = None
prompt_logprobs: Optional[PromptLogprobs] = None
finish_reason: Optional[Literal["stop", "length", "tool_calls", "recover_stop", "abort"]] = None
finish_reason: Optional[Literal["stop", "length", "tool_calls", "recover_stop", "abort", "pd_reschedule"]] = None
arrival_time: Optional[float] = None
speculate_metrics: Optional[SpeculateMetrics] = None

@@ -369,7 +369,7 @@ class CompletionResponseChoice(BaseModel):
draft_logprobs: Optional[CompletionLogprobs] = None
prompt_logprobs: Optional[PromptLogprobs] = None
reasoning_content: Optional[str] = None
finish_reason: Optional[Literal["stop", "length", "tool_calls", "recover_stop", "abort"]] = None
finish_reason: Optional[Literal["stop", "length", "tool_calls", "recover_stop", "abort", "pd_reschedule"]] = None
tool_calls: Optional[List[DeltaToolCall | ToolCall]] = None
speculate_metrics: Optional[SpeculateMetrics] = None

@@ -415,7 +415,7 @@ class CompletionResponseStreamChoice(BaseModel):
prompt_tokens: Optional[str] = None
completion_tokens: Optional[str] = None
reasoning_content: Optional[str] = None
finish_reason: Optional[Literal["stop", "length", "tool_calls", "recover_stop", "abort"]] = None
finish_reason: Optional[Literal["stop", "length", "tool_calls", "recover_stop", "abort", "pd_reschedule"]] = None
tool_calls: Optional[List[DeltaToolCall | ToolCall]] = None
speculate_metrics: Optional[SpeculateMetrics] = None

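To see what the widened Literal buys, here is a self-contained toy model that mirrors only the finish_reason field from the classes above; it is not the real protocol class.

from typing import Literal, Optional

from pydantic import BaseModel, ValidationError

class Choice(BaseModel):
    # Same Literal as in the diff above, including the new "pd_reschedule" value.
    finish_reason: Optional[
        Literal["stop", "length", "tool_calls", "recover_stop", "abort", "pd_reschedule"]
    ] = None

Choice(finish_reason="pd_reschedule")  # accepted once the Literal is widened

try:
    Choice(finish_reason="timeout")  # values outside the Literal are still rejected
except ValidationError:
    pass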
27 changes: 24 additions & 3 deletions fastdeploy/entrypoints/openai/serving_chat.py
@@ -614,10 +614,28 @@ async def chat_completion_full_generator(
request=request,
)
async for data in generator:
if data.get("error_code", 200) != 200:
raise ValueError("{}".format(data["error_msg"]))
idx = int(data["request_id"].split("_")[-1])
# api_server_logger.debug(f"Client {request_id} received: {data}")
if data.get("error_code", 200) != 200:
# Error response - include already-generated tokens in the response
if completion_token_ids[idx]:
text = self.engine_client.data_processor.tokenizer.decode(
completion_token_ids[idx], skip_special_tokens=True
)
else:
text = ""
data["outputs"] = {

This comment was marked as outdated.

"text": text,
"completion_tokens": text,
Review comment:

🔴 Bug: completion_tokens is assigned the string text, but this field should be the token count (an integer).

Downstream consumers of completion_tokens (e.g. usage accounting, billing, rate limiting) would receive the wrong type, which can raise a TypeError or silently produce wrong numbers.

Suggested fix:

"completion_tokens": len(completion_token_ids[idx]),

"reasoning_content": "",
"tool_calls": None,
"reasoning_token_num": 0,
"num_image_tokens": 0,
"token_ids": [],
"top_logprobs": None,
"draft_top_logprobs": None,
}
data["metrics"] = data.get("metrics") or {}
data["finished"] = True
previous_num_tokens[idx] += len(data["outputs"]["token_ids"])
completion_token_ids[idx].extend(data["outputs"]["token_ids"])
# The logprob for handling the response
@@ -804,6 +822,9 @@ async def _create_chat_completion_choice(

if data.get("error_msg", None) is not None and "Aborted" in data["error_msg"]:
finish_reason = "abort"

if data.get("error_msg", None) is not None and "PD Error" in data["error_msg"]:
finish_reason = "pd_reschedule"
return ChatCompletionResponseChoice(
index=idx,
message=message,
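The two checks added in _create_chat_completion_choice can be restated as a standalone helper (illustrative only; this function does not exist in serving_chat.py). Note that in the diff the "PD Error" check runs after the "Aborted" check, so it takes precedence when both substrings appear.

from typing import Optional

def finish_reason_from_error(error_msg: Optional[str]) -> Optional[str]:
    """Map an error message to the finish_reason placed in the response choice."""
    if error_msg is not None and "PD Error" in error_msg:
        return "pd_reschedule"  # request was rescheduled between prefill and decode
    if error_msg is not None and "Aborted" in error_msg:
        return "abort"
    return None  # in the real code, finish_reason keeps its earlier value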
11 changes: 11 additions & 0 deletions fastdeploy/input/base_processor.py
@@ -236,6 +236,17 @@ def process_response_dict(self, response_dict, **kwargs):

``stream`` is read from ``kwargs`` (default: True).
"""
# Error responses (e.g., preemption) have outputs=None or error_code!=200.
# Skip token decoding and return as-is to let upstream error handling take over.
if isinstance(response_dict, dict):
outputs = response_dict.get("outputs")
error_code = response_dict.get("error_code", 200)
else:
outputs = getattr(response_dict, "outputs", None)
error_code = getattr(response_dict, "error_code", 200)
if outputs is None or error_code != 200:
return response_dict

stream = kwargs.get("stream", True)
if stream:
return self.process_response_dict_streaming(response_dict, **kwargs)
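A small illustration of the early return added to process_response_dict. The payload below is a hypothetical error response shaped like the PD-reschedule results produced in common_engine.py, and processor stands for any concrete subclass of the base processor.

# Hypothetical error payload: outputs is None and error_code != 200, so the
# guard above returns it unchanged instead of trying to decode token ids.
error_response = {
    "request_id": "req-0",
    "finished": True,
    "error_code": 502,
    "error_msg": "PD Error: decode does not have enough blocks for preallocated request. req:req-0",
    "outputs": None,
}

# passed_through = processor.process_response_dict(error_response, stream=True)
# assert passed_through is error_response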
9 changes: 6 additions & 3 deletions fastdeploy/output/token_processor.py
@@ -528,8 +528,11 @@ def _recycle_resources(self, task_id, index, task, result=None, is_prefill=False
self.prefill_result_status[finished_task_id[0]] = finished_task_id[1]
if task_id in self.prefill_result_status:
if self.prefill_result_status[task_id] != "finished":
result.error_code = 400
result.error_message = f"{task_id} failed to {self.prefill_result_status[task_id]}"
result.error_code = 501
result.error_msg = (
f"PD Error: prefill failed to send cache to decode, "
f"{task_id}, {self.prefill_result_status[task_id]}"
)
llm_logger.info(
f"wait for sending cache, request_id: {task_id}, cost seconds: {time.time()-start_time:.5f}"
)
@@ -737,7 +740,7 @@ def _process_batch_output(self):
batch_result = list()
# reschedule
for i in range(batch):
if self.resource_manager.stop_flags[i]:
if self.resource_manager.stop_flags[i] or self.resource_manager.tasks_list[i] is None:
continue

recovery_stop = False