Server-side SSE Data Proxy with fetch-based EventSource implementation
Server-Sent Events (SSE) is a technique for pushing real-time updates from the server to the client in one direction. The basic principle is that the client opens a persistent HTTP connection to the server, through which the server can continuously send event data. SSE is ideal for applications that need continuous data updates, such as real-time notifications, message pushes, and dynamic content updates. Compared with WebSocket, it is a more lightweight data communication solution: easier to implement and better suited to simple unidirectional data-flow scenarios.
Description
SSE is essentially implemented on top of an HTTP long-lived connection plus a ReadableStream: the client keeps a one-way connection to the server open and continuously receives the real-time events the server pushes, without having to repeatedly request updates from the server. The browser provides the built-in EventSource object, which makes handling the server's response easy, while the server simply keeps writing data into the Response object to produce a streaming response. In real business requirements, however, neither the server side nor the client side is quite this ideal:

- Server-side response preprocessing. When implementing a streaming-dialog style requirement, we usually forward the LLM's inference data to the client through our own server, and during server-side processing we need to filter, review, and otherwise transform the data. We therefore have to receive the streaming response on the server, preprocess the data, and then stream the response on to the client.
- Direct server-side data forwarding. When no data preprocessing is needed, receiving the streaming response on the server and then forwarding it to the client is cumbersome. Instead, we can proxy the request directly to the target address as an HTTP long-lived connection, without actually receiving the response and re-sending it to the client.
- fetch-based data requests. The EventSource object can only issue GET requests and cannot set request headers or carry a request body, so when authentication is required everything must be encoded into the URL. Moreover, most browsers limit URLs to about 2000 characters. Implementing the SSE data request on top of fetch solves both problems.
Here we first implement basic SSE with the EventSource object. Since EventSource is a browser-implemented API and belongs to the client side, we also need to implement the streaming data response on the server first. All of the DEMO code in this article lives in /WindRunnerMax/webpack-simple-environment.
Implementing a basic streaming data response on the server side is fairly easy. We first need to set the response Content-Type to text/event-stream. Note that the response headers must be set before any of the response body is written; writing the body first and setting the headers afterward results in an ERR_INVALID_CHUNKED_ENCODING response.
```ts
// packages/fetch-sse/server/modules/
import http from "node:http";

const ping = (req: http.IncomingMessage, res: http.ServerResponse) => {
  res.writeHead(200, {
    "Content-Type": "text/event-stream; charset=utf-8",
    "Cache-Control": "no-cache",
    "Connection": "keep-alive",
  });
};
```
SSE is in fact a protocol, and a protocol naturally needs a fixed format. In the text/event-stream response format, each group of data is separated by \n\n, while the multiple fields that need to be passed within one group are separated by \n. For example, if we need to pass the id, event, and data fields together:
```text
id: 1
event: message
data: hello world

id: 2
event: custom
data: hello
data: world
```
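To see the framing rules in one place, here is a small serialization helper (the `formatEvent` name and the `SSEMessage` shape are our own, not part of the article's repo): each field is terminated by `\n`, multi-line data becomes multiple `data:` fields, and the group ends with a blank line.

```ts
// Hypothetical helper: serialize one SSE message group into the wire format.
interface SSEMessage {
  id?: string;
  event?: string;
  data: string;
}

const formatEvent = (msg: SSEMessage): string => {
  const lines: string[] = [];
  if (msg.id !== undefined) lines.push(`id: ${msg.id}`);
  if (msg.event !== undefined) lines.push(`event: ${msg.event}`);
  // Multi-line data is split into multiple "data:" fields, one per line.
  for (const part of msg.data.split("\n")) lines.push(`data: ${part}`);
  // Fields are "\n"-separated; the group is terminated by a blank line.
  return lines.join("\n") + "\n\n";
};

console.log(formatEvent({ id: "2", event: "custom", data: "hello\nworld" }));
```

Running this reproduces the second group from the example above.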
Server-Sent Events also come with automatic reconnection and event id management, all of which is preset in the browser's EventSource implementation; if we implement things with fetch, we have to manage this ourselves. It works fine for our current basic example, though, and in addition we can pass messages under custom event names. A line in the form :xxx\n serves as a comment, so we can declare related information when the connection is created:
```ts
// packages/fetch-sse/server/modules/
res.write("retry: 10000\n");
res.write("id: -1\n");
res.write("event: connect\n");
res.write("data: " + new Date() + "\n\n");
```
Then, on the client side, we need to create the connection with the EventSource object and receive the server's data through the custom event described above. In practice, if no specific event name is given, that is, without the connect event above, the default message event is used, which means the event name is not strictly required.
```ts
// packages/fetch-sse/client/components/
const onConnect = useMemoFn((e: MessageEvent<string>) => {
  prepend("Start Time: " + e.data);
});
const source = new EventSource("/ping");
source.addEventListener("connect", onConnect);
```
For the default message event we likewise produce output on the server side. As mentioned earlier, as long as we do not call res.end, the whole connection stays pending, so to keep the connection alive we just need to keep sending data to the client with a timer.
```ts
// packages/fetch-sse/server/modules/
let index = 0;
const interval = setInterval(() => {
  res.write("id: " + index++ + "\n");
  res.write("data: " + new Date() + "\n\n");
}, 1000);
```
On the client side we can bind the onmessage handler on the source object, or simply bind the event with addEventListener("message"). In addition, after we successfully create a connection with the EventSource object, we can see the EventStream data transfer panel in the Network panel of the browser console, where the id, type, data, and time we defined are all displayed.
```ts
// packages/fetch-sse/client/components/
const prepend = (text: string) => {
  const el = ref.current; // container element ref (name assumed)
  if (!el) return;
  const child = document.createElement("div");
  child.textContent = text;
  el.prepend(child);
};
const onMessage = (e: MessageEvent<string>) => {
  prepend("Ping: " + e.data);
};
const source = new EventSource("/ping");
source.onmessage = onMessage;
```
On the server side we also need to note that when the user's client connection closes, we must close the server-side request as well to avoid extra resource usage; in our case here, if the timer is not cleared it is an outright memory leak rather than just extra resource usage.
("close", () => {
("[ping] connection close");
clearInterval(interval);
();
});
In addition, when the connection is not established over HTTP/2, SSE is subject to the browser's limit on the maximum number of connections per domain, which can be troublesome when opening multiple tabs. This limit is set by the browser at a very low 6 connections, and it applies per domain across the whole browser: we can open 6 SSE connections to one domain across all tabs, and simultaneously another 6 to a different domain. With HTTP/2, by contrast, the maximum number of simultaneous HTTP streams is negotiated between server and client and defaults to 100.
Server Side
Before the server can handle data forwarding and proxying, we naturally need to define the data source for the whole exercise. We don't need to actually integrate a streaming response from OpenAI, Coze, or the like; simulating one is enough, so we start by defining a /stream interface to simulate streaming output. One thing to note is that our output is normally in Markdown format, which naturally contains \n characters, and \n is needed as a keyword in the SSE protocol, so we have to encode/decode the content to avoid the \n keyword. Using encodeURIComponent or a similar encoding would both work; here we keep it simple and just replace \n with \\n.
```ts
// packages/fetch-sse/server/modules/
const content = `# Chu Shi Biao
- Zhuge Liang

The late Emperor passed away before his great undertaking was half complete. Now the empire is divided in three, and Yizhou is weary and depleted; this is truly a critical moment of survival. Yet the guarding ministers are tireless within, and loyal officers forget themselves without, for they cherish the special favor of the late Emperor and wish to repay it to Your Majesty. It is fitting to open wide the sagely hearing and glorify the late Emperor's bequeathed virtue, to rouse the spirit of the ambitious; it is not fitting to belittle yourself or cite improper analogies, and so block the road of loyal remonstrance.
...
Now I am about to depart on a distant expedition; facing this memorial my tears fall, and I know not what to say.`.replace(/\n/g, "\\n");
```
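The replacement scheme can be sketched as a pair of helpers (the names `encodeContent`/`decodeContent` are ours, for illustration); decoding is simply the inverse replacement that the client applies later before rendering:

```ts
// Escape literal newlines so they don't collide with the SSE "\n" delimiter.
const encodeContent = (text: string): string => text.replace(/\n/g, "\\n");

// Inverse operation, applied on the client before rendering the Markdown.
const decodeContent = (text: string): string => text.replace(/\\n/g, "\n");

const markdown = "# Title\n- item";
const wire = encodeContent(markdown); // newline-free, safe inside a single data: field
console.log(decodeContent(wire) === markdown); // true
```

Note that this simple scheme assumes the source text does not already contain a literal backslash-n sequence; a more robust encoding such as encodeURIComponent avoids that edge case.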
Setting the response headers and so on won't be described again here. In an actual model inference process there may be two kinds of output: one outputs the full content of the conversation so far, like a string slice that always starts from 0; the other outputs only the latest content as a delta, like a slice where the end of the previous output serves as the start of the next. Here we keep it simple and take the first approach, continuously pushing the content from position 0 to the client.
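The two output modes can be illustrated with a tiny self-contained sketch (the helper names are ours, not from the repo): full mode always sends the prefix from 0, while delta mode sends only the slice between the previous end and the new one.

```ts
const content = "hello world";

// Mode 1: each frame carries the full prefix generated so far.
const fullFrames = (steps: number[]): string[] => {
  let start = 0;
  return steps.map(step => {
    start += step;
    return content.slice(0, start);
  });
};

// Mode 2: each frame carries only the newly generated delta,
// using the previous frame's end as the next frame's start.
const deltaFrames = (steps: number[]): string[] => {
  let start = 0;
  return steps.map(step => {
    const end = start + step;
    const delta = content.slice(start, end);
    start = end;
    return delta;
  });
};

console.log(fullFrames([3, 4]));  // ["hel", "hello w"]
console.log(deltaFrames([3, 4])); // ["hel", "lo w"]
```

Concatenating the delta frames reproduces the last full frame, which is why either mode can drive the same client-side rendering.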
Since this is simulated streaming output, we simply set a timer, randomly generate a step length for each tick, record it as the new start, and immediately output the content to the client; here we use the default message event directly, and when the output reaches the end we clear the timer and close the connection. And we must not forget that when the connected client closes, we need to proactively clean up the current timer to avoid wasting server-side compute resources.
```ts
// packages/fetch-sse/server/modules/
res.write("event: connect\n");
res.write("data: " + Date.now() + "\n\n");

let start = 0;
const interval = setInterval(() => {
  const slice = Math.floor(Math.random() * 30) + 1;
  start = start + slice;
  res.write("event: message\n");
  res.write("data: " + content.slice(0, start) + "\n\n");
  if (start >= content.length) {
    clearInterval(interval);
    res.end();
  }
}, 500);

req.socket.on("close", () => {
  console.log("[stream] connection close");
  clearInterval(interval);
  res.end();
});
```
Data Forwarding
After defining the data-source interface, we can start implementing data forwarding to achieve server-side response preprocessing, which is where we can filter, review, or otherwise process the data. We need to receive the streaming response on the server, preprocess the data, and then stream the response to the client. So in this forwarding interface we first initiate a request to the data-source interface, here directly with node-fetch.

```ts
// packages/fetch-sse/server/modules/
import fetch from "node-fetch";

const response = await fetch("http://127.0.0.1:8800/stream");
```
When using node-fetch, note that we start the service directly with ts-node, so a CJS module importing an ESM package would throw an exception; we therefore need to pin node-fetch to the 2.x version. In addition, we need to create an AbortController so we can terminate the request promptly when the client closes the connection. In node-fetch the response body is still a readable ReadableStream, and that is what we use to handle forwarding the SSE response.
```ts
// packages/fetch-sse/server/modules/
const ctrl = new AbortController();
const response = await fetch("http://127.0.0.1:8800/stream", {
  signal: ctrl.signal as AbortSignal,
});
const readable = response.body;
if (!readable) return null;

req.socket.on("close", () => {
  console.log("[transfer] connection close");
  ctrl.abort();
  res.end();
});
```
On the server side we have no EventSource object to receive the data, so naturally we can only parse the data ourselves according to the SSE protocol. And since we read the data through a ReadableStream, we need to process the binary data as a stream rather than parse on delimiters directly. So here we implement a StreamParser: whenever it receives Uint8Array binary data, it first merges it into a new buffer, then iterates over the current data; every time it encounters a \n (byte 10), it dispatches the preceding bytes to the onLine method for processing.
```ts
// packages/fetch-sse/server/utils/
export class StreamParser {
  private buffer: Uint8Array = new Uint8Array(0);

  private compose(data: Uint8Array) {
    // Merge the leftover buffer with the newly received bytes
    const buffer = new Uint8Array(this.buffer.length + data.length);
    buffer.set(this.buffer);
    buffer.set(data, this.buffer.length);
    this.buffer = buffer;
    return buffer;
  }

  public onBinary(bytes: Uint8Array) {
    const buffer = this.compose(bytes);
    const len = buffer.length;
    let start = 0;
    for (let i = 0; i < len; i++) {
      if (buffer[i] === 10) {
        this.onLine(buffer.slice(start, i));
        start = i + 1;
      }
    }
    // Keep the incomplete tail for the next chunk
    this.buffer = buffer.slice(start);
  }
}
```
When processing onLine, we need to parse the data line by line according to the SSE protocol; each line we handle has the form field: value. Since we treat \n as the line terminator and do not pass it along, a line whose length is 0 marks the end of a group, at which point we need to dispatch the onMessage event, passing the event name and data to a predefined handler. Otherwise we use TextDecoder to decode the bytes into a string, then split on : to separate and parse the field and value.
```ts
// packages/fetch-sse/server/utils/
export class StreamParser {
  private message: Partial<Message> = {};
  public onMessage?: (message: Message) => void;

  private onLine(bytes: Uint8Array) {
    if (bytes.length === 0) {
      // A blank line ends the group: dispatch the accumulated message
      if (this.onMessage && this.message.data !== undefined) {
        this.message.id = this.message.id || "";
        this.onMessage(this.message as Message);
      }
      this.message = {};
      return;
    }
    const decoder = new TextDecoder();
    const line = decoder.decode(bytes);
    const [field, ...rest] = line.split(":");
    const value = rest.join(":").trim();
    switch (field) {
      case "id":
        this.message.id = value;
        break;
      case "event":
        this.message.event = value;
        break;
      case "data":
        this.message.event = this.message.event || "message";
        this.message.data = value;
        break;
      default:
        break;
    }
  }
}
```
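To sanity-check the parsing logic, here is a condensed, self-contained variant of the parser (simplified and renamed from the snippets above; chunk boundaries are deliberately placed mid-line to exercise the buffering):

```ts
type Message = { id?: string; event: string; data: string };

class MiniParser {
  private buffer = new Uint8Array(0);
  private message: Partial<Message> = {};
  public onMessage?: (message: Message) => void;

  public onBinary(bytes: Uint8Array) {
    // Merge the leftover bytes with the new chunk.
    const buffer = new Uint8Array(this.buffer.length + bytes.length);
    buffer.set(this.buffer);
    buffer.set(bytes, this.buffer.length);
    let start = 0;
    for (let i = 0; i < buffer.length; i++) {
      if (buffer[i] === 10) { // "\n"
        this.onLine(buffer.slice(start, i));
        start = i + 1;
      }
    }
    this.buffer = buffer.slice(start); // keep the incomplete tail
  }

  private onLine(bytes: Uint8Array) {
    if (bytes.length === 0) { // blank line: dispatch the group
      if (this.onMessage && this.message.data !== undefined) {
        this.message.event = this.message.event || "message";
        this.onMessage(this.message as Message);
      }
      this.message = {};
      return;
    }
    const line = new TextDecoder().decode(bytes);
    const [field, ...rest] = line.split(":");
    const value = rest.join(":").trim();
    if (field === "id") this.message.id = value;
    else if (field === "event") this.message.event = value;
    else if (field === "data") this.message.data = value;
  }
}

// Feed one frame in two chunks, split in the middle of a line.
const parser = new MiniParser();
const received: Message[] = [];
parser.onMessage = m => received.push(m);
const frame = "id: 1\nevent: message\ndata: hello world\n\n";
const bytes = new TextEncoder().encode(frame);
parser.onBinary(bytes.slice(0, 10));
parser.onBinary(bytes.slice(10));
// received now holds one message: { id: "1", event: "message", data: "hello world" }
```

Splitting the input mid-line is exactly the situation the buffering in compose/onBinary exists to handle: bytes that arrive without a trailing \n stay in the buffer until the next chunk completes the line.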
One thing to note is that the ReadableStream in Node and the browser-implemented ReadableStream have different function signatures, so here it is simplest to just iterate the data with for await; of course, using on("data") and on("end") to receive the data and end the response works too. We also need to bind the onMessage event to receive the parsed data and respond to the target client.
```ts
// packages/fetch-sse/server/utils/
const parser = new StreamParser();
parser.onMessage = message => {
  res.write(`event: ${message.event}\n`);
  res.write(`data: ${message.data}\n\n`);
};
for await (const chunk of readable) {
  const buffer = chunk as Buffer;
  const uint = new Uint8Array(buffer);
  parser.onBinary(uint);
}
res.end();
```
Request Proxy
When no data preprocessing is required, we can proxy the request directly to the target address as an HTTP long-lived connection, without actually receiving the response and then forwarding it to the client. Here we can implement the forwarding with the help of the http module: we first need the node:url module to parse the destination address, then initiate the request with http.request; once the connection is established we can pipe the data directly to the target Response object, and of course res.write combined with on("data") would also work.
```ts
// packages/fetch-sse/server/modules/
const targetUrl = new URL("http://127.0.0.1:8800/stream");
const options: http.RequestOptions = {
  hostname: targetUrl.hostname,
  port: targetUrl.port,
  path: targetUrl.pathname,
  method: req.method,
  headers: req.headers,
};
const proxyReq = http.request(options, proxyRes => {
  res.writeHead(proxyRes.statusCode || 404, proxyRes.headers);
  proxyRes.pipe(res);
});
```
Naturally we also need to handle some special cases here. First, the body data of POST requests: we need to forward all of the data from the original request into the new request as well, which again can be done with pipe or with on("data") plus write. For exception handling we also need to pass the error information through to the client, where responding with an error code still matters, and close the request to the target. And when the client's request closes, we likewise need to close the request to the target and end the response.
```ts
req.pipe(proxyReq);

proxyReq.on("error", error => {
  console.log("proxy error", error);
  res.writeHead(502, { "Content-Type": "text/plain" });
  res.end("Bad Gateway");
});

req.socket.on("close", () => {
  console.log("[proxy] connection close");
  proxyReq.destroy();
  res.end();
});
```
There is actually another problem here: if we use req.on("close") to listen for the client's connection closing, problems occur with POST requests. We can run the following node program, then initiate a request with curl and actively break the connection; we will find that req.on("close") is triggered too early, rather than firing after we actively disconnect the request.
echo "
const http = require('http');
const server = ((req, res) => {
('close', () => {
('close');
});
('data', (chunk) => {
('data:', new TextDecoder().decode(chunk));
});
setTimeout(() => ('end'), 10000);
});
(8001);
" | node;
curl -X POST http://127.0.0.1:8001 \
-H "Content-Type: application/json" \
-d '{"key1":"value1", "key2":"value2"}'
In fact our request has three close events available: req.on("close"), res.on("close"), and req.socket.on("close"). On req, the close event fires as soon as the request body carried with it has been fully received, so we can instead use the close events on res or on the socket to listen for the client's connection closing. For convenience we directly use the socket's close event to listen for the client closing the connection; note also that before Node 16 the socket property was named connection.
echo "
const http = require('http');
const server = ((req, res) => {
('close', () => {
('res close');
});
('close', () => {
('socket close');
});
('data', (chunk) => {
('data:', new TextDecoder().decode(chunk));
});
setTimeout(() => ('end'), 10000);
});
(8001);
" | node;
curl -X POST http://127.0.0.1:8001 \
-H "Content-Type: application/json" \
-d '{"key1":"value1", "key2":"value2"}'
Client Side
On the client side we need to implement SSE with fetch. Through fetch we can pass request headers and a request body and send POST and other types of requests, avoiding the problem of only being able to send GET requests with everything encoded into the URL. If the connection breaks, we can also control the retry policy: with the EventSource object, the browser silently retries a few times for you and then stops, which is not good enough for any kind of robust application. And if we need to do custom validation and processing before parsing the event source, we can also access the response object, which is very useful for designs that put an API gateway in front of the server.
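As an illustration of the retry control that fetch makes possible, here is a sketch of a connection loop with capped, exponentially backed-off retries (this helper is our own, not part of the article's repo; the fetch-event-source library listed in the references implements this idea much more completely):

```ts
// Hypothetical retry wrapper: unlike EventSource's fixed browser behavior,
// we decide how many times to retry and how long to back off.
const connectWithRetry = async (
  url: string,
  onChunk: (chunk: Uint8Array) => void,
  maxRetries = 3,
  baseDelay = 1000,
): Promise<void> => {
  for (let attempt = 0; attempt <= maxRetries; attempt++) {
    try {
      const res = await fetch(url, { method: "POST" });
      if (!res.ok || !res.body) throw new Error(`status ${res.status}`);
      const reader = res.body.getReader();
      for (;;) {
        const { done, value } = await reader.read();
        if (done) return; // server closed normally: stop retrying
        if (value) onChunk(value);
      }
    } catch (e) {
      if (attempt === maxRetries) throw e;
      // Exponential backoff before the next attempt.
      await new Promise(r => setTimeout(r, baseDelay * 2 ** attempt));
    }
  }
};
```

The chunks delivered to onChunk would then be fed to the StreamParser described below; a production version would also distinguish abort errors (which should not be retried) from network failures.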
fetch Implementation
The fetch-based implementation is actually fairly simple. We first create an AbortController object so we can terminate the request promptly when the client closes the connection, then initiate the request with fetch; when the request succeeds we can read the ReadableStream from res.body.
```ts
// packages/fetch-sse/client/components/
const ctrl = new AbortController();
fetch("/proxy", { method: "POST", signal: ctrl.signal })
  .then(res => {
    onOpen(res);
    const body = res.body;
    if (!body) return null;
    // ...
  });
```
The streaming data handling is consistent with the server-side StreamParser approach. As mentioned earlier, since the ReadableStream function signatures differ, here we simply use chained Promise calls to process the Uint8Array data in the same way as before. There is actually another interesting detail: with the EventSource object you can see the EventStream data transfer panel in the Network panel of the browser console, whereas data exchanged via fetch is not recorded there.
```ts
// packages/fetch-sse/client/components/
const reader = body.getReader();
const parser = new StreamParser();
parser.onMessage = onMessage;
const process = (res: ReadableStreamReadResult<Uint8Array>) => {
  if (res.done) return null;
  res.value && parser.onBinary(res.value);
  reader
    .read()
    .then(process)
    .catch(() => null);
};
reader.read().then(process);
```
Streaming Interaction
Once the data transfer scheme is in place, we can implement the streaming interaction on the client side. When the StreamParser parses the streamed data, a decoding operation is required, which is the inverse of the encoding scheme described above: here we only need to replace \\n with \n. Then we set up two output speeds for the typewriter effect: if there is too much text that has not yet been output, we output one character every 10ms; otherwise we output one character every 50ms.
```ts
// packages/fetch-sse/client/components/
const onMessage = useMemoFn((e: Message) => {
  if (e.event !== "message") return null;
  setPainting(true);
  const data = e.data;
  const text = data.replace(/\\n/g, "\n");
  const start = indexRef.current; // characters already rendered (ref name assumed)
  const len = text.length;
  // Speed up when a large backlog of text is still waiting to be output
  const delay = len - start > 50 ? 10 : 50;
  const process = () => {
    indexRef.current++;
    const end = indexRef.current;
    append(text.slice(0, end));
    if (end < len) {
      timerRef.current = setTimeout(process, delay);
    }
    // streamingRef marks whether the server stream is still open (name assumed)
    if (!streamingRef.current && end >= len) {
      setPainting(false);
    }
  };
  setTimeout(process, delay);
});
```
After parsing out the data, we need to apply it to the DOM structure. One thing to note is that if we re-render the entire DOM content, users will not be able to select previously output content to copy it; in other words, they could not select content while output is still in progress. So we need fine-grained updates. The simplest solution is to update row by row: we record the row index of the last render, and the update range runs from that last index to the current index.
```ts
// packages/fetch-sse/client/components/
const append = (text: string) => {
  const el = ref.current; // container element ref (name assumed)
  if (!el) return null;
  const mdIt = MarkdownIt();
  const textHTML = mdIt.render(text);
  const dom = new DOMParser().parseFromString(textHTML, "text/html");
  const current = lineIndexRef.current; // index of the last rendered row (name assumed)
  // Remove the rows from the last rendered index onward
  const children = Array.from(el.children);
  for (let i = current; i < children.length; i++) {
    children[i] && children[i].remove();
  }
  // Append the freshly rendered rows in their place
  const next = dom.body.children;
  for (let i = current; i < next.length; i++) {
    next[i] && el.appendChild(next[i].cloneNode(true));
  }
  // The last row may still grow, so re-render it on the next call
  lineIndexRef.current = next.length - 1;
};
```
There is also a scrolling interaction to handle here. When the user scrolls the content freely, we cannot force them back to the bottom, so we need to record whether the user has scrolled away; once they have, we stop auto-scrolling. If the difference between scrollHeight - scrollTop and clientHeight is at most 1, we consider the view to be at the bottom and keep auto-scrolling. Note also that scrollTo cannot use the smooth scrolling effect here, since that would make our onScroll calculations inaccurate.
```ts
const append = (text: string) => {
  // ... (DOM updates as above)
  autoScrollRef.current && el.scrollTo({ top: el.scrollHeight }); // ref name assumed
};

useEffect(() => {
  const el = ref.current;
  if (!el) return;
  el.onscroll = () => {
    // Within 1px of the bottom counts as "at the bottom"
    if (el.scrollHeight - el.scrollTop - el.clientHeight <= 1) {
      autoScrollRef.current = true;
    } else {
      autoScrollRef.current = false;
    }
  };
  return () => {
    el.onscroll = null;
  };
}, []);
```
In the streaming output we can also implement a blinking-cursor effect, which is relatively simple: we implement it with a CSS animation and the ::after pseudo-element. Note that implementing it without a pseudo-element would require extra handling in our earlier DOM node appending. In addition, since rendered Markdown actually produces nested nodes, handling them requires :not to make the selection precise.
```scss
// packages/fetch-sse/client/styles/
@keyframes blink {
  0% { opacity: 1; }
  50% { opacity: 0; }
  100% { opacity: 1; }
}

.textarea {
  &.painting > *:last-child:not(ol):not(ul),
  &.painting > ol:last-child > li:last-child,
  &.painting > ul:last-child > li:last-child {
    &::after {
      animation: blink 1s infinite;
      background-color: #000;
      content: '';
      display: inline-block;
      height: 1em;
      margin-top: -2px;
      vertical-align: middle;
      width: 1px;
    }
  }
}
```
Daily Question

/WindrunnerMax/EveryDay
References
/Azure/fetch-event-source
/zh-CN/docs/Web/API/EventSource
/blog/2017/05/server-sent_events.html
/docs//api/#messagesocket
/en-US/docs/Web/API/Server-sent_events/Using_server-sent_events
/questions/7348736/how-to-check-if-connection-was-aborted-in-node-js-server
/questions/76115409/why-does-node-js-express-call-request-close-on-post-request-with-data-before-r