Skip to content

Commit 3645d6b

Browse files
fix: address code review findings (#15)
* fix: address code review findings - H1: Only suppress NATS error codes 10058/10059 in stream catch blocks, rethrow all other errors instead of silently swallowing them - H2: Rethrow errors for ephemeral consumer creation instead of silently ignoring failures - H3: Replace long positional parameter lists in startJSConsumers and startCoreConsumers with JSConsumerOptions/CoreConsumerOptions interfaces - M2: Replace non-null assertion on msg.headers with guard pattern - M3: Use module-level TextEncoder/TextDecoder instances instead of allocating per message - M4: Validate that backOff array length does not exceed maxDeliver * test: add coverage for stream error handling, backOff validation, and ephemeral consumer errors
1 parent 2cf31ef commit 3645d6b

8 files changed

Lines changed: 244 additions & 96 deletions

File tree

__tests__/consumer-options.test.ts

Lines changed: 126 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,8 @@ import { describe, expect, it, mock, beforeEach } from "bun:test";
22
import { startJSConsumers } from "../src/consumer.js";
33
import type { JSConsumerRegistration } from "../src/consumer.js";
44
import type { ConsumerDefaults } from "../src/connection.js";
5-
import { AckPolicy } from "nats";
5+
import type { JSConsumerOptions } from "../src/consumer.js";
6+
import { AckPolicy, NatsError } from "nats";
67

78
const silentLogger = {
89
info: mock(() => {}),
@@ -65,9 +66,7 @@ describe("Consumer defaults (connection-level)", () => {
6566
"test-svc",
6667
registrations,
6768
silentLogger,
68-
undefined,
69-
undefined,
70-
defaults,
69+
{ consumerDefaults: defaults },
7170
);
7271

7372
expect(jsm.consumers.add).toHaveBeenCalledWith("events", {
@@ -101,9 +100,7 @@ describe("Consumer defaults (connection-level)", () => {
101100
"test-svc",
102101
registrations,
103102
silentLogger,
104-
undefined,
105-
undefined,
106-
defaults,
103+
{ consumerDefaults: defaults },
107104
);
108105

109106
expect(jsm.consumers.add).toHaveBeenCalledWith("events", {
@@ -138,9 +135,7 @@ describe("Consumer defaults (connection-level)", () => {
138135
"test-svc",
139136
registrations,
140137
silentLogger,
141-
undefined,
142-
undefined,
143-
defaults,
138+
{ consumerDefaults: defaults },
144139
);
145140

146141
expect(jsm.consumers.add).toHaveBeenCalledWith("events", {
@@ -201,9 +196,7 @@ describe("Consumer defaults (connection-level)", () => {
201196
"test-svc",
202197
registrations,
203198
silentLogger,
204-
undefined,
205-
undefined,
206-
defaults,
199+
{ consumerDefaults: defaults },
207200
);
208201

209202
expect(jsm.consumers.add).toHaveBeenCalledWith("events", {
@@ -234,9 +227,7 @@ describe("Consumer defaults (connection-level)", () => {
234227
"test-svc",
235228
registrations,
236229
silentLogger,
237-
undefined,
238-
undefined,
239-
defaults,
230+
{ consumerDefaults: defaults },
240231
);
241232

242233
expect(jsm.consumers.add).toHaveBeenCalledWith("events", {
@@ -270,9 +261,7 @@ describe("Per-consumer options override defaults", () => {
270261
"test-svc",
271262
registrations,
272263
silentLogger,
273-
undefined,
274-
undefined,
275-
defaults,
264+
{ consumerDefaults: defaults },
276265
);
277266

278267
expect(jsm.consumers.add).toHaveBeenCalledWith("events", {
@@ -305,9 +294,7 @@ describe("Per-consumer options override defaults", () => {
305294
"test-svc",
306295
registrations,
307296
silentLogger,
308-
undefined,
309-
undefined,
310-
defaults,
297+
{ consumerDefaults: defaults },
311298
);
312299

313300
expect(jsm.consumers.add).toHaveBeenCalledWith("events", {
@@ -380,9 +367,7 @@ describe("Per-consumer options override defaults", () => {
380367
"test-svc",
381368
registrations,
382369
silentLogger,
383-
undefined,
384-
undefined,
385-
defaults,
370+
{ consumerDefaults: defaults },
386371
);
387372

388373
expect(jsm.consumers.add).toHaveBeenCalledWith("events", {
@@ -393,3 +378,119 @@ describe("Per-consumer options override defaults", () => {
393378
});
394379
});
395380
});
381+
382+
describe("Stream error handling", () => {
383+
it("rethrows non-NATS errors from stream update", async () => {
384+
const { js } = createMockJsAndJsm();
385+
386+
// streams.add fails, then streams.update also fails with a non-NATS error
387+
const jsm = {
388+
streams: {
389+
add: mock(() => Promise.reject(new Error("add failed"))),
390+
update: mock(() => Promise.reject(new Error("network timeout"))),
391+
},
392+
consumers: {
393+
add: mock(() => Promise.resolve({})),
394+
delete: mock(() => Promise.resolve(true)),
395+
},
396+
};
397+
398+
const registrations: JSConsumerRegistration<unknown>[] = [
399+
{
400+
kind: "jetstream",
401+
stream: "events",
402+
routingKey: "Order.Created",
403+
handler: async () => {},
404+
durable: "test-svc",
405+
},
406+
];
407+
408+
await expect(
409+
startJSConsumers(js as never, jsm as never, "test-svc", registrations, silentLogger),
410+
).rejects.toThrow("network timeout");
411+
});
412+
413+
it("suppresses NATS 10058 error (stream already exists)", async () => {
414+
const { js } = createMockJsAndJsm();
415+
416+
const natsErr = new NatsError("stream exists", "10058", new Error("inner"));
417+
(natsErr as any).api_error = { err_code: 10058 };
418+
const jsm = {
419+
streams: {
420+
add: mock(() => Promise.reject(new Error("add failed"))),
421+
update: mock(() => Promise.reject(natsErr)),
422+
},
423+
consumers: {
424+
add: mock(() => Promise.resolve({ name: "test-svc" })),
425+
delete: mock(() => Promise.resolve(true)),
426+
},
427+
};
428+
429+
const registrations: JSConsumerRegistration<unknown>[] = [
430+
{
431+
kind: "jetstream",
432+
stream: "events",
433+
routingKey: "Order.Created",
434+
handler: async () => {},
435+
durable: "test-svc",
436+
},
437+
];
438+
439+
// Should not throw
440+
await startJSConsumers(js as never, jsm as never, "test-svc", registrations, silentLogger);
441+
expect(jsm.consumers.add).toHaveBeenCalled();
442+
});
443+
});
444+
445+
describe("backOff/maxDeliver validation", () => {
446+
it("throws when backOff length exceeds maxDeliver", async () => {
447+
const { js, jsm } = createMockJsAndJsm();
448+
449+
const registrations: JSConsumerRegistration<unknown>[] = [
450+
{
451+
kind: "jetstream",
452+
stream: "events",
453+
routingKey: "Order.Created",
454+
handler: async () => {},
455+
durable: "test-svc",
456+
maxDeliver: 2,
457+
backOff: [1000, 5000, 30000],
458+
},
459+
];
460+
461+
await expect(
462+
startJSConsumers(js as never, jsm as never, "test-svc", registrations, silentLogger),
463+
).rejects.toThrow("backOff length (3) exceeds maxDeliver (2)");
464+
});
465+
});
466+
467+
describe("Ephemeral consumer error handling", () => {
468+
it("rethrows error when ephemeral consumer creation fails", async () => {
469+
const { js } = createMockJsAndJsm();
470+
471+
const jsm = {
472+
streams: {
473+
add: mock(() => Promise.resolve({})),
474+
update: mock(() => Promise.resolve({})),
475+
},
476+
consumers: {
477+
add: mock(() => Promise.reject(new Error("consumer creation failed"))),
478+
delete: mock(() => Promise.resolve(true)),
479+
},
480+
};
481+
482+
const registrations: JSConsumerRegistration<unknown>[] = [
483+
{
484+
kind: "jetstream",
485+
stream: "events",
486+
routingKey: "Order.Created",
487+
handler: async () => {},
488+
// no durable = ephemeral
489+
},
490+
];
491+
492+
await expect(
493+
startJSConsumers(js as never, jsm as never, "test-svc", registrations, silentLogger),
494+
).rejects.toThrow("consumer creation failed");
495+
});
496+
});

__tests__/metrics.test.ts

Lines changed: 9 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -151,8 +151,7 @@ describe("NATS JetStream Consumer Metrics", () => {
151151

152152
await startJSConsumers(
153153
js as never, jsm as never, "test-svc", registrations, silentLogger,
154-
undefined, undefined, undefined, undefined, undefined,
155-
metrics,
154+
{ metrics },
156155
);
157156

158157
await waitFor(() => {
@@ -221,8 +220,7 @@ describe("NATS JetStream Consumer Metrics", () => {
221220

222221
await startJSConsumers(
223222
js as never, jsm as never, "test-svc", registrations, silentLogger,
224-
undefined, undefined, undefined, undefined, undefined,
225-
metrics,
223+
{ metrics },
226224
);
227225

228226
await waitFor(() => {
@@ -285,8 +283,7 @@ describe("NATS JetStream Consumer Metrics", () => {
285283

286284
await startJSConsumers(
287285
js as never, jsm as never, "test-svc", registrations, silentLogger,
288-
undefined, undefined, undefined, undefined, undefined,
289-
metrics,
286+
{ metrics },
290287
);
291288

292289
await waitFor(() => {
@@ -347,8 +344,7 @@ describe("NATS JetStream Consumer Metrics", () => {
347344

348345
await startJSConsumers(
349346
js as never, jsm as never, "test-svc", registrations, silentLogger,
350-
undefined, undefined, undefined, undefined, undefined,
351-
metrics,
347+
{ metrics },
352348
);
353349

354350
await waitFor(() => {
@@ -416,8 +412,7 @@ describe("NATS JetStream Consumer Metrics", () => {
416412

417413
await startJSConsumers(
418414
js as never, jsm as never, "test-svc", registrations, silentLogger,
419-
undefined, undefined, undefined, undefined, undefined,
420-
metrics, mapper,
415+
{ metrics, routingKeyMapper: mapper },
421416
);
422417

423418
await waitFor(() => {
@@ -453,7 +448,7 @@ describe("NATS Core Consumer Metrics", () => {
453448

454449
startCoreConsumers(
455450
nc as never, "test-service", registrations, silentLogger,
456-
undefined, undefined, undefined, metrics,
451+
{ metrics },
457452
);
458453

459454
const msg = createMockMsg({ orderId: "123" }, ceHeaders);
@@ -480,7 +475,7 @@ describe("NATS Core Consumer Metrics", () => {
480475

481476
startCoreConsumers(
482477
nc as never, "test-service", registrations, silentLogger,
483-
undefined, undefined, undefined, metrics,
478+
{ metrics },
484479
);
485480

486481
const msg = createMockMsg({ orderId: "123" }, ceHeaders);
@@ -507,7 +502,7 @@ describe("NATS Core Consumer Metrics", () => {
507502

508503
startCoreConsumers(
509504
nc as never, "test-service", registrations, silentLogger,
510-
undefined, undefined, undefined, metrics,
505+
{ metrics },
511506
);
512507

513508
const msg = {
@@ -545,7 +540,7 @@ describe("NATS Core Consumer Metrics", () => {
545540

546541
startCoreConsumers(
547542
nc as never, "test-service", registrations, silentLogger,
548-
undefined, undefined, undefined, metrics, mapper,
543+
{ metrics, routingKeyMapper: mapper },
549544
);
550545

551546
const msg = createMockMsg({ orderId: "123" }, ceHeaders);

__tests__/notifications.test.ts

Lines changed: 4 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -83,10 +83,7 @@ describe("JetStream notifications", () => {
8383
"test-svc",
8484
registrations,
8585
silentLogger,
86-
undefined,
87-
undefined,
88-
undefined,
89-
onNotification,
86+
{ onNotification },
9087
);
9188

9289
await waitFor(() => {
@@ -149,11 +146,7 @@ describe("JetStream notifications", () => {
149146
"test-svc",
150147
registrations,
151148
silentLogger,
152-
undefined,
153-
undefined,
154-
undefined,
155-
undefined,
156-
onError,
149+
{ onError },
157150
);
158151

159152
await waitFor(() => {
@@ -229,8 +222,7 @@ describe("Core NATS notifications", () => {
229222
"test-service",
230223
registrations,
231224
silentLogger,
232-
undefined,
233-
onNotification,
225+
{ onNotification },
234226
);
235227

236228
const msg = createMockMsg({ orderId: "123" });
@@ -264,9 +256,7 @@ describe("Core NATS notifications", () => {
264256
"test-service",
265257
registrations,
266258
silentLogger,
267-
undefined,
268-
undefined,
269-
onError,
259+
{ onError },
270260
);
271261

272262
const msg = createMockMsg({ orderId: "123" });

__tests__/stream-config.test.ts

Lines changed: 4 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -84,8 +84,7 @@ describe("Stream config in startJSConsumers", () => {
8484
"test-svc",
8585
registrations,
8686
silentLogger,
87-
undefined,
88-
resolver,
87+
{ resolveStreamConfig: resolver },
8988
);
9089

9190
expect(jsm.streams.add).toHaveBeenCalledWith({
@@ -149,8 +148,7 @@ describe("Stream config in startJSConsumers", () => {
149148
"test-svc",
150149
registrations,
151150
silentLogger,
152-
undefined,
153-
resolver,
151+
{ resolveStreamConfig: resolver },
154152
);
155153

156154
expect(jsm.streams.update).toHaveBeenCalledWith("events", {
@@ -195,8 +193,7 @@ describe("Stream config in startJSConsumers", () => {
195193
"svc",
196194
registrations,
197195
silentLogger,
198-
undefined,
199-
resolver,
196+
{ resolveStreamConfig: resolver },
200197
);
201198

202199
expect(resolvedStreams).toEqual(["events", "custom"]);
@@ -235,8 +232,7 @@ describe("Stream config in startJSConsumers", () => {
235232
"test-svc",
236233
registrations,
237234
silentLogger,
238-
undefined,
239-
resolver,
235+
{ resolveStreamConfig: resolver },
240236
);
241237

242238
expect(jsm.streams.add).toHaveBeenCalledWith({

0 commit comments

Comments
 (0)