Skip to content

Commit 99d422d

Browse files
Fix: surface selectLive() errors eagerly and improve LiveStream thread safety (#138) (#139)
1 parent 5bf604b commit 99d422d

7 files changed

Lines changed: 448 additions & 78 deletions

File tree

src/main/java/com/surrealdb/LiveStream.java

Lines changed: 40 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -3,17 +3,41 @@
33
import java.util.Optional;
44

55
/**
6-
* Blocking iterator over live query notifications. Call {@link #next()} in a
7-
* loop and {@link #close()} when done. Implements {@link AutoCloseable} for
8-
* use in try-with-resources.
6+
* Blocking iterator over live query notifications returned by
7+
* {@link Surreal#selectLive(String)}.
8+
*
9+
* <p>Typical usage:
10+
* <pre>{@code
11+
* try (LiveStream stream = surreal.selectLive("person")) {
12+
* while (true) {
13+
* Optional<LiveNotification> n = stream.next();
14+
* if (!n.isPresent()) break; // stream closed
15+
* process(n.get());
16+
* }
17+
* }
18+
* }</pre>
19+
*
20+
* <p><b>Thread safety:</b> {@link #next()} may be called from one thread while
21+
* {@link #close()} is called from another. The {@code close()} call will
22+
* unblock any thread currently waiting inside {@code next()}. The native
23+
* handle is declared {@code volatile} so that the zeroing performed by
24+
* {@code close()} is immediately visible to concurrent {@code next()} callers.
25+
* Concurrent calls to {@code next()} from multiple threads are serialized by
26+
* a mutex in the native layer.
927
*/
1028
public class LiveStream implements AutoCloseable {
1129

1230
static {
1331
Loader.loadNative();
1432
}
1533

16-
private long handle;
34+
/**
35+
* Pointer to the native {@code LiveStreamChannel}. Zeroed by
36+
* {@link #close()} after the native resources have been released. Declared
37+
* {@code volatile} so that a {@code close()} on one thread is visible to a
38+
* concurrent {@code next()} on another thread.
39+
*/
40+
private volatile long handle;
1741

1842
LiveStream(long handle) {
1943
this.handle = handle;
@@ -22,7 +46,14 @@ public class LiveStream implements AutoCloseable {
2246
/**
2347
* Blocks until the next notification is available, or the stream ends.
2448
*
49+
* <p>Returns {@link Optional#empty()} when the stream has been closed
50+
* (either explicitly via {@link #close()} or because the server ended the
51+
* live query). If the underlying live query encounters an error, a
52+
* {@link SurrealException} is thrown.
53+
*
2554
* @return the next notification, or empty if the stream has ended
55+
* @throws SurrealException
56+
* if the live query encounters an error
2657
*/
2758
public Optional<LiveNotification> next() {
2859
if (handle == 0) {
@@ -33,7 +64,11 @@ public Optional<LiveNotification> next() {
3364
}
3465

3566
/**
36-
* Releases the live query and stops receiving notifications. Idempotent.
67+
* Releases the live query and stops receiving notifications.
68+
*
69+
* <p>If another thread is blocked inside {@link #next()}, it will be
70+
* unblocked and will return {@link Optional#empty()}. This method is
71+
* idempotent: calling it more than once has no effect.
3772
*/
3873
@Override
3974
public void close() {

src/main/java/com/surrealdb/Surreal.java

Lines changed: 16 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -265,15 +265,25 @@ public boolean importSql(String path) {
265265
}
266266

267267
/**
268-
* Starts a live query on the given table. Returns a blocking stream of
269-
* notifications (CREATE, UPDATE, DELETE). Call {@link LiveStream#next()} in a
270-
* loop and {@link LiveStream#close()} when done.
268+
* Starts a live query on the given table and returns a blocking stream of
269+
* notifications (CREATE, UPDATE, DELETE).
270+
*
271+
* <p>This method blocks until the live query subscription is fully
272+
* established on the server. If the subscription fails (e.g. the table does
273+
* not exist), a {@link SurrealException} is thrown immediately rather than
274+
* being deferred to the first {@link LiveStream#next()} call.
275+
*
276+
* <p>Call {@link LiveStream#next()} in a loop to receive notifications and
277+
* {@link LiveStream#close()} when done. The returned stream implements
278+
* {@link AutoCloseable} for use in try-with-resources.
271279
*
272280
* @param table
273-
* table name to watch
274-
* @return a LiveStream; must call {@link LiveStream#close()} when done
281+
* table name to watch (must already exist)
282+
* @return a LiveStream; the caller must call {@link LiveStream#close()} when
283+
* done
275284
* @throws SurrealException
276-
* if live queries are not supported or the request fails
285+
* if live queries are not supported, the table does not exist,
286+
* or the subscription fails
277287
*/
278288
public LiveStream selectLive(String table) {
279289
return new LiveStream(selectLive(getPtr(), table));

src/main/rust/error.rs

Lines changed: 5 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ fn details_value(error: &surrealdb::Error) -> Value {
2929
if let Value::Object(ref map) = value {
3030
let kind_matches = map
3131
.get("kind")
32-
.map(|v| matches!(v, Value::String(s) if s.to_string() == kind_str))
32+
.map(|v| matches!(v, Value::String(s) if *s == kind_str))
3333
.unwrap_or(false);
3434
if kind_matches {
3535
if let Some(inner) = map.get("details") {
@@ -58,7 +58,6 @@ fn value_to_jobject<'a>(env: &mut JNIEnv<'a>, value: &Value) -> Option<JObject<'
5858
)
5959
.ok()
6060
.and_then(|v| v.l().ok())
61-
.map(JObject::from)
6261
}
6362
Value::Object(map) => {
6463
let class = env.find_class("java/util/LinkedHashMap").ok()?;
@@ -100,7 +99,6 @@ fn number_to_jobject<'a>(env: &mut JNIEnv<'a>, n: &Number) -> Option<JObject<'a>
10099
env.call_static_method(class, "valueOf", "(J)Ljava/lang/Long;", &[JValue::Long(*i)])
101100
.ok()
102101
.and_then(|v| v.l().ok())
103-
.map(JObject::from)
104102
}
105103
Number::Float(f) => {
106104
let class = env.find_class("java/lang/Double").ok()?;
@@ -112,7 +110,6 @@ fn number_to_jobject<'a>(env: &mut JNIEnv<'a>, n: &Number) -> Option<JObject<'a>
112110
)
113111
.ok()
114112
.and_then(|v| v.l().ok())
115-
.map(JObject::from)
116113
}
117114
Number::Decimal(d) => {
118115
let class = env.find_class("java/lang/Double").ok()?;
@@ -125,7 +122,6 @@ fn number_to_jobject<'a>(env: &mut JNIEnv<'a>, n: &Number) -> Option<JObject<'a>
125122
)
126123
.ok()
127124
.and_then(|v| v.l().ok())
128-
.map(JObject::from)
129125
}
130126
}
131127
}
@@ -136,6 +132,7 @@ fn number_to_jobject<'a>(env: &mut JNIEnv<'a>, n: &Number) -> Option<JObject<'a>
136132
/// Matches on the Rust SDK's ErrorDetails enum to choose the Java exception class and
137133
/// ErrorKind enum. Base ServerException uses (ErrorKind, String rawKindIfUnknown, message, details, cause);
138134
/// subclasses use (String message, Object details, ServerException cause).
135+
#[allow(clippy::redundant_closure)] // JObject::null as fn ptr produces 'static, breaking the 'a lifetime
139136
fn build_server_exception<'a>(
140137
env: &mut JNIEnv<'a>,
141138
error: &surrealdb::Error,
@@ -201,8 +198,7 @@ fn build_server_exception<'a>(
201198
.get_static_field(&enum_class, enum_name, "Lcom/surrealdb/ErrorKind;")
202199
.ok()?
203200
.l()
204-
.ok()
205-
.map(JObject::from)?;
201+
.ok()?;
206202
let raw_kind_jstr = match raw_kind_for_unknown {
207203
Some(s) => env
208204
.new_string(s)
@@ -219,10 +215,7 @@ fn build_server_exception<'a>(
219215
JValue::Object(&details_obj),
220216
JValue::Object(&java_cause),
221217
];
222-
match env.new_object(class, sig, &args) {
223-
Ok(obj) => Some(obj),
224-
Err(_) => None,
225-
}
218+
env.new_object(class, sig, &args).ok()
226219
} else {
227220
// Subclass(String message, Object details, ServerException cause)
228221
let sig = "(Ljava/lang/String;Ljava/lang/Object;Lcom/surrealdb/ServerException;)V";
@@ -231,10 +224,7 @@ fn build_server_exception<'a>(
231224
JValue::Object(&details_obj),
232225
JValue::Object(&java_cause),
233226
];
234-
match env.new_object(class, sig, &args) {
235-
Ok(obj) => Some(obj),
236-
Err(_) => None,
237-
}
227+
env.new_object(class, sig, &args).ok()
238228
}
239229
}
240230

