Skip to content

Commit eaf186c

Browse files
committed
Add SSE keepalive so clients do not time out during long prefill
1 parent b4f223e commit eaf186c

2 files changed

Lines changed: 52 additions & 11 deletions

File tree

src/exo/api/keepalive.py

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
from collections.abc import AsyncIterator
from typing import Final

import anyio

# Sentinel pushed through the stream when the wrapped generator is
# exhausted; a bare object() cannot collide with any real str payload.
_DONE: Final = object()

# Sentinel meaning "recv.receive() timed out" in the poll loop below.
# Using a dedicated object (instead of None) keeps the loop robust even
# if a mis-typed generator ever yields None.
_TIMEOUT: Final = object()


async def with_sse_keepalive(
    generator: AsyncIterator[str],
    keepalive_message: str = ": keep-alive\n\n",
    interval: float = 10.0,
) -> AsyncIterator[str]:
    """Wrap an SSE string stream so idle periods emit keepalive comments.

    Immediately yields ``keepalive_message``, then forwards every item of
    *generator* unchanged, emitting another ``keepalive_message`` whenever
    more than ``interval`` seconds pass without the generator producing
    anything (e.g. during a long prefill). SSE comment frames (lines
    starting with ``:``) are ignored by compliant clients, so the payload
    stream is unaffected.

    Args:
        generator: Already-formatted SSE event strings to forward.
        keepalive_message: SSE comment frame used as the heartbeat.
        interval: Maximum quiet period, in seconds, between heartbeats.
    """
    yield keepalive_message
    # Zero-capacity rendezvous stream: the pump blocks in send() until the
    # loop below receives, so items are handed over one at a time.
    send, recv = anyio.create_memory_object_stream[object]()

    async def _pump() -> None:
        # Forward the wrapped generator into the stream, then signal EOF.
        async for item in generator:
            await send.send(item)
        await send.send(_DONE)

    # Close both stream ends on exit so the pump task is never left
    # blocked on a dangling send() if this generator is closed early
    # (e.g. the client disconnects mid-stream).
    async with send, recv:
        async with anyio.create_task_group() as tg:
            tg.start_soon(_pump)
            while True:
                item: object = _TIMEOUT
                with anyio.move_on_after(interval):
                    item = await recv.receive()
                if item is _TIMEOUT:
                    # Nothing arrived within `interval`: send a heartbeat
                    # so proxies/clients don't drop the idle connection.
                    yield keepalive_message
                elif item is _DONE:
                    break
                else:
                    assert isinstance(item, str)
                    yield item

src/exo/api/main.py

Lines changed: 18 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@
4646
generate_responses_stream,
4747
responses_request_to_text_generation,
4848
)
49+
from exo.api.keepalive import with_sse_keepalive
4950
from exo.api.types import (
5051
AddCustomModelParams,
5152
AdvancedImageParams,
@@ -780,9 +781,11 @@ async def chat_completions(
780781

781782
if payload.stream:
782783
return StreamingResponse(
783-
generate_chat_stream(
784-
command.command_id,
785-
self._token_chunk_stream(command.command_id),
784+
with_sse_keepalive(
785+
generate_chat_stream(
786+
command.command_id,
787+
self._token_chunk_stream(command.command_id),
788+
),
786789
),
787790
media_type="text/event-stream",
788791
headers={
@@ -1385,10 +1388,12 @@ async def claude_messages(
13851388

13861389
if payload.stream:
13871390
return StreamingResponse(
1388-
generate_claude_stream(
1389-
command.command_id,
1390-
payload.model,
1391-
self._token_chunk_stream(command.command_id),
1391+
with_sse_keepalive(
1392+
generate_claude_stream(
1393+
command.command_id,
1394+
payload.model,
1395+
self._token_chunk_stream(command.command_id),
1396+
),
13921397
),
13931398
media_type="text/event-stream",
13941399
headers={
@@ -1419,10 +1424,12 @@ async def openai_responses(
14191424

14201425
if payload.stream:
14211426
return StreamingResponse(
1422-
generate_responses_stream(
1423-
command.command_id,
1424-
payload.model,
1425-
self._token_chunk_stream(command.command_id),
1427+
with_sse_keepalive(
1428+
generate_responses_stream(
1429+
command.command_id,
1430+
payload.model,
1431+
self._token_chunk_stream(command.command_id),
1432+
),
14261433
),
14271434
media_type="text/event-stream",
14281435
headers={

0 commit comments

Comments
 (0)