Skip to content

Commit d7f06d8

Browse files
committed
v0.22.1 — reduce per-message allocations in send pumps and routing
Reuse batch arrays across send pump cycles. Consolidate blocking-dequeue + non-blocking-sweep into Routing.dequeue_batch. REP envelope as [conn, envelope] + concat instead of Hash + splat. REQ transform_send uses dup.unshift instead of splat. Bench harness accepts OMQ_BENCH_SIZES, OMQ_BENCH_TRANSPORTS, OMQ_BENCH_PEERS env vars.
1 parent a8dd2d1 commit d7f06d8

11 files changed

Lines changed: 70 additions & 23 deletions

File tree

CHANGELOG.md

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,29 @@
11
# Changelog
22

3+
## 0.22.1 — 2026-04-16
4+
5+
### Changed
6+
7+
- **Reuse batch arrays in send pumps.** All send pumps (RoundRobin,
8+
Pair, ConnSendPump, FanOut, FanOut-conflate) now pre-allocate a
9+
single batch array and clear it between cycles instead of
10+
allocating a fresh `[msg]` per dequeue.
11+
12+
- **`Routing.dequeue_batch`** consolidates the blocking-dequeue +
13+
non-blocking-sweep pattern that was duplicated across four call
14+
sites into one method. `dequeue_batch_capped` does the same for
15+
the byte/message-capped RoundRobin variant.
16+
17+
- **REP envelope stored as `[conn, envelope]`** instead of a Hash,
18+
and reply assembly uses `<<` + `concat` instead of double splat.
19+
20+
- **Heartbeat drops redundant `context: "".b`** — the default is
21+
now `EMPTY_BINARY` in protocol-zmtp.
22+
23+
- **Bench harness accepts `OMQ_BENCH_SIZES`, `OMQ_BENCH_TRANSPORTS`,
24+
and `OMQ_BENCH_PEERS`** env vars to scope runs without editing
25+
code.
26+
327
## 0.22.0 — 2026-04-15
428

529
### Fixed

bench/bench_helper.rb

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@
2222

2323
module BenchHelper
2424
# ×4 geometric sweep from 128 B to 32 KiB.
25-
SIZES = [128, 512, 2048, 8192, 32_768].freeze
25+
SIZES = (ENV["OMQ_BENCH_SIZES"] || "128,512,2048,8192,32768").split(",").map(&:to_i).freeze
2626

2727
# Each cell runs ROUNDS timed rounds and reports the fastest one.
2828
# Transient jitter (GC, scheduler preemption, YJIT tier-up, kernel
@@ -58,7 +58,7 @@ module BenchHelper
5858
# curve and blake3 are intentionally excluded from this suite.
5959
# CURVE regressions are caught by protocol-zmtp tests; BLAKE3-ZMTP
6060
# perf is tracked in the omq-rfc-blake3zmq repo.
61-
TRANSPORTS = %w[inproc ipc tcp].freeze
61+
TRANSPORTS = (ENV["OMQ_BENCH_TRANSPORTS"] || "inproc,ipc,tcp").split(",").freeze
6262

6363
def run_id
6464
@run_id ||= ENV["OMQ_BENCH_RUN_ID"] || Time.now.strftime("%Y-%m-%dT%H:%M:%S")
@@ -70,6 +70,7 @@ def run_id
7070
RUN_TIMEOUT = Integer(ENV.fetch("OMQ_BENCH_TIMEOUT", 30))
7171

7272
def run(label, dir:, peer_counts: [1, 3], &block)
73+
peer_counts = ENV["OMQ_BENCH_PEERS"].split(",").map(&:to_i) if ENV["OMQ_BENCH_PEERS"]
7374
pattern = File.basename(dir)
7475
jit = defined?(RubyVM::YJIT) && RubyVM::YJIT.enabled? ? "+YJIT" : "no JIT"
7576
puts "#{label} | OMQ #{OMQ::VERSION} | Ruby #{RUBY_VERSION} (#{jit}) | #{KERNEL}"

lib/omq/engine/heartbeat.rb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ def self.start(parent, conn, options, tasks)
2424
tasks << parent.async(transient: true, annotation: "heartbeat") do
2525
loop do
2626
sleep interval
27-
conn.send_command(Protocol::ZMTP::Codec::Command.ping(ttl: ttl, context: "".b))
27+
conn.send_command(Protocol::ZMTP::Codec::Command.ping(ttl: ttl))
2828
if conn.heartbeat_expired?(timeout)
2929
conn.close
3030
break

lib/omq/routing.rb

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,8 @@ def self.build_queue(hwm, on_mute)
5757

5858

5959
# Drains all available messages from +queue+ into +batch+ without
60-
# blocking. Call after the initial blocking dequeue.
60+
# Blocks for the first message, then sweeps all immediately
61+
# available messages into +batch+ without blocking.
6162
#
6263
# No cap is needed: IO::Stream auto-flushes at 64 KB, so the
6364
# write buffer hits the wire naturally under sustained load.
@@ -67,7 +68,9 @@ def self.build_queue(hwm, on_mute)
6768
# @param batch [Array]
6869
# @return [void]
6970
#
70-
def self.drain_send_queue(queue, batch)
71+
def self.dequeue_batch(queue, batch = [])
72+
batch << queue.dequeue
73+
7174
loop do
7275
msg = queue.dequeue(timeout: 0) or break
7376
batch << msg

lib/omq/routing/conn_send_pump.rb

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,10 @@ module ConnSendPump
1818
#
1919
def self.start(engine, conn, q, tasks)
2020
task = engine.spawn_conn_pump_task(conn, annotation: "send pump") do
21+
batch = []
22+
2123
loop do
22-
batch = [q.dequeue]
23-
Routing.drain_send_queue(q, batch)
24+
Routing.dequeue_batch(q, batch)
2425

