Skip to content

Commit 89b82ab

Browse files
patkivikramwbarnha
andauthored
fix for issue 419 (#420)
* fix for issue 419 * Revert "fix for issue 419" This reverts commit e68821e. * throw in ensure_future to make sure coroutines get converted * lint conductor.pyx Co-authored-by: William Barnhart <william.barnhart@he360.com> Co-authored-by: William Barnhart <williambbarnhart@gmail.com>
1 parent a48a7c4 commit 89b82ab

2 files changed

Lines changed: 3 additions & 3 deletions

File tree

faust/transport/_cython/conductor.pyx

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
# cython: language_level=3
2-
from asyncio import ALL_COMPLETED, wait
2+
from asyncio import ALL_COMPLETED, ensure_future, wait
33

44
from faust.exceptions import KeyDecodeError, ValueDecodeError
55

@@ -75,7 +75,7 @@ cdef class ConductorHandler:
7575
continue
7676
delivered.add(chan)
7777
if full:
78-
await wait([self._handle_full(event, chan, delivered)
78+
await wait([ensure_future(self._handle_full(event, chan, delivered))
7979
for event, chan in full],
8080
return_when=ALL_COMPLETED)
8181
except KeyDecodeError as exc:

faust/transport/conductor.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -168,7 +168,7 @@ async def on_message(message: Message) -> None:
168168
on_topic_buffer_full(dest_chan)
169169
await asyncio.wait(
170170
[
171-
dest_chan.put(dest_event)
171+
asyncio.ensure_future(dest_chan.put(dest_event))
172172
for dest_event, dest_chan in full
173173
],
174174
return_when=asyncio.ALL_COMPLETED,

0 commit comments

Comments
 (0)