Skip to content

Commit 619310a

Browse files
committed
feat(nuxt drizzle): realtime improvements and fixes
1 parent eaa8e38 commit 619310a

19 files changed

Lines changed: 1684 additions & 128 deletions

docs/plugins/nuxt-drizzle.md

Lines changed: 577 additions & 21 deletions
Large diffs are not rendered by default.

packages/nuxt-drizzle/package.json

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,10 @@
1717
"types": "./dist/types.d.ts",
1818
"import": "./dist/module.mjs",
1919
"require": "./dist/module.cjs"
20+
},
21+
"./realtime": {
22+
"types": "./dist/runtime/realtime-protocol.d.ts",
23+
"import": "./dist/runtime/realtime-protocol.js"
2024
}
2125
},
2226
"main": "./dist/module.cjs",

packages/nuxt-drizzle/src/module.ts

Lines changed: 56 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,10 +38,57 @@ export interface ModuleOptions {
3838
apiPath?: string
3939

4040
/**
41-
* Enable WebSocket support for real-time updates
41+
* Enable WebSocket support for real-time updates.
42+
*
43+
* Pass `true` to enable with defaults, or an object to customize the
44+
* server-side route and/or the client-side endpoint.
4245
*/
4346
ws?: boolean | {
47+
/**
48+
* Server-side route where the generated WebSocket handler is registered
49+
* (and, by default, the URL the client connects to).
50+
*
51+
* @default '/api/rstore-realtime/ws'
52+
*/
4453
apiPath?: string
54+
55+
/**
56+
* Fully overrides the URL the client uses to connect to the realtime
57+
* WebSocket. Useful when the WebSocket is hosted on a different
58+
* server (for example a dedicated realtime service).
59+
*
60+
* Can be an absolute URL (`wss://realtime.example.com/ws`), a
61+
* protocol-relative URL (`//realtime.example.com/ws`), or a path
62+
* relative to the current origin (`/ws`).
63+
*
64+
* When set, the server handler is still registered at `apiPath` so
65+
* you can keep using the built-in handler during development — point
66+
* this option at your external server in production via a runtime
67+
* config override if needed.
68+
*
69+
* @default apiPath
70+
*/
71+
clientEndpoint?: string
72+
73+
/**
74+
* Heartbeat interval in milliseconds. The client sends a `ping` frame
75+
* at this rate and expects a `pong` reply to detect dead connections.
76+
*
77+
* @default 10000
78+
*/
79+
heartbeatInterval?: number
80+
81+
/**
82+
* Auto-reconnect configuration forwarded to `@vueuse/core`'s
83+
* `useWebSocket`. Pass `true` for defaults, `false` to disable, or an
84+
* object to tune retries and backoff.
85+
*
86+
* @default true
87+
*/
88+
autoReconnect?: boolean | {
89+
retries?: number
90+
delay?: number
91+
}
4592
}
4693

4794
/**
@@ -516,6 +563,11 @@ ${collections.map((collection, index) => {
516563

517564
const wsOptions = typeof options.ws === 'object' ? options.ws : {}
518565
const wsApiPath = wsOptions.apiPath ?? `/api/rstore-realtime/ws`
566+
// Client-side endpoint URL — falls back to the server route so same-origin
567+
// setups keep working without extra configuration.
568+
const wsClientEndpoint = wsOptions.clientEndpoint ?? wsApiPath
569+
const wsHeartbeatInterval = wsOptions.heartbeatInterval ?? 10000
570+
const wsAutoReconnect = wsOptions.autoReconnect ?? true
519571

520572
if (options.ws) {
521573
nuxt.options.nitro.experimental ??= {}
@@ -582,6 +634,9 @@ export default createOfflinePlugin({
582634
filename: '$rstore-drizzle-config.js',
583635
getContents: () => `export const apiPath = ${JSON.stringify(apiPath)}
584636
export const wsApiPath = ${JSON.stringify(wsApiPath)}
637+
export const wsClientEndpoint = ${JSON.stringify(wsClientEndpoint)}
638+
export const wsHeartbeatInterval = ${JSON.stringify(wsHeartbeatInterval)}
639+
export const wsAutoReconnect = ${JSON.stringify(wsAutoReconnect)}
585640
export const dialect = '${drizzleConfig.dialect}'
586641
export const syncSerializeDateValue = ${offlineOptions?.serializeDateValue ? offlineOptions.serializeDateValue.toString() : 'undefined'}\n`,
587642
})

packages/nuxt-drizzle/src/runtime/plugin-realtime.ts

Lines changed: 33 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,10 @@
11
import type { SubscriptionMessage, SubscriptionUpdateMessage } from './utils/realtime'
22
// @ts-expect-error virtual module
3-
import { wsApiPath } from '#build/$rstore-drizzle-config.js'
3+
import { wsAutoReconnect, wsClientEndpoint, wsHeartbeatInterval } from '#build/$rstore-drizzle-config.js'
44
import { definePlugin, realtimeReconnectEventHook } from '@rstore/vue'
55
import { useWebSocket } from '@vueuse/core'
66
import { watch } from 'vue'
7+
import { getRstoreDrizzleClientId } from './utils/client-id'
78
import { getSubscriptionId } from './utils/realtime'
89

910
export default definePlugin({
@@ -16,14 +17,20 @@ export default definePlugin({
1617

1718
setup({ hook }) {
1819
if (import.meta.client) {
20+
const clientId = getRstoreDrizzleClientId()
21+
22+
// Local ref-count per subscription id — a given (collection, key, where)
23+
// is only sent to the server once, regardless of how many components
24+
// subscribe to it locally. The matching `unsubscribe` is only emitted
25+
// when the last subscriber is torn down.
1926
const countPerTopic: Record<string, number> = {}
2027
const messages = new Map<string, SubscriptionMessage>()
2128

22-
const ws = useWebSocket(wsApiPath, {
29+
const ws = useWebSocket(wsClientEndpoint, {
2330
heartbeat: {
24-
interval: 10000,
31+
interval: wsHeartbeatInterval,
2532
},
26-
autoReconnect: true,
33+
autoReconnect: wsAutoReconnect,
2734
})
2835

2936
let connectCount = 0
@@ -56,8 +63,14 @@ export default definePlugin({
5663
where,
5764
}
5865
const subscriptionId = getSubscriptionId(message)
59-
countPerTopic[subscriptionId] ??= 1
60-
countPerTopic[subscriptionId]--
66+
const current = countPerTopic[subscriptionId] ?? 0
67+
// Guard against decrementing below zero — a stray unsubscribe without
68+
// a matching subscribe would otherwise send a spurious unsubscribe
69+
// frame and corrupt the counter.
70+
if (current <= 0) {
71+
return
72+
}
73+
countPerTopic[subscriptionId] = current - 1
6174
if (countPerTopic[subscriptionId] === 0) {
6275
ws.send(JSON.stringify({
6376
subscription: message,
@@ -67,8 +80,8 @@ export default definePlugin({
6780
})
6881

6982
hook('init', ({ store }) => {
70-
watch(ws.data, async (data: string) => {
71-
if (data === 'pong') {
83+
watch(ws.data, async (data) => {
84+
if (typeof data !== 'string' || data === 'pong') {
7285
return
7386
}
7487
try {
@@ -112,23 +125,26 @@ export default definePlugin({
112125
})
113126

114127
watch(ws.status, (status) => {
115-
if (status === 'CLOSED') {
116-
// Reset counts on reconnect
117-
for (const key in countPerTopic) {
118-
countPerTopic[key] = 0
119-
}
120-
}
121-
else if (status === 'OPEN') {
128+
if (status === 'OPEN') {
122129
connectCount++
123130

124-
// Resubscribe to all topics
131+
// Announce our clientId first so the server can tag us for
132+
// skip-self echo suppression before any mutation echoes arrive.
133+
if (clientId) {
134+
ws.send(JSON.stringify({ init: { clientId } }), false)
135+
}
136+
137+
// Resubscribe to all active topics. On first open this replays
138+
// subscriptions issued while CONNECTING; on reconnect it restores
139+
// subscriptions across a transient disconnect.
125140
for (const [, message] of messages) {
126141
ws.send(JSON.stringify({
127142
subscription: message,
128143
}), false)
129144
}
130145

131-
// Call reconnect hook
146+
// Notify live queries to refresh so updates missed while offline
147+
// are recovered. Only fire on true reconnects (skip first open).
132148
if (connectCount > 1) {
133149
realtimeReconnectEventHook.trigger()
134150
}

packages/nuxt-drizzle/src/runtime/plugin.ts

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,9 +5,20 @@ import { apiPath, dialect } from '#build/$rstore-drizzle-config.js'
55
import { useRequestFetch } from '#imports'
66
import { definePlugin } from '@rstore/vue'
77
import SuperJSON from 'superjson'
8+
import { getRstoreDrizzleClientId, RSTORE_DRIZZLE_CLIENT_ID_HEADER } from './utils/client-id'
89
import { and, eq } from './utils/where'
910
import { filterWhere } from './where'
1011

12+
/**
13+
* Returns a headers object including the per-tab client id header when
14+
* running in the browser. The server-side realtime handler uses this id to
15+
* skip echoing the resulting update frame back to the originating client.
16+
*/
17+
function clientIdHeaders(): Record<string, string> | undefined {
18+
const id = getRstoreDrizzleClientId()
19+
return id ? { [RSTORE_DRIZZLE_CLIENT_ID_HEADER]: id } : undefined
20+
}
21+
1122
export default definePlugin({
1223
name: 'rstore-drizzle',
1324

@@ -171,6 +182,7 @@ export default definePlugin({
171182
const result: any = await requestFetch(`${apiPath}/${payload.collection.name}`, {
172183
method: 'POST',
173184
body: SuperJSON.stringify(payload.item),
185+
headers: clientIdHeaders(),
174186
})
175187
payload.setResult(result)
176188
})
@@ -186,13 +198,15 @@ export default definePlugin({
186198
const result: any = await requestFetch(`${apiPath}/${payload.collection.name}/${payload.key}`, {
187199
method: 'PATCH',
188200
body: SuperJSON.stringify(body),
201+
headers: clientIdHeaders(),
189202
})
190203
payload.setResult(result)
191204
})
192205

193206
hook('deleteItem', async (payload) => {
194207
await requestFetch(`${apiPath}/${payload.collection.name}/${payload.key}`, {
195208
method: 'DELETE',
209+
headers: clientIdHeaders(),
196210
})
197211
})
198212

@@ -267,6 +281,7 @@ export default definePlugin({
267281
method: 'POST',
268282
body: SuperJSON.stringify({ operations: wireOps }),
269283
responseType: 'text',
284+
headers: clientIdHeaders(),
270285
})
271286
response = SuperJSON.parse(raw) as BatchWireResponse
272287
}
Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
/**
2+
* Public entry point for the realtime protocol used by `@rstore/nuxt-drizzle`.
3+
*
4+
* Import from `@rstore/nuxt-drizzle/realtime` to build a custom realtime server
5+
* (or a non-Vue client) that speaks the same WebSocket protocol as the built-in
6+
* plugin. This module has **no Nuxt, Nitro, Vue, Drizzle, or H3 runtime
7+
* dependencies**, so it is safe to import from a standalone Node / Bun / Deno
8+
* / edge service.
9+
*
10+
* Protocol reference:
11+
* https://rstore.akryum.dev/plugins/nuxt-drizzle.html#realtime-protocol
12+
*/
13+
14+
// Pub/Sub — transport-agnostic helpers for fanning out updates.
15+
// The in-memory default is single-process; swap it out with `setPubSub()` for
16+
// multi-node deployments (Redis, NATS, Postgres LISTEN/NOTIFY, …).
17+
export type {
18+
PubSub,
19+
RstoreDrizzlePubSub,
20+
RstoreDrizzlePubSubChannels,
21+
} from './server/utils/pubsub'
22+
23+
export {
24+
createMemoryPubSub,
25+
getPubSub,
26+
setPubSub,
27+
usePeerPubSub,
28+
} from './server/utils/pubsub'
29+
30+
// Protocol message types & helpers
31+
export type {
32+
ClientInitMessage,
33+
RstoreDrizzleRealtimePayload,
34+
RstoreDrizzleRealtimeUpdateType,
35+
SubscriptionMessage,
36+
SubscriptionUpdateMessage,
37+
} from './utils/realtime'
38+
39+
export { getSubscriptionId, normalizeSubscriptionKey } from './utils/realtime'
40+
41+
// Subscription matcher — pure function used by the built-in WebSocket
42+
// handler; re-exported so custom realtime servers can share the exact
43+
// match semantics without re-implementing them.
44+
export type { RstoreDrizzleDialect } from './utils/subscription-match'
45+
46+
export { subscriptionMatches } from './utils/subscription-match'
47+
48+
// `where` filter — operator AST + evaluator
49+
export type {
50+
RestoreDrizzleConditionModifier,
51+
RstoreDrizzleBinaryOperator,
52+
RstoreDrizzleCondition,
53+
RstoreDrizzleConditionGroup,
54+
RstoreDrizzleTernaryOperator,
55+
RstoreDrizzleUnaryOperator,
56+
} from './utils/types'
57+
58+
export { filterWhere } from './where'

0 commit comments

Comments
 (0)