Skip to content

Commit 90a5d01

Browse files
committed
v0.17.0
- drop Readable prefetch buffer; #receive dequeues single message - tiny routing.drain_send_queue cleanup
1 parent 36f00f6 commit 90a5d01

8 files changed

Lines changed: 16 additions & 87 deletions

File tree

CHANGELOG.md

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,15 @@
11
# Changelog
22

3-
## Unreleased
3+
## 0.17.0 — 2026-04-10
4+
5+
### Changed
6+
7+
- **`Readable#receive` no longer prefetches a batch.** Each `#receive`
8+
call dequeues exactly one message from the engine recv queue. The
9+
per-socket prefetch buffer (`@recv_buffer` + `@recv_mutex`) and
10+
`dequeue_recv_batch` are gone, along with `Readable::RECV_BATCH_SIZE`.
11+
Simpler code; ~5–10% inproc microbench regression accepted (tcp/ipc
12+
unchanged — wire I/O dominates dispatch overhead).
413

514
### Added
615

lib/omq/engine.rb

Lines changed: 0 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -235,30 +235,6 @@ def dequeue_recv
235235
end
236236

237237

238-
# Dequeues up to +max+ messages or +max_bytes+ total. Blocks
239-
# on the first, then drains non-blocking.
240-
#
241-
# @param max [Integer] message count limit
242-
# @param max_bytes [Integer] byte size limit
243-
# @return [Array<Array<String>>]
244-
#
245-
def dequeue_recv_batch(max, max_bytes: 1 << 20)
246-
raise @fatal_error if @fatal_error
247-
queue = routing.recv_queue
248-
msg = queue.dequeue
249-
raise @fatal_error if msg.nil? && @fatal_error
250-
batch = [msg]
251-
bytes = msg.sum(&:bytesize)
252-
while batch.size < max && bytes < max_bytes
253-
msg = queue.dequeue(timeout: 0)
254-
break unless msg
255-
batch << msg
256-
bytes += msg.sum(&:bytesize)
257-
end
258-
batch
259-
end
260-
261-
262238
# Pushes a nil sentinel into the recv queue, unblocking a
263239
# pending {#dequeue_recv} with a nil return value.
264240
#

lib/omq/queue_interface.rb

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -16,13 +16,7 @@ module QueueReadable
1616
# @raise [IO::TimeoutError] if timeout exceeded
1717
#
1818
def dequeue(timeout: @options.read_timeout)
19-
msg = @recv_mutex.synchronize { @recv_buffer.shift }
20-
return msg if msg
21-
22-
batch = Reactor.run { with_timeout(timeout) { @engine.dequeue_recv_batch(Readable::RECV_BATCH_SIZE) } }
23-
msg = batch.shift
24-
@recv_mutex.synchronize { @recv_buffer.concat(batch) } unless batch.empty?
25-
msg
19+
Reactor.run { with_timeout(timeout) { @engine.dequeue_recv } }
2620
end
2721

2822
alias_method :pop, :dequeue

lib/omq/readable.rb

Lines changed: 2 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -8,20 +8,13 @@ module OMQ
88
module Readable
99
include QueueReadable
1010

11-
# Maximum messages to prefetch from the recv queue per drain.
12-
RECV_BATCH_SIZE = 64
13-
14-
15-
# Receives the next message. Returns from a local prefetch
16-
# buffer when available, otherwise drains up to
17-
# {RECV_BATCH_SIZE} messages from the recv queue in one
18-
# synchronized dequeue.
11+
# Receives the next message directly from the engine recv queue.
1912
#
2013
# @return [Array<String>] message parts
2114
# @raise [IO::TimeoutError] if read_timeout exceeded
2215
#
2316
def receive
24-
@recv_mutex.synchronize { @recv_buffer.shift } || fill_recv_buffer
17+
Reactor.run { with_timeout(@options.read_timeout) { @engine.dequeue_recv } }
2518
end
2619

2720

@@ -33,14 +26,5 @@ def receive
3326
def wait_readable(timeout = @options.read_timeout)
3427
true
3528
end
36-
37-
private
38-
39-
def fill_recv_buffer
40-
batch = Reactor.run { with_timeout(@options.read_timeout) { @engine.dequeue_recv_batch(RECV_BATCH_SIZE) } }
41-
msg = batch.shift
42-
@recv_mutex.synchronize { @recv_buffer.concat(batch) } unless batch.empty?
43-
msg
44-
end
4529
end
4630
end

lib/omq/routing.rb

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -70,8 +70,7 @@ def self.build_queue(hwm, on_mute)
7070
#
7171
def self.drain_send_queue(queue, batch)
7272
loop do
73-
msg = queue.dequeue(timeout: 0)
74-
break unless msg
73+
msg = queue.dequeue(timeout: 0) or break
7574
batch << msg
7675
end
7776
end

lib/omq/socket.rb

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -316,9 +316,7 @@ def _init_engine(socket_type, linger:, send_hwm: nil, recv_hwm: nil,
316316
@options.recv_timeout = recv_timeout if recv_timeout
317317
@options.conflate = conflate
318318
@options.on_mute = on_mute if on_mute
319-
@recv_buffer = []
320-
@recv_mutex = Mutex.new
321-
@engine = case backend
319+
@engine = case backend
322320
when nil, :ruby
323321
Engine.new(socket_type, @options)
324322
when :ffi

lib/omq/version.rb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
# frozen_string_literal: true
22

33
module OMQ
4-
VERSION = "0.16.2"
4+
VERSION = "0.17.0"
55
end

test/omq/push_pull_test.rb

Lines changed: 0 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -353,37 +353,6 @@
353353
end
354354
end
355355

356-
it "prefetch batch is capped by byte size" do
357-
Async do
358-
pull = OMQ::PULL.bind("inproc://pushpull-prefetch-bytes")
359-
push = OMQ::PUSH.connect("inproc://pushpull-prefetch-bytes")
360-
361-
# Each message is ~512 KB, so 1 MB limit should cap at 2 messages
362-
big = "x" * (512 * 1024)
363-
5.times { push.send(big) }
364-
365-
# Let messages arrive
366-
sleep 0.01
367-
368-
# Peek at internal prefetch: first receive triggers a batch drain
369-
msg1 = pull.receive
370-
assert_equal big, msg1.first
371-
372-
# Check recv_buffer size — should have at most 1 more (batch was ~2)
373-
buffer_size = pull.instance_variable_get(:@recv_mutex).synchronize do
374-
pull.instance_variable_get(:@recv_buffer).size
375-
end
376-
assert_operator buffer_size, :<=, 2, "prefetch should be capped by 1 MB byte limit"
377-
378-
# Drain remaining
379-
4.times { pull.receive }
380-
ensure
381-
push&.close
382-
pull&.close
383-
end
384-
end
385-
386-
387356
it "unbounded via HWM=nil" do
388357
Async do
389358
push = OMQ::PUSH.new(nil, linger: 0)

0 commit comments

Comments
 (0)