Server-side SSE Data Proxy with fetch-based EventSource implementation

Server-Sent Events (SSE) is a mechanism for pushing real-time updates from the server to the client in one direction. Its basic principle is that the client opens a persistent connection to the server with an HTTP request, and through that connection the server can continuously send event data. SSE is ideal for applications that require continuous data updates, such as real-time notifications, message pushes, and dynamic content updates. Compared with WebSocket, SSE is a more lightweight data communication solution: it is easier to implement and better suited to simple unidirectional data flow scenarios.

Description

SSE is essentially a unidirectional data streaming scheme built on an HTTP long-lived connection and a ReadableStream: the client keeps a one-way connection to the server and continuously receives real-time events pushed by the server, without having to keep polling the server for updates. The browser provides the built-in EventSource object, which makes handling the server's response easy, while the server side simply keeps writing data to the Response object to produce a streaming response. In real business scenarios, however, neither the server side nor the client side is quite this ideal:

  • Server-side preprocessing of the response: when implementing a streaming-dialog-like requirement, we usually forward the LLM's inference data to the client through our own server. During server-side processing we may need to filter, review, or otherwise transform the data, so we have to consume the streaming response on the server, preprocess the data, and then stream the response to the client.
  • Direct forwarding of server-side data: when no preprocessing is needed, receiving the streaming response on the server and then forwarding it to the client is cumbersome, so we can simply proxy the request, as an HTTP long-lived connection, to the target address without actually receiving the response and re-sending it to the client.
  • Requesting data with fetch: the EventSource object can only issue GET requests, and it cannot set request headers or carry a request body. In cases where authentication is required, everything has to be encoded into the URL, and most browsers limit URLs to around 2000 characters. Implementing SSE data requests on top of fetch solves these problems.

Here we first implement a basic SSE request-response flow through the EventSource object. Since EventSource is a browser-implemented API and belongs to the client side, we also need to implement the streaming data response on the server side first. All of the DEMO code in this article lives in /WindRunnerMax/webpack-simple-environment.

Implementing a basic streaming data response on the server side is fairly easy. We first need to set the response Content-Type to text/event-stream. Note that the response headers must be set before the response body is written; reversing the order and writing the body first will result in an ERR_INVALID_CHUNKED_ENCODING response.

// packages/fetch-sse/server/modules/
import type * as http from "node:http";

const ping = (req: http.IncomingMessage, res: http.ServerResponse<http.IncomingMessage>) => {
  res.writeHead(200, {
    "Content-Type": "text/event-stream; charset=utf-8",
    "Cache-Control": "no-cache",
    "Connection": "keep-alive",
  });
};
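
The article never shows the server entry itself, so as context, a minimal bootstrap that routes a request to a handler like ping might look like the following; the routing and the listen port (8800, matching the URLs used later) are assumptions rather than the repo's actual entry file:

// Hypothetical entry file: wire the ping handler above to a route.
import http from "node:http";
import { ping } from "./modules/ping"; // assumed export and path

const server = http.createServer((req, res) => {
  if (req.url === "/ping") return ping(req, res);
  res.writeHead(404).end();
});

server.listen(8800, () => console.log("server listening on 8800"));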

SSE is, in effect, a protocol, and a protocol naturally needs a fixed format. In a text/event-stream response, each group of data is separated by \n\n, while multiple fields within a group are separated by \n. For example, if we need to pass the id, event and data fields together:

id: 1
event: message
data: hello world

id: 2
event: custom
data: hello
data: world

Server-Sent Events come with automatic reconnection and event id management, though all of this is preset behaviour of the browser's EventSource implementation; if we implement it with fetch we have to manage it ourselves. It is enough for our current basic example, and in addition we can deliver messages under custom event names. A line in the :xxx\n format is treated as a comment, so when the connection is created we can also declare some information about it:

// packages/fetch-sse/server/modules/
res.write("retry: 10000\n");
res.write("id: -1\n");
res.write("event: connect\n");
res.write("data: " + new Date() + "\n\n");

Then on the client side we create the connection through the EventSource object and receive the data pushed by the server above via the custom event. In practice, if no specific event name is specified, that is, without the connect event shown above, it defaults to the message event, so declaring an event name is not strictly required here.

// packages/fetch-sse/client/components/
const onConnect = useMemoFn((e: MessageEvent<string>) => {
  prepend("Start Time: " + e.data);
});
const source = new EventSource("/ping");
source.addEventListener("connect", onConnect);

For the default message event we similarly write output on the server side. As mentioned earlier, as long as we do not call res.end(), the whole connection stays open, so to keep the connection alive we just need to keep sending data to the client with a timer.

// packages/fetch-sse/server/modules/
let index = 0;
const interval = setInterval(() => {
  res.write("id: " + index++ + "\n");
  res.write("data: " + new Date() + "\n\n");
}, 1000);

On the client side we can add an onmessage event binding to the source object, or simply bind the event with addEventListener("message"). In addition, once we have successfully created the connection with the EventSource object, we can see the EventStream data transfer panel in the browser console's Network tab, where the id, type, data and time we defined are all displayed.

// packages/fetch-sse/client/components/
const prepend = (text: string) => {
  // ref points to the container element for the output.
  const el = ref.current;
  if (!el) return;
  const child = document.createElement("div");
  child.textContent = text;
  el.prepend(child);
};

const onMessage = (e: MessageEvent<string>) => {
  prepend("Ping: " + e.data);
};

const source = new EventSource("/ping");
source.onmessage = onMessage;

On the server side we also need to note that when the client's connection is closed we must close the server-side request as well to avoid extra resource usage; in our case, if the timer here is not cleared it is not just extra resource usage but an outright memory leak.

("close", () => {
  ("[ping] connection close");
  clearInterval(interval);
  ();
});

In addition, when the connection is not established over HTTP/2, SSE is subject to the browser's per-domain limit on the maximum number of connections, which is troublesome when opening multiple tabs. This limit is imposed by the browser on data requests and is set to a very low 6. It is a per-domain limit, which means we can open 6 SSE connections to one domain across all tabs and, at the same time, another 6 SSE connections to a different domain. With HTTP/2, the maximum number of concurrent HTTP streams is negotiated between server and client and defaults to 100.

Server

Before the server can handle data forwarding and proxying, we naturally need to define the data source for the whole flow. Here we do not need to actually integrate a streaming response from, for example, OpenAI or Coze; we only need to simulate one, so we start by defining a /stream interface to mock the streaming output. One thing to note is that our output is normally in Markdown format, which naturally contains \n characters, while \n is a keyword in the SSE protocol, so we need to encode/decode the content to avoid the \n keyword; serializing with JSON.stringify or encoding with encodeURIComponent would both work, but to keep it simple here we just replace \n with \\n.

// packages/fetch-sse/server/modules/
const content = `# Memorial on the Northern Expedition

- Zhuge Liang

The late Emperor passed away before his great undertaking was half complete. Now the realm is divided in three, Yizhou is worn out and weakened, and this is truly a critical moment of survival. Yet the attendant ministers never slacken within the court, and loyal officers forget themselves in the field, for they remember the late Emperor's exceptional kindness and wish to repay it to Your Majesty. It is fitting to open wide the sage hearing, to glorify the virtue bequeathed by the late Emperor, and to rouse the spirit of men of purpose; it is not fitting to belittle yourself, nor to cite improper analogies, and so block the road of loyal remonstrance.

...

Now, as I depart on a distant campaign, facing this memorial my tears fall, and I know not what I say.`.replace(/\n/g, " \\n");

Setting the response headers and similar processing will not be described again here. In a real model-inference scenario there may be two kinds of output: one outputs the full content of the conversation so far, like a string slice that always starts from 0; the other outputs only the newest delta, like a slice whose previous end becomes the next start. To keep things simple we go with the first approach and keep pushing the content, sliced from 0, to the client; a sketch of the delta-style output follows the example below.

Since this is a simulated streaming output, we simply set a timer, randomly generate a step length for each tick, record it as the new start, and immediately output the content to the client. Here we use the default message event directly, and when the output reaches the end we clear the timer and close the connection. Of course, we must not forget that when the connected client closes, we need to proactively clean up the current timer to avoid wasting server-side computing resources.

// packages/fetch-sse/server/modules/
res.write("event: connect\n");
res.write("data: " + Date.now() + "\n\n");

let start = 0;
const interval = setInterval(() => {
  const slice = Math.floor(Math.random() * 30) + 1;
  start = start + slice;
  res.write("event: message\n");
  res.write("data: " + content.slice(0, start) + "\n\n");
  if (start >= content.length) {
    clearInterval(interval);
    res.end();
  }
}, 500);

req.socket.on("close", () => {
  console.log("[stream] connection close");
  clearInterval(interval);
  res.end();
});
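
The second, delta-style output mentioned above only needs a small change: instead of always slicing from 0, each tick sends just the newly generated fragment. A minimal sketch assuming the same content and res objects as above:

// Hypothetical delta output: push only the fragment produced since the last tick.
let cursor = 0;
const deltaInterval = setInterval(() => {
  const step = Math.floor(Math.random() * 30) + 1;
  const next = Math.min(cursor + step, content.length);
  res.write("event: message\n");
  res.write("data: " + content.slice(cursor, next) + "\n\n"); // the last end becomes the next start
  cursor = next;
  if (cursor >= content.length) {
    clearInterval(deltaInterval);
    res.end();
  }
}, 500);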

Data Forwarding

After defining the data-source interface, we can start implementing the data-forwarding capability, i.e. server-side preprocessing of the response; this is where we could filter, review or otherwise process the data. We need to receive the streaming response on the server, preprocess the data, and then stream the response to the client. In this forwarding interface we first make a request to the data-source interface, and here we use node-fetch directly to initiate it.

// packages/fetch-sse/server/modules/
import fetch from "node-fetch";
const response = await fetch("http://127.0.0.1:8800/stream")

When using node-fetch it is important to note that we start the service directly with ts-node, which compiles to CommonJS, so importing an ESM-only package from CJS would throw an exception; we therefore need to pick the 2.x version of node-fetch. In addition, we need to create an AbortController so that we can terminate the request in time when the client closes the connection. In node-fetch the response body is still a readable stream, and that is what we use to handle forwarding the SSE response.

// packages/fetch-sse/server/modules/
const ctrl = new AbortController();
const response = await fetch("http://127.0.0.1:8800/stream", {
  signal: ctrl.signal as AbortSignal,
});
const readable = response.body;
if (!readable) return null;

req.socket.on("close", () => {
  console.log("[transfer] connection close");
  ctrl.abort();
  res.end();
});
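
As a reference for the version issue above: node-fetch 2.x can be imported from CommonJS directly, while node-fetch 3.x is ESM-only and can only be loaded lazily from a CJS project. A sketch of both options, not taken from the original repo:

// Option 1: node-fetch@2 under CommonJS/ts-node, as used in this article.
import fetch from "node-fetch";
// Option 2 (assumption): lazy dynamic import if the project had to use the ESM-only node-fetch@3.
// const fetch = (...args: any[]) => import("node-fetch").then(({ default: f }) => (f as any)(...args));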

On the server side we do not have the EventSource object to receive data, so naturally we can only parse the data ourselves according to the SSE protocol. And since we read the data through a ReadableStream, we have to process it as a binary stream rather than parsing separators directly on strings. So here we implement a StreamParser: when it receives Uint8Array binary data, it first merges it into the internal buffer and then iterates over the current data; whenever it encounters a \n it dispatches that line to the onLine method for processing.

// packages/fetch-sse/server/utils/
export class StreamParser {
  // Unconsumed bytes left over from the previous chunk.
  private buffer: Uint8Array = new Uint8Array(0);

  private compose(data: Uint8Array) {
    const buffer = new Uint8Array(this.buffer.length + data.length);
    buffer.set(this.buffer);
    buffer.set(data, this.buffer.length);
    this.buffer = buffer;
    return buffer;
  }

  public onBinary(bytes: Uint8Array) {
    const buffer = this.compose(bytes);
    const len = buffer.length;
    let start = 0;

    for (let i = 0; i < len; i++) {
      // 10 is the byte value of \n.
      if (buffer[i] === 10) {
        this.onLine(buffer.slice(start, i));
        start = i + 1;
      }
    }
    this.buffer = buffer.slice(start);
  }
}

When handling onLine we need to parse the data line by line according to the SSE protocol; each line we work with has the form field: value. Since we treat \n as the line terminator it is not passed along with the line, so if the line length is 0 it marks the end of a group and we need to fire the onMessage callback, passing the event name and data to the predefined handler. Otherwise we decode the bytes into a string with a TextDecoder and then split on : to parse out the field and value.

// packages/fetch-sse/server/utils/
export class StreamParser {
  // Partially assembled message of the current group, plus the consumer callback.
  private message: Partial<Message> = {};
  public onMessage: ((message: Message) => void) | null = null;

  private onLine(bytes: Uint8Array) {
    if (bytes.length === 0) {
      // A blank line terminates the group: dispatch it if it carries data.
      if (this.onMessage && this.message.data) {
        this.message.event = this.message.event || "message";
        this.onMessage(this.message as Message);
      }
      this.message = {};
      return;
    }
    const decoder = new TextDecoder();
    const line = decoder.decode(bytes);
    const [field, ...rest] = line.split(":");
    const value = rest.join(":").trim();
    switch (field) {
      case "id":
        this.message.id = value;
        break;
      case "event":
        this.message.event = value;
        break;
      case "data":
        this.message.event = this.message.event || "message";
        this.message.data = value;
        break;
      default:
        break;
    }
  }
}
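
The Message type used by the parser is not shown in the excerpts above; a minimal shape consistent with how the fields are assigned might be the following, an assumption rather than the repo's exact definition:

// Hypothetical Message shape consumed by StreamParser.onMessage.
export interface Message {
  id?: string;
  event: string;
  data: string;
}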

It is important to note here that Node's ReadableStream does not share the same function signatures as the browser-implemented ReadableStream, so the simplest and most convenient approach is to iterate the data with for await...of; of course, using on("data") and on("end") to receive the data and finish the response also works, and a sketch of that follows the example below. We also need to bind the onMessage callback to receive the parsed data and respond to the target client.

// packages/fetch-sse/server/utils/
const parser = new StreamParser();
parser.onMessage = message => {
  res.write(`event: ${message.event}\n`);
  res.write(`data: ${message.data}\n\n`);
};

for await (const chunk of readable) {
  const buffer = chunk as Buffer;
  const uint = new Uint8Array(buffer);
  parser.onBinary(uint);
}

res.end();
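
For reference, the same forwarding written against the event API that the Node stream also exposes; a sketch assuming the same readable, parser and res objects as above:

// Hypothetical event-based equivalent of the for-await loop above.
readable.on("data", (chunk: Buffer) => {
  parser.onBinary(new Uint8Array(chunk));
});
readable.on("end", () => {
  res.end();
});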

Request Proxy

When no data preprocessing is required, we can simply proxy the request, as an HTTP long-lived connection, to the target address without actually receiving the response and re-sending it to the client. Here we can implement the forwarding directly with the help of the http module: first parse the destination address with the node:url module, then initiate the request with http.request, and when the connection is established pipe the response data straight into the target Response object; of course, using on("data") plus res.write also works.

// packages/fetch-sse/server/modules/
import http from "node:http";

const targetUrl = new URL("http://127.0.0.1:8800/stream");
const options: http.RequestOptions = {
  hostname: targetUrl.hostname,
  port: targetUrl.port,
  path: targetUrl.pathname,
  method: req.method,
  headers: req.headers,
};
const proxyReq = http.request(options, proxyRes => {
  res.writeHead(proxyRes.statusCode || 404, proxyRes.headers);
  proxyRes.pipe(res);
});

Here we naturally also need to deal with a few special cases. First, for the body data of POST requests, we need to forward all of the request's data into the new request as well, which can again be done with req.on("data") plus proxyReq.write, or simply by piping. For exception handling we also need to pass an error message back to the client; responding with an error status code matters here, and the request to the target is closed as well. And when the client's request is closed, we likewise need to close the request to the target and end the response.

req.pipe(proxyReq);

proxyReq.on("error", error => {
  console.log("proxy error", error);
  res.writeHead(502, { "Content-Type": "text/plain" });
  res.end("Bad Gateway");
});

req.socket.on("close", () => {
  console.log("[proxy] connection close");
  proxyReq.destroy();
  res.end();
});

There is actually another problem here: if we use req.on("close") to listen for the client closing the connection, things go wrong for POST requests. We can run the following node program, then initiate the request with curl and abort it manually; we will find that req.on("close") fires too early, rather than after we actively break the connection.

echo "
const http = require('http');
const server = ((req, res) => {
  ('close', () => {
    ('close');
  });
  ('data', (chunk) => {
    ('data:', new TextDecoder().decode(chunk));
  });
  setTimeout(() => ('end'), 10000);
});
(8001);
" | node;
curl -X POST http://127.0.0.1:8001 \
-H "Content-Type: application/json"  \
-d '{"key1":"value1", "key2":"value2"}'

In fact, in our request there are three such events: req.on("close"), res.on("close") and req.socket.on("close"). The event on req is triggered early because of the carried body data described above, so we can use the res or socket events to listen for the client closing the connection instead. To make the event easy to trigger, here we listen for the client's connection closing directly through the socket event; note also that in older Node versions (before 16) this property was also exposed under the name connection.

echo "
const http = require('http');
const server = ((req, res) => {
  ('close', () => {
    ('res close');
  });
  ('close', () => {
    ('socket close');
  });
  ('data', (chunk) => {
    ('data:', new TextDecoder().decode(chunk));
  });
  setTimeout(() => ('end'), 10000);
});
(8001);
" | node;
curl -X POST http://127.0.0.1:8001 \
-H "Content-Type: application/json"  \
-d '{"key1":"value1", "key2":"value2"}'

Client

On the client side we need to implement SSE on top of fetch. With fetch we can pass request headers and a request body, and we can send POST and other request types, avoiding the problem of only being able to send GET requests and having to encode everything into the URL. If the connection drops we can also control the retry policy ourselves, whereas with the EventSource object the browser silently retries a few times and then stops, which is not robust enough for serious applications. We can also access the response object if we need to do some custom validation or processing before parsing the event stream, which is very useful when there is an API gateway in front of the application servers.

Fetch Implementation

Implementing this on top of fetch is actually fairly simple. We first need to create an AbortController so that we can terminate the request in time when the client closes the connection, and then we can initiate the request with fetch; when the request succeeds we can read the ReadableStream through res.body.getReader().

// packages/fetch-sse/client/components/
const signal = new AbortController();
fetch("/proxy", { method: "POST", signal: signal.signal })
  .then(res => {
    onOpen(res);
    const body = res.body;
    if (!body) return null;
    // ... the ReadableStream is read in the next snippet ...
  });

The streaming handling of the data is consistent with the StreamParser approach we implemented on the server side, and as mentioned earlier, because the ReadableStream function signatures differ, here we simply use chained Promise calls to process the Uint8Array data; the handling itself is the same as before. There is actually another interesting detail: when using the EventSource object, the exchange can be seen in the EventStream data panel of the browser console's Network tab, while the data exchanged with fetch is not recorded there.

// packages/fetch-sse/client/components/
const reader = body.getReader();
const parser = new StreamParser();
parser.onMessage = onMessage;
const process = (res: ReadableStreamReadResult<Uint8Array>) => {
  if (res.done) return null;
  parser.onBinary(res.value);
  reader
    .read()
    .then(process)
    .catch(() => null);
};
reader.read().then(process);
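
As mentioned at the start of this section, fetch also lets us validate the response and control the retry policy ourselves. A minimal sketch of what that could look like around the request above; the function name, backoff values and header check are illustrative assumptions, not code from the original repo:

// Hypothetical reconnect wrapper: validate the response, otherwise retry with a capped backoff.
const connectWithRetry = (retries = 3, delay = 1000): void => {
  fetch("/proxy", { method: "POST", signal: signal.signal })
    .then(res => {
      if (!res.ok || !res.headers.get("Content-Type")?.includes("text/event-stream")) {
        throw new Error("unexpected response");
      }
      // ... hand res.body over to the reader/parser loop above ...
    })
    .catch(() => {
      if (retries > 0) setTimeout(() => connectWithRetry(retries - 1, Math.min(delay * 2, 10000)), delay);
    });
};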

Streaming Interaction

Once the data-transfer scheme is in place, we can implement the streaming interaction on the client. When we parse the incoming data with the StreamParser, a decoding step is required, which is simply the reverse of the encoding scheme described above: we only need to replace \\n with \n. Then we set two output speeds for the typing effect: if there is a lot of text still waiting to be output, a character is emitted every 10ms, otherwise text is emitted at 50ms intervals.

// packages/fetch-sse/client/components/
const onMessage = useMemoFn((e: Message) => {
  if (e.event !== "message") return null;
  setPainting(true);
  const data = e.data;
  const text = data.replace(/\\n/g, "\n");
  // indexRef/timerRef/loadingRef are React refs tracking the typing progress (names reconstructed).
  const start = indexRef.current;
  const len = text.length;
  const delay = len - start > 50 ? 10 : 50;
  const process = () => {
    indexRef.current++;
    const end = indexRef.current;
    append(text.slice(0, end));
    if (end < len) {
      timerRef.current = setTimeout(process, delay);
    }
    // Stop the cursor animation only once the stream has ended and the text is fully typed out.
    if (!loadingRef.current && end >= len) {
      setPainting(false);
    }
  };
  setTimeout(process, delay);
});

Once we have parsed out the data, we need to apply it to the DOM structure. One thing to note here is that if we re-render the entire DOM content each time, the user cannot select previously output content to copy it; in other words, you could not select text while content is still being output. So we need finer-grained updates, and the simplest solution is to update by row: we record the row index of the last render, and the update range is from that index to the current index.

// packages/fetch-sse/client/components/
const append = (text: string) => {
  const el = ref.current;
  if (!el) return null;
  const mdIt = MarkdownIt();
  const textHTML = mdIt.render(text);
  const dom = new DOMParser().parseFromString(textHTML, "text/html");
  // lineIndexRef records the row index rendered last time (name reconstructed).
  const current = lineIndexRef.current;
  const children = Array.from(el.children);
  for (let i = current; i < children.length; i++) {
    children[i] && children[i].remove();
  }
  const next = Array.from(dom.body.children);
  for (let i = current; i < next.length; i++) {
    next[i] && el.appendChild(next[i].cloneNode(true));
  }
  lineIndexRef.current = next.length - 1;
};

There is also a scrolling interaction to deal with here. When the user scrolls the content freely, we must not force them back to the bottom, so we need to record whether the user has scrolled away; once they have, we stop auto-scrolling. If the difference between scrollHeight and scrollTop + clientHeight is within 1, we consider the view to be at the bottom and keep auto-scrolling. Also note that scrollTo must not use the smooth scrolling behaviour, which would make our onScroll calculations inaccurate.

const append = (text: string) => {
  // ... the per-row DOM update shown above ...
  autoScrollRef.current && el.scrollTo({ top: el.scrollHeight });
};

useEffect(() => {
  const el = ref.current;
  if (!el) return;
  el.onscroll = () => {
    // Within 1px of the bottom counts as "at the bottom", so keep auto-scrolling.
    if (el.scrollHeight - el.scrollTop - el.clientHeight <= 1) {
      autoScrollRef.current = true;
    } else {
      autoScrollRef.current = false;
    }
  };
  return () => {
    el.onscroll = null;
  };
}, []);

For the streaming output we can also add a blinking-cursor effect, which is relatively simple: we can implement it with a CSS animation and a pseudo-element. Note that if it were implemented without a pseudo-element, our DOM node appending above would need some extra handling. In addition, since the rendered Markdown produces nested nodes, the selector needs a :not clause to pin down exactly which element gets the cursor.

// packages/fetch-sse/client/styles/
@keyframes blink {
  0% { opacity: 1; }
  50% { opacity: 0; }
  100% { opacity: 1; }
}

.textarea {
  &.painting > *:last-child:not(ol):not(ul),
  &.painting > ol:last-child > li:last-child,
  &.painting > ul:last-child > li:last-child {
    &::after {
      animation: blink 1s infinite;
      background-color: #000;
      content: '';
      display: inline-block;
      height: 1em;
      margin-top: -2px;
      vertical-align: middle;
      width: 1px;
    }
  }
}

Daily Question

/WindrunnerMax/EveryDay

References

/Azure/fetch-event-source
/zh-CN/docs/Web/API/EventSource
/blog/2017/05/server-sent_events.html
/docs//api/#messagesocket
/en-US/docs/Web/API/Server-sent_events/Using_server-sent_events
/questions/7348736/how-to-check-if-connection-was-aborted-in-node-js-server
/questions/76115409/why-does-node-js-express-call-request-close-on-post-request-with-data-before-r