src/main/rust/lib.rs

Lines changed: 30 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -15,15 +15,40 @@ use surrealdb::method::Transaction;
1515
use surrealdb::types::Value;
1616
use surrealdb::{Connection, IndexedResults, Surreal};
1717

18-
/// Item type for the live query channel (Result<Notification<Value>>).
18+
/// Item type for the live query notification channel.
1919
pub(crate) type LiveNotificationResult =
2020
std::result::Result<surrealdb::Notification<surrealdb::types::Value>, surrealdb::Error>;
2121

22-
/// Stored as handle for live streams. recv_mutex is held by nextNative during recv() so that
23-
/// releaseNative can wait for no thread in recv() before taking and dropping the receiver.
24-
/// join_handle, shutdown_tx and rx are in Mutex<Option<..>> so releaseNative can take/drop them via get_instance.
22+
/// Native handle backing a Java `LiveStream` instance.
23+
///
24+
/// Created by `selectLive` (in surreal.rs) after the live query subscription
25+
/// is confirmed, and freed by `releaseNative` (in live.rs) when the Java side
26+
/// calls `close()`.
27+
///
28+
/// ## Fields (tuple elements)
29+
///
30+
/// 0. **`recv_mutex`** (`Arc<Mutex<()>>`) — held by `nextNative` for the
31+
/// entire duration of the blocking `recv()` call. `releaseNative` acquires
32+
/// it *after* the channel has been closed so it can be sure no thread is
33+
/// still inside `recv()` before freeing the handle.
34+
///
35+
/// 1. **`join_handle`** (`Mutex<Option<JoinHandle>>`) — the background thread
36+
/// that reads from the SurrealDB live-query stream and forwards
37+
/// notifications into the async channel. Taken and joined by
38+
/// `releaseNative` during shutdown.
39+
///
40+
/// 2. **`shutdown_tx`** (`Mutex<Option<Sender<()>>>`) — dropping this sender
41+
/// signals the background thread (via `tokio::select!`) to exit.
42+
///
43+
/// 3. **`rx`** (`Mutex<Option<Receiver<LiveNotificationResult>>>`) — the
44+
/// receiving end of the notification channel, read by `nextNative`.
45+
///
46+
/// ## Lock ordering
47+
///
48+
/// Both `nextNative` and `releaseNative` acquire `recv_mutex` **before**
49+
/// `rx`, ensuring a consistent ordering and preventing deadlocks.
2550
pub(crate) type LiveStreamChannel = (
26-
std::sync::Arc<parking_lot::Mutex<()>>, // held during recv()
51+
std::sync::Arc<parking_lot::Mutex<()>>,
2752
parking_lot::Mutex<Option<std::thread::JoinHandle<()>>>,
2853
parking_lot::Mutex<Option<async_channel::Sender<()>>>,
2954
parking_lot::Mutex<Option<async_channel::Receiver<LiveNotificationResult>>>,

src/main/rust/live.rs

Lines changed: 48 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,24 @@ use jni::JNIEnv;
77
use crate::error::SurrealError;
88
use crate::{get_instance, new_string, take_instance, JniTypes, LiveStreamChannel, TOKIO_RUNTIME};
99

10+
/// JNI implementation of `LiveStream.nextNative(long handle)`.
11+
///
12+
/// Blocks the calling thread until a live-query notification arrives or the
13+
/// stream ends. Returns a `LiveNotification` jobject, or `null` when the
14+
/// stream has been closed (by `releaseNative` or because the server ended the
15+
/// live query).
16+
///
17+
/// ## Locking protocol
18+
///
19+
/// 1. Acquire `recv_mutex` — serializes concurrent `nextNative` calls and
20+
/// lets `releaseNative` know when no thread is inside `recv()`.
21+
/// 2. Acquire `rx_mux` — borrows the channel receiver for the blocking call.
22+
/// 3. Call `block_on(rx_ref.recv())` — parks the thread until a message
23+
/// arrives or all senders are dropped (channel closed).
24+
///
25+
/// Both guards are held for the duration of the `recv()`. This is safe
26+
/// because `releaseNative` only acquires these locks *after* the channel has
27+
/// been closed, so a blocked `recv()` wakes up and the guards are released promptly.
1028
#[no_mangle]
1129
pub extern "system" fn Java_com_surrealdb_LiveStream_nextNative<'local>(
1230
mut env: JNIEnv<'local>,
@@ -16,23 +34,22 @@ pub extern "system" fn Java_com_surrealdb_LiveStream_nextNative<'local>(
1634
let (recv_mutex, _join_handle_mux, _shutdown_tx_mux, rx_mux) =
1735
match get_instance::<LiveStreamChannel>(handle_ptr, JniTypes::LiveStream) {
1836
Ok(r) => r,
19-
Err(e) => return e.exception(&mut env, || std::ptr::null_mut()),
37+
Err(e) => return e.exception(&mut env, std::ptr::null_mut),
2038
};
2139
let _recv_guard = recv_mutex.lock();
2240
let rx_opt_guard = rx_mux.lock();
2341
let rx_ref = match rx_opt_guard.as_ref() {
2442
Some(rx) => rx,
25-
None => return JObject::null().into_raw(), // already released
43+
None => return JObject::null().into_raw(),
2644
};
2745
let item = match TOKIO_RUNTIME.block_on(rx_ref.recv()) {
2846
Ok(item) => item,
29-
Err(_) => return JObject::null().into_raw(), // channel closed
47+
Err(_) => return JObject::null().into_raw(),
3048
};
3149
let notification = match item {
3250
Ok(n) => n,
33-
Err(e) => return SurrealError::from(e).exception(&mut env, || std::ptr::null_mut()),
51+
Err(e) => return SurrealError::from(e).exception(&mut env, std::ptr::null_mut),
3452
};
35-
// Build Java LiveNotification(action, valuePtr, queryId)
3653
let action_raw = new_string!(&mut env, notification.action.to_string(), || {
3754
std::ptr::null_mut()
3855
});
@@ -44,7 +61,7 @@ pub extern "system" fn Java_com_surrealdb_LiveStream_nextNative<'local>(
4461
let query_id_str = unsafe { JObject::from_raw(query_id_raw) };
4562
let class = match env.find_class("com/surrealdb/LiveNotification") {
4663
Ok(c) => c,
47-
Err(e) => return SurrealError::from(e).exception(&mut env, || std::ptr::null_mut()),
64+
Err(e) => return SurrealError::from(e).exception(&mut env, std::ptr::null_mut),
4865
};
4966
let args = [
5067
JValue::Object(&action_str),
@@ -53,10 +70,31 @@ pub extern "system" fn Java_com_surrealdb_LiveStream_nextNative<'local>(
5370
];
5471
match env.new_object(class, "(Ljava/lang/String;JLjava/lang/String;)V", &args) {
5572
Ok(obj) => obj.into_raw(),
56-
Err(e) => SurrealError::from(e).exception(&mut env, || std::ptr::null_mut()),
73+
Err(e) => SurrealError::from(e).exception(&mut env, std::ptr::null_mut),
5774
}
5875
}
5976

