Skip to content

Commit dbdf559

Browse files
committed
Fix NPE in PendingAddOp.maybeTimeout() when clientCtx is null after recycling
monitorPendingAddOps() iterates the pendingAddOps queue and calls maybeTimeout() on each op. Concurrently, sendAddSuccessCallbacks() can remove a completed op from the queue and, via submitCallback(), lead to recyclePendAddOpObject(), which sets clientCtx = null. If the scheduler thread still holds an iterator reference to that op and then calls maybeTimeout(), the dereference of clientCtx causes an NPE.

The race is triggered in practice when addEntryQuorumTimeoutNanos > 0 (i.e. the quorum-timeout monitor is enabled). It became visible with Netty 4.1.130, which changed Recycler thread-scheduling behavior and made the narrow window between the iterator snapshot and the null assignment observable.

Fix:
- Make clientCtx volatile so that the null write in the synchronized recyclePendAddOpObject() is immediately visible to the unsynchronized maybeTimeout() reader.
- Guard the top of maybeTimeout() with an explicit null check: if clientCtx is null, the op has already completed and been recycled, so there is nothing to time out and the method returns false.

Closes: #4759
1 parent 7410987 commit dbdf559

2 files changed

Lines changed: 85 additions & 1 deletion

File tree

bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingAddOp.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ class PendingAddOp implements WriteCallback {
6666
boolean completed = false;
6767

6868
LedgerHandle lh;
69-
ClientContext clientCtx;
69+
volatile ClientContext clientCtx;
7070
boolean isRecoveryAdd = false;
7171
volatile long requestTimeNanos;
7272
long qwcLatency; // Quorum Write Completion Latency after response from quorum bookies.
@@ -154,6 +154,12 @@ private void sendWriteRequest(List<BookieId> ensemble, int bookieIndex) {
154154
}
155155

156156
boolean maybeTimeout() {
157+
if (clientCtx == null) {
158+
// Op has already been recycled: recyclePendAddOpObject() cleared clientCtx while
159+
// monitorPendingAddOps() was still holding a reference to it from the iterator.
160+
// The add-entry completed before the timeout monitor fired; nothing to time out.
161+
return false;
162+
}
157163
if (MathUtils.elapsedNanos(requestTimeNanos) >= clientCtx.getConf().addEntryQuorumTimeoutNanos) {
158164
timeoutQuorumWait();
159165
return true;

bookkeeper-server/src/test/java/org/apache/bookkeeper/client/PendingAddOpTest.java

Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,17 +20,22 @@
2020

2121
import static java.nio.charset.StandardCharsets.UTF_8;
2222
import static org.junit.Assert.assertEquals;
23+
import static org.junit.Assert.assertFalse;
2324
import static org.junit.Assert.assertNull;
2425
import static org.junit.Assert.assertSame;
26+
import static org.junit.Assert.assertTrue;
2527
import static org.mockito.Mockito.mock;
2628
import static org.mockito.Mockito.when;
2729

2830
import io.netty.buffer.ByteBuf;
2931
import io.netty.buffer.Unpooled;
3032
import java.util.concurrent.atomic.AtomicInteger;
3133
import org.apache.bookkeeper.client.BKException.Code;
34+
import org.apache.bookkeeper.client.api.LedgerMetadata;
3235
import org.apache.bookkeeper.client.api.WriteFlag;
36+
import org.apache.bookkeeper.common.util.MathUtils;
3337
import org.apache.bookkeeper.common.util.OrderedExecutor;
38+
import org.apache.bookkeeper.conf.ClientConfiguration;
3439
import org.apache.bookkeeper.proto.BookieClient;
3540
import org.apache.bookkeeper.stats.NullStatsLogger;
3641
import org.junit.Before;
@@ -87,4 +92,77 @@ public void testExecuteAfterCancelled() {
8792
assertNull(op.lh);
8893
}
8994

95+
/**
96+
* Verify that {@link PendingAddOp#maybeTimeout()} returns {@code false} without throwing
97+
* {@link NullPointerException} when {@code clientCtx} is {@code null}.
98+
*
99+
* <p>This is the exact scenario that caused the production NPE reported in
100+
* apache/bookkeeper#4759: {@code monitorPendingAddOps()} holds an iterator reference to
101+
* an op while {@code recyclePendAddOpObject()} runs concurrently on another thread and
102+
* clears {@code clientCtx} to {@code null}. After recycling the op has already completed,
103+
* so the correct behaviour is to skip the timeout check (return {@code false}).
104+
*/
105+
@Test
106+
public void testMaybeTimeoutReturnsFalseWhenClientCtxIsNull() {
107+
PendingAddOp op = PendingAddOp.create(
108+
lh, mockClientContext, lh.getCurrentEnsemble(),
109+
payload, WriteFlag.NONE,
110+
(rc, handle, entryId, qwcLatency, ctx) -> {}, null);
111+
112+
// Simulate the race: recyclePendAddOpObject() cleared clientCtx on the writer
113+
// thread while monitorPendingAddOps() is still iterating the pendingAddOps queue
114+
// on the scheduler thread.
115+
op.clientCtx = null;
116+
117+
// Before the fix this threw NullPointerException; after the fix it must return false.
118+
assertFalse(op.maybeTimeout());
119+
}
120+
121+
/**
122+
* Verify that {@link PendingAddOp#maybeTimeout()} returns {@code false} when
123+
* {@code clientCtx} is non-null and the quorum timeout has not yet elapsed.
124+
*/
125+
@Test
126+
public void testMaybeTimeoutReturnsFalseWhenWithinQuorumTimeout() {
127+
// Configure a 1-hour quorum timeout so the op will not have expired.
128+
ClientConfiguration conf = new ClientConfiguration();
129+
conf.setAddEntryQuorumTimeout(3600);
130+
when(mockClientContext.getConf()).thenReturn(ClientInternalConf.fromConfig(conf));
131+
132+
PendingAddOp op = PendingAddOp.create(
133+
lh, mockClientContext, lh.getCurrentEnsemble(),
134+
payload, WriteFlag.NONE,
135+
(rc, handle, entryId, qwcLatency, ctx) -> {}, null);
136+
// Stamp the request as starting right now so elapsed time is effectively 0.
137+
op.requestTimeNanos = MathUtils.nowInNano();
138+
139+
assertFalse(op.maybeTimeout());
140+
}
141+
142+
/**
143+
* Verify that {@link PendingAddOp#maybeTimeout()} returns {@code true} and triggers
144+
* {@link PendingAddOp#timeoutQuorumWait()} when the quorum timeout has already elapsed.
145+
*/
146+
@Test
147+
public void testMaybeTimeoutReturnsTrueWhenQuorumTimeoutExpired() {
148+
// Configure a 1-second quorum timeout.
149+
ClientConfiguration conf = new ClientConfiguration();
150+
conf.setAddEntryQuorumTimeout(1);
151+
when(mockClientContext.getConf()).thenReturn(ClientInternalConf.fromConfig(conf));
152+
153+
LedgerMetadata meta = mock(LedgerMetadata.class);
154+
when(lh.getLedgerMetadata()).thenReturn(meta);
155+
// addEntrySuccessBookies starts empty (size 0 < ackQuorumSize 3),
156+
// so the fault-domain stats branch is skipped and no extra mocking is needed.
157+
when(meta.getAckQuorumSize()).thenReturn(3);
158+
159+
PendingAddOp op = PendingAddOp.create(
160+
lh, mockClientContext, lh.getCurrentEnsemble(),
161+
payload, WriteFlag.NONE,
162+
(rc, handle, entryId, qwcLatency, ctx) -> {}, null);
163+
// Set the request start time to 0 so that elapsed nanos is enormous (>> 1 second).
164+
op.requestTimeNanos = 0L;
165+
166+
assertTrue(op.maybeTimeout());
167+
}
90168
}

0 commit comments

Comments
 (0)