Skip to content

Commit 3870a42

Browse files
authored
encode before incrementing seq + putting in send buffer (#363)
## Why + What changed spiritual successor of #278 - Moves message encoding (codec.toBuffer) before incrementing the sequence number and pushing to the send buffer, so that if encoding fails (e.g. unserializable values like `Symbol`), or JS cannot allocate memory for the buffer, the session `seq` doesn't get out of sync - Changes the send buffer type from OpaqueTransportMessage[] to EncodedTransportMessage[] so buffered messages are already serialized and don't need re-encoding on reconnect - Adds tests for unserializable values in procedure handlers (binary codec throws, JSON codec silently drops) ## Versioning - [x] Breaking protocol change - [ ] Breaking ts/js API change <!-- Kind reminder to add tests and updated documentation if needed -->
1 parent 0a90804 commit 3870a42

18 files changed

Lines changed: 586 additions & 113 deletions

__tests__/unserializable.test.ts

Lines changed: 193 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,193 @@
1+
import { beforeEach, describe, expect, test } from 'vitest';
2+
import { Type } from '@sinclair/typebox';
3+
import {
4+
Procedure,
5+
createServiceSchema,
6+
Ok,
7+
createClient,
8+
createServer,
9+
UNEXPECTED_DISCONNECT_CODE,
10+
} from '../router';
11+
import { testMatrix } from '../testUtil/fixtures/matrix';
12+
import {
13+
advanceFakeTimersBySessionGrace,
14+
cleanupTransports,
15+
createPostTestCleanups,
16+
} from '../testUtil/fixtures/cleanup';
17+
import { TestSetupHelpers } from '../testUtil/fixtures/transports';
18+
import { readNextResult } from '../testUtil';
19+
20+
const ServiceSchema = createServiceSchema();
21+
22+
const UnserializableServiceSchema = ServiceSchema.define({
23+
returnSymbol: Procedure.rpc({
24+
requestInit: Type.Object({}),
25+
responseData: Type.Object({ id: Type.String() }),
26+
async handler() {
27+
return Ok({ id: 'test', extra: Symbol('unserializable') });
28+
},
29+
}),
30+
streamSymbol: Procedure.subscription({
31+
requestInit: Type.Object({}),
32+
responseData: Type.Object({ id: Type.String() }),
33+
async handler({ resWritable }) {
34+
resWritable.write(Ok({ id: 'test', extra: Symbol('unserializable') }));
35+
resWritable.close();
36+
},
37+
}),
38+
});
39+
40+
describe('unserializable values in procedure handlers', () => {
41+
// binary codec (msgpack) throws on Symbol, causing encode failure
42+
// which kills the session -- only test with ws transport since mock
43+
// transport's setImmediate chains conflict with fake timer flushing
44+
describe.each(testMatrix(['ws', 'binary']))(
45+
'binary codec ($transport.name transport)',
46+
({ transport, codec }) => {
47+
const opts = { codec: codec.codec };
48+
const { addPostTestCleanup, postTestCleanup } = createPostTestCleanups();
49+
let getClientTransport: TestSetupHelpers['getClientTransport'];
50+
let getServerTransport: TestSetupHelpers['getServerTransport'];
51+
52+
beforeEach(async () => {
53+
const setup = await transport.setup({ client: opts, server: opts });
54+
getClientTransport = setup.getClientTransport;
55+
getServerTransport = setup.getServerTransport;
56+
57+
return async () => {
58+
await postTestCleanup();
59+
await setup.cleanup();
60+
};
61+
});
62+
63+
test('rpc handler returning symbol causes client disconnect', async () => {
64+
const clientTransport = getClientTransport('client');
65+
const serverTransport = getServerTransport();
66+
const services = { svc: UnserializableServiceSchema };
67+
createServer(serverTransport, services);
68+
const client = createClient<typeof services>(
69+
clientTransport,
70+
serverTransport.clientId,
71+
);
72+
addPostTestCleanup(() =>
73+
cleanupTransports([clientTransport, serverTransport]),
74+
);
75+
76+
const resultPromise = client.svc.returnSymbol.rpc({});
77+
await advanceFakeTimersBySessionGrace();
78+
79+
const result = await resultPromise;
80+
expect(result).toMatchObject({
81+
ok: false,
82+
payload: {
83+
code: UNEXPECTED_DISCONNECT_CODE,
84+
},
85+
});
86+
});
87+
88+
test('client-side encode failure cleans up listeners', async () => {
89+
const clientTransport = getClientTransport('client');
90+
const serverTransport = getServerTransport();
91+
const services = { svc: UnserializableServiceSchema };
92+
createServer(serverTransport, services);
93+
const client = createClient<typeof services>(
94+
clientTransport,
95+
serverTransport.clientId,
96+
);
97+
addPostTestCleanup(() =>
98+
cleanupTransports([clientTransport, serverTransport]),
99+
);
100+
101+
const messageListenersBefore =
102+
clientTransport.eventDispatcher.numberOfListeners('message');
103+
const sessionStatusListenersBefore =
104+
clientTransport.eventDispatcher.numberOfListeners('sessionStatus');
105+
106+
// sending a Symbol as init payload will fail encoding on the client side
107+
expect(() =>
108+
// eslint-disable-next-line @typescript-eslint/no-unsafe-argument, @typescript-eslint/no-explicit-any
109+
client.svc.returnSymbol.rpc({ extra: Symbol('x') } as any),
110+
).toThrow();
111+
112+
// listeners should not leak after the failed send
113+
expect(
114+
clientTransport.eventDispatcher.numberOfListeners('message'),
115+
).toEqual(messageListenersBefore);
116+
expect(
117+
clientTransport.eventDispatcher.numberOfListeners('sessionStatus'),
118+
).toEqual(sessionStatusListenersBefore);
119+
});
120+
121+
test('subscription handler writing symbol causes client disconnect', async () => {
122+
const clientTransport = getClientTransport('client');
123+
const serverTransport = getServerTransport();
124+
const services = { svc: UnserializableServiceSchema };
125+
createServer(serverTransport, services);
126+
const client = createClient<typeof services>(
127+
clientTransport,
128+
serverTransport.clientId,
129+
);
130+
addPostTestCleanup(() =>
131+
cleanupTransports([clientTransport, serverTransport]),
132+
);
133+
134+
const { resReadable } = client.svc.streamSymbol.subscribe({});
135+
await advanceFakeTimersBySessionGrace();
136+
137+
const result = await readNextResult(resReadable);
138+
expect(result).toMatchObject({
139+
ok: false,
140+
payload: {
141+
code: UNEXPECTED_DISCONNECT_CODE,
142+
},
143+
});
144+
});
145+
},
146+
);
147+
148+
// json codec silently drops Symbol values via JSON.stringify
149+
describe.each(testMatrix(['all', 'naive']))(
150+
'json codec ($transport.name transport)',
151+
({ transport, codec }) => {
152+
const opts = { codec: codec.codec };
153+
const { addPostTestCleanup, postTestCleanup } = createPostTestCleanups();
154+
let getClientTransport: TestSetupHelpers['getClientTransport'];
155+
let getServerTransport: TestSetupHelpers['getServerTransport'];
156+
157+
beforeEach(async () => {
158+
const setup = await transport.setup({ client: opts, server: opts });
159+
getClientTransport = setup.getClientTransport;
160+
getServerTransport = setup.getServerTransport;
161+
162+
return async () => {
163+
await postTestCleanup();
164+
await setup.cleanup();
165+
};
166+
});
167+
168+
test('rpc handler returning symbol silently drops the value', async () => {
169+
const clientTransport = getClientTransport('client');
170+
const serverTransport = getServerTransport();
171+
const services = { svc: UnserializableServiceSchema };
172+
const server = createServer(serverTransport, services);
173+
const client = createClient<typeof services>(
174+
clientTransport,
175+
serverTransport.clientId,
176+
);
177+
addPostTestCleanup(() =>
178+
cleanupTransports([clientTransport, serverTransport]),
179+
);
180+
181+
const result = await client.svc.returnSymbol.rpc({});
182+
// JSON.stringify silently drops Symbol values, so the
183+
// response arrives with the extra symbol field missing
184+
expect(result).toStrictEqual({
185+
ok: true,
186+
payload: { id: 'test' },
187+
});
188+
189+
await server.close();
190+
});
191+
},
192+
);
193+
});

package-lock.json

Lines changed: 2 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
{
22
"name": "@replit/river",
33
"description": "It's like tRPC but... with JSON Schema Support, duplex streaming and support for service multiplexing. Transport agnostic!",
4-
"version": "0.214.0",
4+
"version": "0.215.0",
55
"type": "module",
66
"exports": {
77
".": {

router/client.ts

Lines changed: 15 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -508,16 +508,21 @@ function handleProc(
508508
transport.addEventListener('message', onMessage);
509509
transport.addEventListener('sessionStatus', onSessionStatus);
510510

511-
sessionScopedSend({
512-
streamId,
513-
serviceName,
514-
procedureName,
515-
tracing: getPropagationContext(ctx),
516-
payload: init,
517-
controlFlags: procClosesWithInit
518-
? ControlFlags.StreamOpenBit | ControlFlags.StreamClosedBit
519-
: ControlFlags.StreamOpenBit,
520-
});
511+
try {
512+
sessionScopedSend({
513+
streamId,
514+
serviceName,
515+
procedureName,
516+
tracing: getPropagationContext(ctx),
517+
payload: init,
518+
controlFlags: procClosesWithInit
519+
? ControlFlags.StreamOpenBit | ControlFlags.StreamClosedBit
520+
: ControlFlags.StreamOpenBit,
521+
});
522+
} catch (e) {
523+
cleanup();
524+
throw e;
525+
}
521526

522527
if (procClosesWithInit) {
523528
reqWritable.close();

testUtil/fixtures/cleanup.ts

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,7 @@
1-
import { expect, vi } from 'vitest';
1+
import { assert, expect, vi } from 'vitest';
22
import {
33
ClientTransport,
44
Connection,
5-
OpaqueTransportMessage,
65
ServerTransport,
76
Transport,
87
} from '../../transport';
@@ -68,14 +67,17 @@ export async function ensureTransportBuffersAreEventuallyEmpty(
6867
[...t.sessions]
6968
.map(([client, sess]) => {
7069
// get all messages that are not heartbeats
71-
const buff = sess.sendBuffer.filter((msg) => {
72-
return !Value.Check(ControlMessageAckSchema, msg.payload);
70+
const buff = sess.sendBuffer.filter((encodedMsg) => {
71+
const decoded = sess.codec.fromBuffer(encodedMsg.data);
72+
assert(decoded.ok);
73+
74+
return !Value.Check(
75+
ControlMessageAckSchema,
76+
decoded.value.payload,
77+
);
7378
});
7479

75-
return [client, buff] as [
76-
string,
77-
ReadonlyArray<OpaqueTransportMessage>,
78-
];
80+
return [client, buff] as const;
7981
})
8082
.filter((entry) => entry[1].length > 0),
8183
),

testUtil/index.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -184,6 +184,9 @@ export function dummySession() {
184184
onSessionGracePeriodElapsed: () => {
185185
/* noop */
186186
},
187+
onMessageSendFailure: () => {
188+
/* noop */
189+
},
187190
},
188191
testingSessionOptions,
189192
currentProtocolVersion,

transport/client.ts

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -107,6 +107,18 @@ export abstract class ClientTransport<
107107
onSessionGracePeriodElapsed: () => {
108108
this.onSessionGracePeriodElapsed(session);
109109
},
110+
onMessageSendFailure: (msg, reason) => {
111+
this.log?.error(`failed to send message: ${reason}`, {
112+
...session.loggingMetadata,
113+
transportMessage: msg,
114+
});
115+
116+
this.protocolError({
117+
type: ProtocolError.MessageSendFailure,
118+
message: reason,
119+
});
120+
this.deleteSession(session, { unhealthy: true });
121+
},
110122
},
111123
this.options,
112124
currentProtocolVersion,
@@ -186,6 +198,18 @@ export abstract class ClientTransport<
186198
onSessionGracePeriodElapsed: () => {
187199
this.onSessionGracePeriodElapsed(handshakingSession);
188200
},
201+
onMessageSendFailure: (msg, reason) => {
202+
this.log?.error(`failed to send message: ${reason}`, {
203+
...handshakingSession.loggingMetadata,
204+
transportMessage: msg,
205+
});
206+
207+
this.protocolError({
208+
type: ProtocolError.MessageSendFailure,
209+
message: reason,
210+
});
211+
this.deleteSession(handshakingSession, { unhealthy: true });
212+
},
189213
},
190214
);
191215

@@ -395,6 +419,18 @@ export abstract class ClientTransport<
395419
onSessionGracePeriodElapsed: () => {
396420
this.onSessionGracePeriodElapsed(backingOffSession);
397421
},
422+
onMessageSendFailure: (msg, reason) => {
423+
this.log?.error(`failed to send message: ${reason}`, {
424+
...backingOffSession.loggingMetadata,
425+
transportMessage: msg,
426+
});
427+
428+
this.protocolError({
429+
type: ProtocolError.MessageSendFailure,
430+
message: reason,
431+
});
432+
this.deleteSession(backingOffSession, { unhealthy: true });
433+
},
398434
},
399435
);
400436

@@ -470,6 +506,18 @@ export abstract class ClientTransport<
470506
onSessionGracePeriodElapsed: () => {
471507
this.onSessionGracePeriodElapsed(connectingSession);
472508
},
509+
onMessageSendFailure: (msg, reason) => {
510+
this.log?.error(`failed to send message: ${reason}`, {
511+
...connectingSession.loggingMetadata,
512+
transportMessage: msg,
513+
});
514+
515+
this.protocolError({
516+
type: ProtocolError.MessageSendFailure,
517+
message: reason,
518+
});
519+
this.deleteSession(connectingSession, { unhealthy: true });
520+
},
473521
},
474522
);
475523

transport/message.ts

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -280,6 +280,18 @@ export function cancelMessage(
280280
export type OpaqueTransportMessage = TransportMessage;
281281
export type TransportClientId = string;
282282

283+
/**
284+
* An encoded message that is ready to be sent over the transport.
285+
* The seq number is kept to track which messages have been
286+
* acked by the peer and can be dropped from the send buffer.
287+
*/
288+
export interface EncodedTransportMessage {
289+
id: string;
290+
seq: number;
291+
msg: PartialTransportMessage;
292+
data: Uint8Array;
293+
}
294+
283295
/**
284296
* Checks if the given control flag (usually found in msg.controlFlag) is an ack message.
285297
* @param controlFlag - The control flag to check.

0 commit comments

Comments
 (0)