-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathserialize.ts
More file actions
118 lines (113 loc) · 3.57 KB
/
serialize.ts
File metadata and controls
118 lines (113 loc) · 3.57 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
// Module-level UTF-8 encoder, reused so one is not allocated per message.
const enc = new TextEncoder();
// Module-level UTF-8 decoder.
const dec = new TextDecoder();
// Zero-length payload used for messages that carry no data.
const voidMessage = new Uint8Array();
/**
 * Serialize a message into a single binary frame.
 *
 * Frame layout:
 * - byte 0: `kind` in the upper 5 bits, a 2-bit length-size code in bits 2..1
 *   and the "raw string" flag in bit 0;
 * - length field starting at byte 1: 1 byte (code `0b00`), 2 bytes (`0b01`)
 *   or 4 bytes (`0b11`), big-endian;
 * - payload: UTF-8 text, either the string itself (raw flag set) or the JSON
 *   encoding of `data` (raw flag clear).
 *
 * @param kind Message kind; must be an integer in `0..31` to fit in 5 bits.
 * @param data Optional payload. `undefined` produces an empty payload, a
 *   non-empty string is sent raw, anything else is sent as JSON.
 * @returns A freshly allocated buffer containing header and payload.
 * @throws {RangeError} If `kind` is not an integer in `0..31`.
 */
export function serialize(kind: number, data?: unknown) {
  // NaN and fractional values pass a plain range comparison and would then be
  // silently truncated by the bit shift below, so reject them explicitly.
  if (!Number.isInteger(kind) || kind < 0 || kind > 31) {
    throw new RangeError("Invalid message kind");
  }
  // A zero-length payload denotes "no data", so the empty string cannot be
  // sent raw. It is JSON-encoded as `""` instead, and the raw flag must then
  // be clear so that the receiver parses it back into an empty string rather
  // than returning the two quote characters verbatim.
  const isRaw = typeof data === "string" && data !== "";
  const payload = data === undefined
    ? voidMessage
    : enc.encode(isRaw ? data : JSON.stringify(data));
  const payloadLen = payload.byteLength;
  // The length-size code doubles as the number of extra header bytes:
  // 0 -> 1-byte length, 1 -> 2-byte length, 3 -> 4-byte length.
  const lenBits = payloadLen <= 0xff ? 0 : (payloadLen <= 0xffff ? 0b01 : 0b11);
  const headSize = 2 + lenBits;
  const buffer = new Uint8Array(headSize + payloadLen);
  buffer[0] = (kind << 3) | (lenBits << 1) | (isRaw ? 1 : 0);
  if (headSize === 2) {
    buffer[1] = payloadLen;
  } else {
    // DataView writes big-endian by default.
    const view = new DataView(buffer.buffer);
    if (headSize === 3) {
      view.setUint16(1, payloadLen);
    } else {
      view.setUint32(1, payloadLen);
    }
  }
  if (payloadLen > 0) {
    buffer.set(payload, headSize);
  }
  return buffer;
}
/**
 * Deserialize a stream of messages.
 *
 * Each frame starts with a head byte (`kind` in the upper 5 bits, a 2-bit
 * length-size code in bits 2..1, "raw string" flag in bit 0), followed by a
 * big-endian length field of 1, 2 or 4 bytes and a UTF-8 payload. Frames may
 * be split across chunks or packed several to a chunk; incomplete data is
 * buffered until the rest arrives.
 *
 * @param stream Byte stream carrying concatenated frames.
 * @yields `[kind, data]` per frame. `data` is `undefined` for an empty
 *   payload, the decoded string when the raw flag is set, and the result of
 *   `JSON.parse` otherwise.
 * @throws {Error} "Invalid message frame" when the length-size code is the
 *   unused value `0b10`.
 * @throws {SyntaxError} When a non-raw payload is not valid JSON.
 *
 * An incomplete trailing frame left in the buffer when the stream ends is
 * discarded without error. The reader lock is released when the generator
 * finishes, throws, or is closed early by the consumer.
 */
