Skip to content

Commit 5af368e

Browse files
authored
Merge pull request #5 from physwkim/fix/udp-reuseport-and-init-no-data
fix(server): macOS UDP search port sharing + init handlers without data, Ill need to fix search.rs but otherwise happy with the changes
2 parents d5ba26a + 58b0782 commit 5af368e

3 files changed

Lines changed: 63 additions & 9 deletions

File tree

spvirit-server/src/handler.rs

Lines changed: 57 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -247,6 +247,28 @@ pub fn search_reply_target(addr: &[u8; 16], port: u16, peer: SocketAddr) -> Sock
247247
SocketAddr::new(target_ip, target_port)
248248
}
249249

250+
/// Bind the fixed UDP search port with `SO_REUSEADDR` (and `SO_REUSEPORT`
251+
/// on Unix) so other local PVA consumers such as `p4p` can also listen on
252+
/// the same well-known port. On macOS in particular, a plain
253+
/// `UdpSocket::bind(5076)` prevents any subsequent binder from joining the
254+
/// port, which broke co-located clients.
255+
pub fn bind_udp_search_socket(addr: SocketAddr) -> std::io::Result<UdpSocket> {
256+
use socket2::{Domain, Protocol, Socket, Type};
257+
258+
let domain = if addr.is_ipv4() {
259+
Domain::IPV4
260+
} else {
261+
Domain::IPV6
262+
};
263+
let socket = Socket::new(domain, Type::DGRAM, Some(Protocol::UDP))?;
264+
socket.set_reuse_address(true)?;
265+
#[cfg(unix)]
266+
socket.set_reuse_port(true)?;
267+
socket.set_nonblocking(true)?;
268+
socket.bind(&addr.into())?;
269+
UdpSocket::from_std(socket.into())
270+
}
271+
250272
pub fn infer_udp_response_ip(peer: SocketAddr) -> Option<IpAddr> {
251273
let bind_addr = if peer.is_ipv4() {
252274
"0.0.0.0:0"
@@ -606,7 +628,7 @@ pub async fn run_udp_search<S: PvStore>(
606628
guid: [u8; 12],
607629
advertise_ip: Option<IpAddr>,
608630
) -> Result<(), Box<dyn std::error::Error>> {
609-
let socket = UdpSocket::bind(addr).await?;
631+
let socket = bind_udp_search_socket(addr)?;
610632
socket.set_broadcast(true)?;
611633
let mut buf = vec![0u8; 4096];
612634

@@ -1123,7 +1145,16 @@ pub async fn handle_connection<S: PvStore>(
11231145
11 => {
11241146
// PUT
11251147
if is_init {
1126-
let Some(nt) = get_nt_snapshot(&state, &pv_name).await else {
1148+
// Init only needs the type descriptor, not current data.
1149+
// Use get_descriptor first; fall back to snapshot so PUT
1150+
// can target PVs that do not yet have any data.
1151+
let desc = if let Some(desc) =
1152+
state.store.get_descriptor(&pv_name).await
1153+
{
1154+
desc
1155+
} else if let Some(nt) = get_nt_snapshot(&state, &pv_name).await {
1156+
nt_payload_desc(&nt)
1157+
} else {
11271158
state
11281159
.registry
11291160
.send_msg(
@@ -1153,7 +1184,6 @@ pub async fn handle_connection<S: PvStore>(
11531184
state.registry.send_msg(conn_id, resp).await;
11541185
continue;
11551186
}
1156-
let desc = nt_payload_desc(&nt);
11571187
conn_state.ioid_to_desc.insert(ioid, desc.clone());
11581188
conn_state.ioid_to_pv.insert(ioid, pv_name.clone());
11591189
let resp = encode_op_init_response_desc(
@@ -1271,7 +1301,16 @@ pub async fn handle_connection<S: PvStore>(
12711301
12 => {
12721302
// PUT_GET
12731303
if is_init {
1274-
let Some(nt) = get_nt_snapshot(&state, &pv_name).await else {
1304+
// Init only needs the type descriptor, not current data.
1305+
// Use get_descriptor first; fall back to snapshot so that
1306+
// clients can initiate PUT_GET before any data exists.
1307+
let desc = if let Some(desc) =
1308+
state.store.get_descriptor(&pv_name).await
1309+
{
1310+
desc
1311+
} else if let Some(nt) = get_nt_snapshot(&state, &pv_name).await {
1312+
nt_payload_desc(&nt)
1313+
} else {
12751314
state
12761315
.registry
12771316
.send_msg(
@@ -1300,7 +1339,6 @@ pub async fn handle_connection<S: PvStore>(
13001339
state.registry.send_msg(conn_id, resp).await;
13011340
continue;
13021341
}
1303-
let desc = nt_payload_desc(&nt);
13041342
conn_state.ioid_to_desc.insert(ioid, desc.clone());
13051343
conn_state.ioid_to_pv.insert(ioid, pv_name.clone());
13061344
let resp =
@@ -1391,7 +1429,20 @@ pub async fn handle_connection<S: PvStore>(
13911429
13 => {
13921430
// MONITOR
13931431
if is_init {
1394-
let Some(nt) = get_nt_snapshot(&state, &pv_name).await else {
1432+
// Init only needs the type descriptor, not the data.
1433+
// Use get_descriptor first; fall back to snapshot. This
1434+
// lets clients subscribe to PVs before any data has been
1435+
// produced (e.g. NTNDArray before acquire). Real monitor
1436+
// updates are pushed once data arrives. Strict clients
1437+
// like p4p treat a MONITOR init error as fatal, so we
1438+
// must not error out when only the descriptor is known.
1439+
let full_desc = if let Some(desc) =
1440+
state.store.get_descriptor(&pv_name).await
1441+
{
1442+
desc
1443+
} else if let Some(nt) = get_nt_snapshot(&state, &pv_name).await {
1444+
nt_payload_desc(&nt)
1445+
} else {
13951446
state
13961447
.registry
13971448
.send_msg(
@@ -1408,7 +1459,6 @@ pub async fn handle_connection<S: PvStore>(
14081459
.await;
14091460
continue;
14101461
};
1411-
let full_desc = nt_payload_desc(&nt);
14121462
let pv_req_fields = decode_pv_request_fields(&payload.body, is_be);
14131463
let desc = match &pv_req_fields {
14141464
Some(fields) => filter_structure_desc(&full_desc, fields),

spvirit-tools/src/bin/spvirit_dodeca.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -396,7 +396,9 @@ async fn run_udp_search(
396396
tcp_port: u16,
397397
guid: [u8; 12],
398398
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
399-
let socket = UdpSocket::bind(addr).await?;
399+
// Bind with SO_REUSEADDR/SO_REUSEPORT so co-located PVA consumers
400+
// (e.g. p4p on macOS) can also listen on the fixed search port.
401+
let socket = spvirit_server::handler::bind_udp_search_socket(addr)?;
400402
socket.set_broadcast(true)?;
401403
let mut buf = vec![0u8; 4096];
402404

spvirit-tools/src/bin/spvirit_server.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -375,7 +375,9 @@ async fn run_udp_search(
375375
guid: [u8; 12],
376376
advertise_ip: Option<IpAddr>,
377377
) -> Result<(), Box<dyn std::error::Error>> {
378-
let socket = UdpSocket::bind(addr).await?;
378+
// Bind with SO_REUSEADDR/SO_REUSEPORT so co-located PVA consumers
379+
// (e.g. p4p on macOS) can also listen on the fixed search port.
380+
let socket = spvirit_server::handler::bind_udp_search_socket(addr)?;
379381
socket.set_broadcast(true)?;
380382
let mut buf = vec![0u8; 4096];
381383

0 commit comments

Comments
 (0)