77+
/// JNI implementation of `LiveStream.releaseNative(long handle)`.
78+
///
79+
/// Shuts down the live query: stops the background thread, waits for any
80+
/// in-progress `nextNative` call to finish, then frees the native handle.
81+
///
82+
/// ## Shutdown sequence
83+
///
84+
/// 1. **Drop `shutdown_tx`** — the background thread's `tokio::select!` loop
85+
/// detects the closed shutdown channel and breaks. This also causes the
86+
/// background thread to drop its `tx_thread` sender, closing the
87+
/// notification channel.
88+
/// 2. **Join the background thread** — ensures `tx_thread` has been dropped
89+
/// and the channel is fully closed before proceeding.
90+
/// 3. **Acquire `recv_mutex`** — at this point the channel is closed, so any
91+
/// `nextNative` call blocked on `recv()` has already returned and released
92+
/// the mutex. Acquiring it here is a final safety barrier.
93+
/// 4. **Take the receiver** — drops it while holding `recv_mutex`, preventing
94+
/// any new `recv()` attempt.
95+
/// 5. **`take_instance`** — reclaims the boxed `LiveStreamChannel`, freeing
96+
/// the allocation. The Java side zeroes its `handle` field after this
97+
/// call returns, preventing further native access.
6098
#[no_mangle]
6199
pub extern "system" fn Java_com_surrealdb_LiveStream_releaseNative<'local>(
62100
_env: JNIEnv<'local>,
@@ -66,21 +104,17 @@ pub extern "system" fn Java_com_surrealdb_LiveStream_releaseNative<'local>(
66104
if handle_ptr == 0 {
67105
return;
68106
}
69-
// Do NOT take_instance yet: another thread may be in nextNative (get_instance + recv).
70-
// First close the channel via the stored sender, join the background thread, then acquire
71-
// the recv mutex (so no thread is in recv()), then take_instance and drop the receiver.
72107
let channel_ref = match get_instance::<LiveStreamChannel>(handle_ptr, JniTypes::LiveStream) {
73108
Ok(r) => r,
74109
Err(_) => return,
75110
};
76111
let (recv_mutex, join_handle_mux, shutdown_tx_mux, rx_mux) = channel_ref;
77-
drop(shutdown_tx_mux.lock().take()); // drop sender so background thread exits and channel closes
112+
drop(shutdown_tx_mux.lock().take());
78113
if let Some(join_handle) = join_handle_mux.lock().take() {
79114
let _ = join_handle.join();
80115
}
81-
let _recv_guard = recv_mutex.lock(); // wait until any thread in nextNative has left recv()
82-
let _rx = rx_mux.lock().take(); // take and drop receiver while holding recv_guard
116+
let _recv_guard = recv_mutex.lock();
117+
let _rx = rx_mux.lock().take();
83118
drop(_recv_guard);
84119
let _ = take_instance::<LiveStreamChannel>(handle_ptr, JniTypes::LiveStream);
85-
// free the box
86120
}

0 commit comments

Comments
 (0)