Skip to content

Commit e5cb7b8

Browse files
authored
Add SSE-keepalive to not time out on long prefill on clients (#1803)
## Motivation <!-- Why is this change needed? What problem does it solve? --> <!-- If it fixes an open issue, please link to the issue here --> ## Changes <!-- Describe what you changed in detail --> ## Why It Works <!-- Explain why your approach solves the problem --> ## Test Plan ### Manual Testing <!-- Hardware: (e.g., MacBook Pro M1 Max 32GB, Mac Mini M2 16GB, connected via Thunderbolt 4) --> <!-- What you did: --> <!-- - --> ### Automated Testing <!-- Describe changes to automated tests, or how existing tests cover this change --> <!-- - -->
1 parent 635801d commit e5cb7b8

2 files changed

Lines changed: 52 additions & 11 deletions

File tree

src/exo/api/keepalive.py

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
from collections.abc import AsyncIterator
2+
from typing import Final
3+
4+
import anyio
5+
6+
_DONE: Final = object()
7+
8+
9+
async def with_sse_keepalive(
10+
generator: AsyncIterator[str],
11+
keepalive_message: str = ": keep-alive\n\n",
12+
interval: float = 10.0,
13+
) -> AsyncIterator[str]:
14+
yield keepalive_message
15+
send, recv = anyio.create_memory_object_stream[str | object]()
16+
17+
async def _consume() -> None:
18+
async for item in generator:
19+
await send.send(item)
20+
await send.send(_DONE)
21+
22+
async with anyio.create_task_group() as tg:
23+
tg.start_soon(_consume)
24+
while True:
25+
item: str | object | None = None
26+
with anyio.move_on_after(interval):
27+
item = await recv.receive()
28+
if item is None:
29+
yield keepalive_message
30+
elif item is _DONE:
31+
break
32+
else:
33+
assert isinstance(item, str)
34+
yield item

src/exo/api/main.py

Lines changed: 18 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@
4646
generate_responses_stream,
4747
responses_request_to_text_generation,
4848
)
49+
from exo.api.keepalive import with_sse_keepalive
4950
from exo.api.types import (
5051
AddCustomModelParams,
5152
AdvancedImageParams,
@@ -799,9 +800,11 @@ async def chat_completions(
799800

800801
if payload.stream:
801802
return StreamingResponse(
802-
generate_chat_stream(
803-
command.command_id,
804-
self._token_chunk_stream(command.command_id),
803+
with_sse_keepalive(
804+
generate_chat_stream(
805+
command.command_id,
806+
self._token_chunk_stream(command.command_id),
807+
),
805808
),
806809
media_type="text/event-stream",
807810
headers={
@@ -1404,10 +1407,12 @@ async def claude_messages(
14041407

14051408
if payload.stream:
14061409
return StreamingResponse(
1407-
generate_claude_stream(
1408-
command.command_id,
1409-
payload.model,
1410-
self._token_chunk_stream(command.command_id),
1410+
with_sse_keepalive(
1411+
generate_claude_stream(
1412+
command.command_id,
1413+
payload.model,
1414+
self._token_chunk_stream(command.command_id),
1415+
),
14111416
),
14121417
media_type="text/event-stream",
14131418
headers={
@@ -1438,10 +1443,12 @@ async def openai_responses(
14381443

14391444
if payload.stream:
14401445
return StreamingResponse(
1441-
generate_responses_stream(
1442-
command.command_id,
1443-
payload.model,
1444-
self._token_chunk_stream(command.command_id),
1446+
with_sse_keepalive(
1447+
generate_responses_stream(
1448+
command.command_id,
1449+
payload.model,
1450+
self._token_chunk_stream(command.command_id),
1451+
),
14451452
),
14461453
media_type="text/event-stream",
14471454
headers={

0 commit comments

Comments
 (0)