Skip to content

Commit 09a7509

Browse files
authored
Add a function createReactiveStoreFromRpcAndSubscription (#1528)
* Add a function `createReactiveStoreFromRpcAndSubscription` This creates a reactive store that combines an RPC and RPC Subscription, that can both map to the same data type. It exposes the latest value from either source, by slot comparison. * Update balance in react-app to use reactive store * Do not update state after signal is aborted * Update lockfile * Rename to createReactiveStoreWithInitialValueAndSlotTracking
1 parent 56251ed commit 09a7509

8 files changed

Lines changed: 768 additions & 55 deletions

File tree

.changeset/giant-falcons-invite.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
'@solana/kit': minor
3+
---
4+
5+
Add `createReactiveStoreWithInitialValueAndSlotTracking()`, a helper that combines an initial RPC fetch with an ongoing subscription into a single `ReactiveStore`. Uses slot-based comparison to ensure only the most recent value is kept, regardless of arrival order. Compatible with `useSyncExternalStore`, Svelte stores, and other reactive primitives.
Lines changed: 27 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,21 @@
1-
import { AccountNotificationsApi, Address, GetBalanceApi, Lamports, Rpc, RpcSubscriptions } from '@solana/kit';
1+
import {
2+
AccountNotificationsApi,
3+
Address,
4+
createReactiveStoreWithInitialValueAndSlotTracking,
5+
GetBalanceApi,
6+
Lamports,
7+
Rpc,
8+
RpcSubscriptions,
9+
} from '@solana/kit';
210
import { SWRSubscription } from 'swr/subscription';
311

4-
const EXPLICIT_ABORT_TOKEN = Symbol();
5-
612
/**
713
* This is an example of a strategy to fetch some account data and to keep it up to date over time.
814
* It's implemented as an SWR subscription function (https://swr.vercel.app/docs/subscription) but
915
* the approach is generalizable.
1016
*
11-
* 1. Fetch the current account state and publish it to the consumer
12-
* 2. Subscribe to account data notifications and publish them to the consumer
13-
*
14-
* At all points in time, check that the update you received -- no matter from where -- is from a
15-
* higher slot (ie. is newer) than the last one you published to the consumer.
17+
* It uses {@link createReactiveStoreWithInitialValueAndSlotTracking} to combine an initial RPC fetch with an
18+
* ongoing subscription, using slot-based comparison to ensure only the latest value is published.
1619
*/
1720
export function balanceSubscribe(
1821
rpc: Rpc<GetBalanceApi>,
@@ -21,53 +24,22 @@ export function balanceSubscribe(
2124
) {
2225
const [{ address }, { next }] = subscriptionArgs;
2326
const abortController = new AbortController();
24-
// Keep track of the slot of the last-published update.
25-
let lastUpdateSlot = -1n;
26-
// Fetch the current balance of this account.
27-
rpc.getBalance(address, { commitment: 'confirmed' })
28-
.send({ abortSignal: abortController.signal })
29-
.then(({ context: { slot }, value: lamports }) => {
30-
if (slot < lastUpdateSlot) {
31-
// The last-published update (ie. from the subscription) is newer than this one.
32-
return;
33-
}
34-
lastUpdateSlot = slot;
35-
next(null /* err */, lamports /* data */);
36-
})
37-
.catch(e => {
38-
if (e !== EXPLICIT_ABORT_TOKEN) {
39-
next(e /* err */);
40-
}
41-
});
42-
// Subscribe for updates to that balance.
43-
rpcSubscriptions
44-
.accountNotifications(address)
45-
.subscribe({ abortSignal: abortController.signal })
46-
.then(async accountInfoNotifications => {
47-
try {
48-
for await (const {
49-
context: { slot },
50-
value: { lamports },
51-
} of accountInfoNotifications) {
52-
if (slot < lastUpdateSlot) {
53-
// The last-published update (ie. from the initial fetch) is newer than this
54-
// one.
55-
continue;
56-
}
57-
lastUpdateSlot = slot;
58-
next(null /* err */, lamports /* data */);
59-
}
60-
} catch (e) {
61-
next(e /* err */);
62-
}
63-
})
64-
.catch(e => {
65-
if (e !== EXPLICIT_ABORT_TOKEN) {
66-
next(e /* err */);
67-
}
68-
});
69-
// Return a cleanup callback that aborts the RPC call/subscription.
27+
const store = createReactiveStoreWithInitialValueAndSlotTracking({
28+
abortSignal: abortController.signal,
29+
rpcRequest: rpc.getBalance(address, { commitment: 'confirmed' }),
30+
rpcSubscriptionRequest: rpcSubscriptions.accountNotifications(address),
31+
rpcSubscriptionValueMapper: ({ lamports }) => lamports,
32+
rpcValueMapper: lamports => lamports,
33+
});
34+
store.subscribe(() => {
35+
const error = store.getError();
36+
if (error) {
37+
next(error as Error);
38+
} else {
39+
next(null, store.getState());
40+
}
41+
});
7042
return () => {
71-
abortController.abort(EXPLICIT_ABORT_TOKEN);
43+
abortController.abort();
7244
};
7345
}

packages/kit/README.md

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,39 @@ await airdrop({
3838

3939
> [!NOTE] This only works on test clusters.
4040
41+
### `createReactiveStoreWithInitialValueAndSlotTracking(config)`
42+
43+
Creates a `ReactiveStore` that combines an initial RPC fetch with an ongoing subscription to keep its state up to date. Uses slot-based comparison to ensure only the most recent value is kept, regardless of whether it came from the RPC response or a subscription notification.
44+
45+
The returned store is compatible with React's `useSyncExternalStore`, Svelte stores, Solid's `from()`, and any other reactive primitive that expects a `{ subscribe, getState }` contract.
46+
47+
```ts
48+
import {
49+
address,
50+
createReactiveStoreWithInitialValueAndSlotTracking,
51+
createSolanaRpc,
52+
createSolanaRpcSubscriptions,
53+
} from '@solana/kit';
54+
55+
const rpc = createSolanaRpc('http://127.0.0.1:8899');
56+
const rpcSubscriptions = createSolanaRpcSubscriptions('ws://127.0.0.1:8900');
57+
const myAddress = address('FnHyam9w4NZoWR6mKN1CuGBritdsEWZQa4Z4oawLZGxa');
58+
59+
const balanceStore = createReactiveStoreWithInitialValueAndSlotTracking({
60+
abortSignal: AbortSignal.timeout(60_000),
61+
rpcRequest: rpc.getBalance(myAddress, { commitment: 'confirmed' }),
62+
rpcValueMapper: lamports => lamports,
63+
rpcSubscriptionRequest: rpcSubscriptions.accountNotifications(myAddress),
64+
rpcSubscriptionValueMapper: ({ lamports }) => lamports,
65+
});
66+
67+
const unsubscribe = balanceStore.subscribe(() => {
68+
const error = balanceStore.getError();
69+
if (error) console.error('Error:', error);
70+
else console.log('Balance:', balanceStore.getState());
71+
});
72+
```
73+
4174
### `decompileTransactionMessageFetchingLookupTables(compiledTransactionMessage, rpc, config)`
4275

4376
Returns a `TransactionMessage` from a `CompiledTransactionMessage`. If any of the accounts in the compiled message require an address lookup table to find their address, this function will use the supplied RPC instance to fetch the contents of the address lookup table from the network.

packages/kit/package.json

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -119,6 +119,7 @@
119119
"@solana/rpc-parsed-types": "workspace:*",
120120
"@solana/rpc-spec-types": "workspace:*",
121121
"@solana/rpc-subscriptions": "workspace:*",
122+
"@solana/subscribable": "workspace:*",
122123
"@solana/rpc-types": "workspace:*",
123124
"@solana/signers": "workspace:*",
124125
"@solana/sysvars": "workspace:*",

0 commit comments

Comments
 (0)