export async function* deserialize(stream: ReadableStream<Uint8Array>): AsyncGenerator<[kind: number, data: unknown]> {
  // Use a private decoder: a shared instance that was last driven in
  // streaming mode could still hold pending bytes and corrupt a payload.
  const decoder = new TextDecoder();
  let buffer: Uint8Array<ArrayBufferLike> = new Uint8Array();
  const reader = stream.getReader();
  try {
    while (true) {
      const { done, value } = await reader.read();
      if (done) {
        break;
      }
      if (!value || value.byteLength === 0) {
        continue;
      }
      if (buffer.byteLength === 0) {
        // Nothing pending: adopt the chunk directly instead of copying it.
        buffer = value;
      } else {
        const merged = new Uint8Array(buffer.byteLength + value.byteLength);
        merged.set(buffer, 0);
        merged.set(value, buffer.byteLength);
        buffer = merged;
      }
      let offset = 0;
      // A frame needs at least the head byte and one length byte.
      while (buffer.byteLength - offset >= 2) {
        // `buffer` may be a view into a larger ArrayBuffer, so honour byteOffset.
        const view = new DataView(buffer.buffer, buffer.byteOffset + offset, buffer.byteLength - offset);
        const head = view.getUint8(0);
        const kind = head >> 3;
        const lenBits = (head >> 1) & 0b11;
        const isRaw = (head & 1) === 1;
        let headSize = 2;
        let payloadLen = view.getUint8(1);
        if (lenBits > 0) {
          if (lenBits !== 0b01 && lenBits !== 0b11) {
            throw new Error("Invalid message frame");
          }
          // The code is also the number of extra length bytes (1 or 3).
          headSize += lenBits;
          if (buffer.byteLength - offset < headSize) {
            break; // header not complete yet; wait for more data
          }
          payloadLen = lenBits === 0b01 ? view.getUint16(1) : view.getUint32(1);
        }
        const payloadEnd = offset + headSize + payloadLen;
        if (buffer.byteLength < payloadEnd) {
          break; // payload not complete yet; wait for more data
        }
        let payload: unknown = undefined;
        if (payloadLen > 0) {
          const payloadRaw = decoder.decode(buffer.subarray(offset + headSize, payloadEnd));
          payload = isRaw ? payloadRaw : JSON.parse(payloadRaw);
        }
        yield [kind, payload];
        offset = payloadEnd;
      }
      if (offset > 0) {
        // Drop the consumed frames, keeping only the incomplete tail.
        buffer = buffer.subarray(offset);
      }
    }
  } finally {
    // Always hand the stream back, including on error or early consumer exit.
    reader.releaseLock();
  }
}
/**
 * Read a SSE stream and yield the data lines.
 *
 * The stream is decoded as UTF-8 and split on `\n`; a trailing `\r` on a line
 * is removed. For every line whose field name is `data`, the value after the
 * colon is yielded with one optional leading space stripped, so both
 * `data: value` and `data:value` produce `value`. All other lines (other
 * fields, comments, blank lines) are skipped. Each `data` line is yielded on
 * its own; consecutive `data` lines are not joined into a single event.
 *
 * A final line without a terminating newline is still processed when the
 * stream ends. The reader lock is released when the generator finishes,
 * throws, or is closed early by the consumer.
 *
 * @param stream Byte stream in `text/event-stream` format.
 * @yields The value of each `data` line, in order.
 */
export async function* readSSEStream(stream: ReadableStream<Uint8Array>): AsyncGenerator<string> {
  // Returns the value of a `data` line, or undefined for any other line.
  const dataValueOf = (rawLine: string): string | undefined => {
    const line = rawLine.endsWith("\r") ? rawLine.slice(0, -1) : rawLine;
    if (!line.startsWith("data:")) {
      return undefined;
    }
    // Strip at most one space after the colon.
    return line.startsWith(" ", 5) ? line.slice(6) : line.slice(5);
  };
  // Streaming decode is stateful, so each call needs its own decoder; a shared
  // instance would mix pending bytes between unrelated streams.
  const decoder = new TextDecoder();
  const reader = stream.getReader();
  let buffer = "";
  try {
    while (true) {
      const { done, value } = await reader.read();
      if (done) {
        // Flush any bytes the decoder is still holding from a split sequence.
        buffer += decoder.decode();
        const data = dataValueOf(buffer);
        if (data !== undefined) {
          yield data;
        }
        break;
      }
      buffer += decoder.decode(value, { stream: true });
      const lines = buffer.split("\n");
      // The last element is an incomplete line; keep it for the next chunk.
      buffer = lines.pop() ?? "";
      for (const line of lines) {
        const data = dataValueOf(line);
        if (data !== undefined) {
          yield data;
        }
      }
    }
  } finally {
    // Always hand the stream back, including on error or early consumer exit.
    reader.releaseLock();
  }
}