@@ -46,30 +46,62 @@ OMQ brings all of this to Ruby without C extensions or FFI.
 
 ## Task tree
 
-Every socket spawns a tree of Async tasks. All tasks are **transient** --
-they don't prevent the reactor from exiting when user code finishes.
-The reactor stays alive during linger because `Socket#close` blocks
-(inside the user's fiber) until send queues drain -- transient tasks
-are cleaned up afterward.
+A bare `OMQ::PUSH.new` spawns nothing. The first `#bind` or `#connect`
+captures the caller's Async parent and lazily creates a
+**socket-level `Async::Barrier`** under it. From that point on, every
+pump, listener, reconnect loop, and per-connection supervisor lives
+under that one barrier, so teardown is a single call -- `barrier.stop`
+cascade-cancels every descendant at once. The parent capture is
+one-shot: subsequent bind/connect calls reuse the same barrier.
+
+All spawned tasks are **transient** so they don't prevent the reactor
+from exiting when user code finishes. The reactor stays alive during
+linger because `Socket#close` blocks (inside the user's fiber) until
+send queues drain.
 
 ```
-Async (user code)
-|-- tcp accept tcp://...           per bind endpoint
-|-- conn tcp://... [accepted]      per accepted peer
-|   |-- heartbeat                  PING/PONG keepalive
-|   |-- recv pump                  conn -> recv_queue (or reaper for write-only)
-|   +-- (subscription listener)    PUB/RADIO: reads SUBSCRIBE/JOIN commands
-|-- conn tcp://... [connected]     per outgoing peer
-|   |-- heartbeat
-|   +-- recv pump
-|   +-- send pump                  conn <- send_queue (per-connection)
-+-- reconnect tcp://...            outgoing endpoint retry loop
+parent_task                        Async::Task.current, Reactor root,
+|                                  or user's parent (bind/connect parent:)
++-- SocketLifecycle#barrier        Async::Barrier -- single cascade handle
+    |
+    |-- listener accept loops      one per bind endpoint
+    |-- mechanism maintenance      CURVE key refresh, etc.
+    |-- reconnect loops            one per dialed endpoint
+    |
+    |-- conn <ep> task             spawn_connection (handshake + done.wait)
+    |-- conn <ep> supervisor       waits on per-conn barrier, runs lost!
+    |
+    +-- (ConnectionLifecycle#barrier, nested under socket barrier)
+        |-- send pump              per-connection, work-stealing on socket queue
+        |-- recv pump              or reaper for write-only sockets
+        |-- heartbeat              PING/PONG keepalive
+        +-- (subscription listener)  PUB/XPUB/RADIO only
 ```
 
-**Per-connection subtree.** Each connection gets its own task whose children
-are the heartbeat, recv pump (or reaper), the send pump, and any protocol
-listeners. When the connection dies, the entire subtree is cleaned up by
-Async. No orphaned tasks, no reparenting.
+**Two barriers.** `SocketLifecycle#barrier` is the socket-level cascade
+handle -- `Engine#close`/`#stop` call `.stop` on it once and every
+descendant unwinds. `ConnectionLifecycle#barrier` is a nested per-connection
+barrier (its parent is the socket barrier) so a supervisor can detect
+"one of *this* connection's pumps exited" via `@barrier.wait { ... }` and
+cascade-cancel only that connection's siblings -- without taking down
+peers on other connections.
+
+**Supervisor pattern.** Each connection has a supervisor task spawned
+on the *socket* barrier (deliberately not on the per-conn barrier) that
+waits for the first pump to finish and runs `lost!` in `ensure`. Placing
+the supervisor outside the per-conn barrier avoids the self-stop footgun:
+`tear_down!` can safely call `@barrier.stop` on the per-conn barrier
+because the current task (the supervisor) is not a member of it. If the
+supervisor were inside, stopping the barrier would raise `Async::Cancel`
+on itself synchronously and unwind mid-teardown.
+
+**User-provided parents.** `Socket#bind(ep, parent: my_barrier)` and
+`Socket#connect(ep, parent: ...)` wire the socket barrier under the
+caller's Async parent -- any object responding to `#async`
+(`Async::Task`, `Async::Barrier`, `Async::Semaphore`). The whole socket
+subtree then participates in the caller's teardown tree: stopping the
+caller's barrier cascades through the socket barrier and every pump.
+The first bind/connect captures the parent; subsequent calls are no-ops.
 
 **Send pumps are per-connection, queue is per-socket.** Each connection runs
 its own send pump fiber that dequeues from the *socket-level* send queue and
@@ -82,8 +114,8 @@ to all matching subscribers.
 
 **Reaper tasks.** Write-only sockets (PUSH, SCATTER) have no recv pump.
 Instead, a "reaper" task calls `receive_message` which blocks until the peer
-disconnects, then triggers `connection_lost`. Without it, a dead peer is only
-detected on the next send.
+disconnects -- the supervisor observes the pump exit and runs `lost!`.
+Without the reaper, a dead peer is only detected on the next send.
 
 ## Engine lifecycle
 
@@ -111,10 +143,24 @@ close
 |-- stop listeners (if connections exist)
 |-- linger: drain send queues (keep listeners if no peers yet)
 |-- stop remaining listeners
-|-- close all connections
-+-- stop routing + reconnect tasks
+|-- socket barrier.stop    -- cascade-cancels every pump, supervisor,
+|                             reconnect loop, accept loop in one call
++-- routing.stop
 ```
 
+**Close vs. stop.** `Socket#close` is the graceful verb: linger drain,
+then cascade. `Socket#stop` skips the linger drain and goes straight to
+`barrier.stop` -- used when the caller wants an immediate hard stop
+(e.g., on signal, or when the caller's own parent barrier is being
+stopped and pending sends should be dropped).
+
+**Convergent teardown.** `Socket#close`, `Socket#stop`, and
+peer-disconnect (supervisor-driven `lost!`) all funnel into the same
+`ConnectionLifecycle#tear_down!` with identical ordering: routing
+removal -> connection close -> monitor event -> done-promise resolve ->
+per-conn `barrier.stop`. The state guard (`:closed`) makes it idempotent
+so racing pumps can't double-fire side effects.
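A minimal sketch of that state guard (names illustrative; a counter stands in for the ordered side effects):

```ruby
class ConnectionLifecycle
  def initialize
    @state = :open
    @teardowns = 0
  end

  attr_reader :teardowns

  def tear_down!
    return if @state == :closed # second caller is a no-op
    @state = :closed
    # routing removal -> connection close -> monitor event ->
    # done-promise resolve -> per-conn barrier.stop would run here
    @teardowns += 1
  end
end
```

Fibers are cooperatively scheduled, so the check-then-set is race-free within a reactor; two pumps that both hit `lost!` still produce exactly one teardown.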
+
 **Linger.** On close, send queues are drained for up to `linger` seconds.
 If no peers are connected, listeners stay open so late-arriving peers can
 still receive queued messages. `linger=0` closes immediately.
@@ -199,6 +245,27 @@ For fan-out (PUB/RADIO), one published message is written to all matching
 subscribers before flushing -- so N subscribers see 1 flush each, not N
 flushes per message.
 
+## Cancellation safety
+
+The `barrier.stop` cascade can deliver `Async::Cancel` to a send-pump
+fiber at any await point. The unencrypted ZMTP path issues two
+separate `@io.write` calls per frame (header, then body), so a cancel
+arriving between them would leave the peer's framer reading a body
+that never arrives -- unrecoverable without closing the connection.
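A toy length-prefixed framer (not ZMTP's actual wire format) makes the hazard concrete: the header and body are two separate writes, and a reader that has consumed the header is committed to reading a body.

```ruby
require "stringio"

# Two writes per frame: header (length), then body.
def write_frame(io, payload)
  io.write([payload.bytesize].pack("N")) # write 1 -- an await point
  io.write(payload)                      # write 2 -- another await point
end

def read_frame(io)
  size = io.read(4).unpack1("N") # once this returns, a body is expected
  io.read(size)
end
```

A cancellation landing between the two `io.write` calls would leave the stream with a header and no body, which is why cancellation has to be deferred to a frame boundary.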
+
+`Protocol::ZMTP::Connection` wraps every wire-write entry point
+(`send_message`, `write_message`, `write_messages`, `write_wire`,
+`send_command`) in `Async::Task#defer_cancel`. Cancellation requested
+during a write is held until the block exits at a frame boundary, then
+re-raised normally. The mechanism is orthogonal to the connection's
+internal `Mutex` -- the mutex serializes thread races, `defer_cancel`
+serializes fiber cancellations. Both are required.
+
+`defer_cancel` only delays cancellation arriving from outside the
+block. Exceptions raised by the write itself (`EPIPE`, `EOFError`,
+`ECONNRESET`) propagate immediately -- that's the peer-disconnect
+path the supervisor relies on.
268+
 ## Recv pump fairness
 
 Each connection gets its own recv pump fiber that reads messages and