Skip to content

Commit f6fc919

Browse files
committed
v0.27.0 - transport-supplied ZMTP Connection class
Transport modules may define .connection_class to substitute their own Protocol::ZMTP::Connection-shaped class. ConnectionLifecycle queries it (with a respond_to? fallback) so existing transports — built-in or third-party — keep working unchanged. Enables plugin transports whose wire shape differs from ZMTP/3.1 (e.g. omq-websocket per RFC 45) to plug in without forking the engine.
1 parent f752f1a commit f6fc919

7 files changed

Lines changed: 53 additions & 9 deletions

File tree

CHANGELOG.md

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,18 @@
11
# Changelog
22

3+
## 0.27.0 — 2026-04-20
4+
5+
### Added
6+
7+
- **Transport-supplied ZMTP Connection class.** Transport modules may
8+
now define `.connection_class` to substitute their own
9+
`Protocol::ZMTP::Connection`-shaped class. `ConnectionLifecycle`
10+
reads it (with a `respond_to?` fallback to
11+
`Protocol::ZMTP::Connection`) so existing transports — built-in or
12+
third-party — keep working unchanged. Enables plugin transports
13+
whose wire shape differs from ZMTP/3.1 (e.g. ZeroMQ-over-WebSocket
14+
per RFC 45) to plug in without forking the engine.
15+
316
## 0.26.2 — 2026-04-20
417

518
### Fixed

lib/omq/engine.rb

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -635,7 +635,8 @@ def transport_for(endpoint)
635635
def spawn_connection(io, as_server:, endpoint: nil)
636636
@lifecycle.barrier&.async(transient: true, annotation: "conn #{endpoint}") do
637637
done = Async::Promise.new
638-
lifecycle = ConnectionLifecycle.new(self, endpoint: endpoint, done: done)
638+
transport = endpoint ? transport_for(endpoint) : nil
639+
lifecycle = ConnectionLifecycle.new(self, endpoint: endpoint, done: done, transport: transport)
639640
lifecycle.handshake!(io, as_server: as_server)
640641
done.wait
641642
rescue Async::Stop, Async::Cancel

lib/omq/engine/connection_lifecycle.rb

Lines changed: 14 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -66,13 +66,19 @@ class InvalidTransition < RuntimeError
6666
# @param engine [Engine]
6767
# @param endpoint [String, nil]
6868
# @param done [Async::Promise, nil] resolved when connection is lost
69+
# @param transport [Module, nil] transport module that produced +io+;
70+
# queried for {.connection_class} so plugins (e.g. WebSocket) can
71+
# substitute their own ZMTP-shaped connection class. Falls back to
72+
# {Protocol::ZMTP::Connection} when nil or when the transport
73+
# doesn't define +connection_class+.
6974
#
70-
def initialize(engine, endpoint: nil, done: nil)
71-
@engine = engine
72-
@endpoint = endpoint
73-
@done = done
74-
@state = :new
75-
@conn = nil
75+
def initialize(engine, endpoint: nil, done: nil, transport: nil)
76+
@engine = engine
77+
@endpoint = endpoint
78+
@done = done
79+
@transport = transport
80+
@state = :new
81+
@conn = nil
7682

7783
# Nest the per-connection barrier under the socket-level barrier
7884
# so every pump spawned via +@barrier.async+ is also tracked by
@@ -90,7 +96,8 @@ def initialize(engine, endpoint: nil, done: nil)
9096
#
9197
def handshake!(io, as_server:)
9298
transition!(:handshaking)
93-
conn = Protocol::ZMTP::Connection.new io,
99+
conn_class = @transport.respond_to?(:connection_class) ? @transport.connection_class : Protocol::ZMTP::Connection
100+
conn = conn_class.new io,
94101
socket_type: @engine.socket_type.to_s,
95102
identity: @engine.options.identity,
96103
as_server: as_server,

lib/omq/transport/ipc.rb

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,15 @@ module IPC
1515

1616

1717
class << self
18+
# ZMTP connection class used for IPC-accepted/dialed peers.
19+
#
20+
# @return [Class]
21+
#
22+
def connection_class
23+
Protocol::ZMTP::Connection
24+
end
25+
26+
1827
# Creates a bound IPC listener.
1928
#
2029
# @param endpoint [String] e.g. "ipc:///tmp/my.sock" or "ipc://@abstract"

lib/omq/transport/tcp.rb

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,15 @@ module TCP
1313

1414

1515
class << self
16+
# ZMTP connection class used for TCP-accepted/dialed peers.
17+
#
18+
# @return [Class]
19+
#
20+
def connection_class
21+
Protocol::ZMTP::Connection
22+
end
23+
24+
1625
# Creates a bound TCP listener.
1726
#
1827
# @param endpoint [String] e.g. "tcp://127.0.0.1:5555" or "tcp://*:0"

lib/omq/version.rb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
# frozen_string_literal: true
22

33
module OMQ
4-
VERSION = "0.26.2"
4+
VERSION = "0.27.0"
55
end

test/omq/transport/tcp_test.rb

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,11 @@
33
require_relative "../../test_helper"
44

55
describe "TCP transport" do
6+
it "advertises Protocol::ZMTP::Connection as its connection_class" do
7+
assert_equal Protocol::ZMTP::Connection, OMQ::Transport::TCP.connection_class
8+
end
9+
10+
611
it "PAIR over TCP with ephemeral port" do
712
Async do
813
server = OMQ::PAIR.new

0 commit comments

Comments
 (0)