2526
if batch.size == 1
2627
conn.write_message batch.first
@@ -30,7 +31,11 @@ def self.start(engine, conn, q, tasks)
3031

3132
conn.flush
3233

33-
batch.each { |parts| engine.emit_verbose_msg_sent(conn, parts) }
34+
batch.each do |parts|
35+
engine.emit_verbose_msg_sent(conn, parts)
36+
end
37+
38+
batch.clear
3439
end
3540
end
3641

lib/omq/routing/fan_out.rb

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -181,14 +181,16 @@ def start_conn_send_pump(conn, q)
181181
#
182182
def start_conn_send_pump_normal(conn, q, use_wire)
183183
@engine.spawn_conn_pump_task(conn, annotation: "send pump") do
184+
batch = []
185+
184186
loop do
185-
batch = [q.dequeue]
186-
Routing.drain_send_queue(q, batch)
187+
Routing.dequeue_batch(q, batch)
187188

188189
if write_matching_batch(conn, batch, use_wire)
189190
conn.flush
190191
batch.each { |parts| @engine.emit_verbose_msg_sent(conn, parts) }
191192
end
193+
batch.clear
192194
end
193195
end
194196
end
@@ -227,14 +229,17 @@ def write_matching_batch(conn, batch, use_wire)
227229
#
228230
def start_conn_send_pump_conflate(conn, q)
229231
@engine.spawn_conn_pump_task(conn, annotation: "send pump") do
232+
batch = []
233+
230234
loop do
231-
batch = [q.dequeue]
232-
Routing.drain_send_queue(q, batch)
235+
Routing.dequeue_batch(q, batch)
233236

234237
# Keep only the latest message that matches the subscription.
235238
latest = batch.reverse.find do |parts|
236239
subscribed?(conn, parts.first || EMPTY_BINARY)
237240
end
241+
242+
batch.clear
238243
next unless latest
239244

240245
conn.write_message(latest)

lib/omq/routing/pair.rb

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -106,9 +106,10 @@ def send_queues_drained?
106106

107107
def start_send_pump(conn)
108108
@send_pump = @engine.spawn_conn_pump_task(conn, annotation: "send pump") do
109+
batch = []
110+
109111
loop do
110-
batch = [@send_queue.dequeue]
111-
Routing.drain_send_queue(@send_queue, batch)
112+
Routing.dequeue_batch(@send_queue, batch)
112113

113114
if batch.size == 1
114115
conn.write_message(batch.first)
@@ -120,6 +121,7 @@ def start_send_pump(conn)
120121
batch.each do |parts|
121122
@engine.emit_verbose_msg_sent(conn, parts)
122123
end
124+
batch.clear
123125
end
124126
end
125127

lib/omq/routing/rep.rb

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ def connection_added(connection)
5555
envelope = msg[0, delimiter]
5656
body = msg[(delimiter + 1)..] || []
5757

58-
@pending_replies << { conn: connection, envelope: envelope }
58+
@pending_replies << [connection, envelope]
5959
body
6060
end
6161
@tasks << task if task
@@ -69,7 +69,7 @@ def connection_added(connection)
6969
# @param connection [Connection]
7070
#
7171
def connection_removed(connection)
72-
@pending_replies.reject! { |r| r[:conn] == connection }
72+
@pending_replies.reject! { |r| r[0] == connection }
7373
@conn_queues.delete(connection)
7474
@conn_send_tasks.delete(connection)&.stop
7575
end
@@ -83,8 +83,12 @@ def connection_removed(connection)
8383
def enqueue(parts)
8484
reply_info = @pending_replies.shift
8585
return unless reply_info
86-
conn = reply_info[:conn]
87-
@conn_queues[conn]&.enqueue([*reply_info[:envelope], EMPTY_FRAME, *parts])
86+
87+
conn, envelope = reply_info
88+
msg = envelope
89+
msg << EMPTY_FRAME
90+
msg.concat(parts)
91+
@conn_queues[conn]&.enqueue(msg)
8892
end
8993

9094

lib/omq/routing/req.rb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -93,7 +93,7 @@ def stop
9393
# REQ prepends empty delimiter frame on the wire.
9494
#
9595
def transform_send(parts)
96-
[EMPTY_BINARY, *parts]
96+
parts.dup.unshift(EMPTY_BINARY)
9797
end
9898

9999
end

lib/omq/routing/round_robin.rb

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -133,9 +133,10 @@ def transform_send(parts)
133133
#
134134
def start_conn_send_pump(conn)
135135
task = @engine.spawn_conn_pump_task(conn, annotation: "send pump") do
136+
batch = []
137+
136138
loop do
137-
batch = [@send_queue.dequeue]
138-
drain_send_queue_capped(batch)
139+
dequeue_batch_capped(batch)
139140
@in_flight += batch.size
140141

141142
begin
@@ -147,6 +148,7 @@ def start_conn_send_pump(conn)
147148
batch.each do |parts|
148149
@engine.emit_verbose_msg_sent(conn, parts)
149150
end
151+
batch.clear
150152

151153
Async::Task.current.yield
152154
end
@@ -157,7 +159,8 @@ def start_conn_send_pump(conn)
157159
end
158160

159161

160-
def drain_send_queue_capped(batch)
162+
def dequeue_batch_capped(batch = [])
163+
batch << @send_queue.dequeue
161164
bytes = batch_bytes(batch.first)
162165

163166
while batch.size < BATCH_MSG_CAP && bytes < BATCH_BYTE_CAP

0 commit comments

Comments
 (0)