| 1 | |
| 2 | |
| 3 | |
| 4 | |
| 5 | |
| 6 | |
| 7 | |
| 8 | |
| 9 | |
| 10 |
|
| 11 | import "../../lib/router/utils.js";
|
| 12 | import { Deferred, createLineSplittingTransform } from "./utils.js";
|
| 13 | import { flatten } from "./flatten.js";
|
| 14 | import { unflatten } from "./unflatten.js";
|
| 15 |
|
| 16 | async function decode(readable, options) {
|
| 17 | const { plugins } = options ?? {};
|
| 18 | const done = new Deferred();
|
| 19 | const reader = readable.pipeThrough(createLineSplittingTransform()).getReader();
|
| 20 | const decoder = {
|
| 21 | values: [],
|
| 22 | hydrated: [],
|
| 23 | deferred: {},
|
| 24 | plugins
|
| 25 | };
|
| 26 | const decoded = await decodeInitial.call(decoder, reader);
|
| 27 | let donePromise = done.promise;
|
| 28 | if (decoded.done) done.resolve();
|
| 29 | else donePromise = decodeDeferred.call(decoder, reader).then(done.resolve).catch((reason) => {
|
| 30 | for (const deferred of Object.values(decoder.deferred)) deferred.reject(reason);
|
| 31 | done.reject(reason);
|
| 32 | });
|
| 33 | return {
|
| 34 | done: donePromise.then(() => reader.closed),
|
| 35 | value: decoded.value
|
| 36 | };
|
| 37 | }
|
| 38 | async function decodeInitial(reader) {
|
| 39 | const read = await reader.read();
|
| 40 | if (!read.value) throw new SyntaxError();
|
| 41 | let line;
|
| 42 | try {
|
| 43 | line = JSON.parse(read.value);
|
| 44 | } catch (reason) {
|
| 45 | throw new SyntaxError();
|
| 46 | }
|
| 47 | return {
|
| 48 | done: read.done,
|
| 49 | value: unflatten.call(this, line)
|
| 50 | };
|
| 51 | }
|
| 52 | async function decodeDeferred(reader) {
|
| 53 | let read = await reader.read();
|
| 54 | while (!read.done) {
|
| 55 | if (!read.value) continue;
|
| 56 | const line = read.value;
|
| 57 | switch (line[0]) {
|
| 58 | case "P": {
|
| 59 | const colonIndex = line.indexOf(":");
|
| 60 | const deferredId = Number(line.slice(1, colonIndex));
|
| 61 | const deferred = this.deferred[deferredId];
|
| 62 | if (!deferred) throw new Error(`Deferred ID ${deferredId} not found in stream`);
|
| 63 | const lineData = line.slice(colonIndex + 1);
|
| 64 | let jsonLine;
|
| 65 | try {
|
| 66 | jsonLine = JSON.parse(lineData);
|
| 67 | } catch (reason) {
|
| 68 | throw new SyntaxError();
|
| 69 | }
|
| 70 | const value = unflatten.call(this, jsonLine);
|
| 71 | deferred.resolve(value);
|
| 72 | break;
|
| 73 | }
|
| 74 | case "E": {
|
| 75 | const colonIndex = line.indexOf(":");
|
| 76 | const deferredId = Number(line.slice(1, colonIndex));
|
| 77 | const deferred = this.deferred[deferredId];
|
| 78 | if (!deferred) throw new Error(`Deferred ID ${deferredId} not found in stream`);
|
| 79 | const lineData = line.slice(colonIndex + 1);
|
| 80 | let jsonLine;
|
| 81 | try {
|
| 82 | jsonLine = JSON.parse(lineData);
|
| 83 | } catch (reason) {
|
| 84 | throw new SyntaxError();
|
| 85 | }
|
| 86 | const value = unflatten.call(this, jsonLine);
|
| 87 | deferred.reject(value);
|
| 88 | break;
|
| 89 | }
|
| 90 | default: throw new SyntaxError();
|
| 91 | }
|
| 92 | read = await reader.read();
|
| 93 | }
|
| 94 | }
|
| 95 | function encode(input, options) {
|
| 96 | const { onComplete, plugins, postPlugins, signal } = options ?? {};
|
| 97 | const encoder = {
|
| 98 | deferred: {},
|
| 99 | index: 0,
|
| 100 | indices: new Map(),
|
| 101 | stringified: [],
|
| 102 | plugins,
|
| 103 | postPlugins,
|
| 104 | signal
|
| 105 | };
|
| 106 | const textEncoder = new TextEncoder();
|
| 107 | let lastSentIndex = 0;
|
| 108 | return new ReadableStream({ async start(controller) {
|
| 109 | const id = await flatten.call(encoder, input);
|
| 110 | if (Array.isArray(id)) throw new Error("This should never happen");
|
| 111 | if (id < 0) controller.enqueue(textEncoder.encode(`${id}\n`));
|
| 112 | else {
|
| 113 | controller.enqueue(textEncoder.encode(`[${encoder.stringified.join(",")}]\n`));
|
| 114 | lastSentIndex = encoder.stringified.length - 1;
|
| 115 | }
|
| 116 | const seenPromises = new WeakSet();
|
| 117 | let processingChain = Promise.resolve();
|
| 118 | if (Object.keys(encoder.deferred).length) {
|
| 119 | let raceDone;
|
| 120 | const racePromise = new Promise((resolve, reject) => {
|
| 121 | raceDone = resolve;
|
| 122 | if (signal) {
|
| 123 | const rejectPromise = () => reject(signal.reason || new Error("Signal was aborted."));
|
| 124 | if (signal.aborted) rejectPromise();
|
| 125 | else signal.addEventListener("abort", (event) => {
|
| 126 | rejectPromise();
|
| 127 | });
|
| 128 | }
|
| 129 | });
|
| 130 | while (Object.keys(encoder.deferred).length > 0) {
|
| 131 | for (const [deferredId, deferred] of Object.entries(encoder.deferred)) {
|
| 132 | if (seenPromises.has(deferred)) continue;
|
| 133 | seenPromises.add(encoder.deferred[Number(deferredId)] = Promise.race([racePromise, deferred]).then((resolved) => {
|
| 134 | processingChain = processingChain.then(async () => {
|
| 135 | const id = await flatten.call(encoder, resolved);
|
| 136 | if (Array.isArray(id)) {
|
| 137 | controller.enqueue(textEncoder.encode(`P${deferredId}:[["Z",${id[0]}]]\n`));
|
| 138 | encoder.index++;
|
| 139 | lastSentIndex++;
|
| 140 | } else if (id < 0) controller.enqueue(textEncoder.encode(`P${deferredId}:${id}\n`));
|
| 141 | else {
|
| 142 | const values = encoder.stringified.slice(lastSentIndex + 1).join(",");
|
| 143 | controller.enqueue(textEncoder.encode(`P${deferredId}:[${values}]\n`));
|
| 144 | lastSentIndex = encoder.stringified.length - 1;
|
| 145 | }
|
| 146 | });
|
| 147 | return processingChain;
|
| 148 | }, (reason) => {
|
| 149 | processingChain = processingChain.then(async () => {
|
| 150 | if (!reason || typeof reason !== "object" || !(reason instanceof Error)) reason = new Error("An unknown error occurred");
|
| 151 | const id = await flatten.call(encoder, reason);
|
| 152 | if (Array.isArray(id)) {
|
| 153 | controller.enqueue(textEncoder.encode(`E${deferredId}:[["Z",${id[0]}]]\n`));
|
| 154 | encoder.index++;
|
| 155 | lastSentIndex++;
|
| 156 | } else if (id < 0) controller.enqueue(textEncoder.encode(`E${deferredId}:${id}\n`));
|
| 157 | else {
|
| 158 | const values = encoder.stringified.slice(lastSentIndex + 1).join(",");
|
| 159 | controller.enqueue(textEncoder.encode(`E${deferredId}:[${values}]\n`));
|
| 160 | lastSentIndex = encoder.stringified.length - 1;
|
| 161 | }
|
| 162 | });
|
| 163 | return processingChain;
|
| 164 | }).finally(() => {
|
| 165 | delete encoder.deferred[Number(deferredId)];
|
| 166 | }));
|
| 167 | }
|
| 168 | await Promise.race(Object.values(encoder.deferred));
|
| 169 | }
|
| 170 | raceDone();
|
| 171 | }
|
| 172 | await Promise.all(Object.values(encoder.deferred));
|
| 173 | await processingChain;
|
| 174 | controller.close();
|
| 175 | onComplete?.();
|
| 176 | } });
|
| 177 | }
|
| 178 |
|
| 179 | export { decode, encode };
|