Service Worker-based WebRTC LAN large file transfer capability

Service Worker is a script that resides in the background of the user's browser and can intercept and process network requests, enabling rich offline experiences, cache management, and network-efficiency optimization. Request interception is one of its key features, accomplished by listening to the fetch event: a Service Worker can capture every request the page makes and handle it selectively, for example reading the response from a cache or modifying and redirecting the request, which enables reliable offline browsing and faster content loading.

  • Online experience: /
  • Project address: /WindrunnerMax/FileTransfer

Overview

Some time ago, I saw a question raised in a group chat: why, when downloading a file from an object store, does the implementation go through a GitHub Pages address? In theory we could simply click the link and download the content straight from the object store, yet there appears to be an intermediate hop, as if the download is intercepted and relayed by the GitHub Pages page before it reaches the local disk; the link address looks similar to the content below. Moreover, if we click download on the download page and then open the browser's download manager, we find that the download address has actually turned into a rather strange one, and opening that address directly in the browser responds with a 404.

<!-- Download page -->
//examples/

<!-- Browser download manager -->
///712864/

It is obvious from the link that a GitHub Pages page serves as the intermediary for downloading the file from the object store, and from the project's README we can see that it is a ServiceWorker-based large file download solution (see the jimmywarting references at the end of this article). So some time ago I took the time to study its implementation. Normally, when we need to schedule a file download, we might directly open the target link in the browser through an <a /> tag to start the download; however, this approach has three fairly obvious problems.

  • If the directly opened resource is one the browser can parse itself, such as an image or a video, the browser will not trigger the download behavior but will preview the resource inline instead; that is, the default Content-Disposition value is inline, which does not trigger the download behavior of the attachment value. The download attribute of the <a /> tag can of course solve this problem, but that attribute only takes effect for same-origin URLs and under the blob: and data: protocols.
  • If the files we upload to the object store may collide on name, then to prevent files from being overwritten we might generate resource names randomly, append a timestamp to the resource, or even generate a bare HASH value as the file name with no extension at all. When the file is downloaded we then need to actually restore the original file name, but that process still relies on the response's attachment; filename= or on the download attribute of the <a /> tag to rename the file.
  • If the resource we request requires permission checks before it can be downloaded, then requesting it directly through an <a /> tag simply initiates a GET request, and placing the key in the request's link address obviously performs no real validation. Issuing a temporary Token and returning a GET address is certainly possible, but for more complex permission control and audit trails, generating a temporary download link may not meet strict security requirements; a similar problem is even more pronounced in SSE implemented with the EventSource object.

Our project happens to carry exactly this historical baggage. Our resource files are stored in an OSS-Object Storage Service bucket, and to prevent resource-renaming collisions, the default resource policy carries no file extension at all, generating the file name directly as a HASH value. Furthermore, since the domain is an infrastructure-owned CDN-accelerated domain that cannot be mapped via CNAME to our site's domain, our resources are bound to have cross-origin issues, which means we hit every one of the limitations above.

In this situation we are required to rename the file back to its original resource name; after all, the operating system cannot recognize the content of an extension-less file, our CDN resources carry neither a Content-Disposition response header nor the original resource name, and the file is not a resource under the same domain. We therefore need to implement cross-origin resource renaming to support the user's download behavior, so the solution we adopted was to first download the file into memory with fetch, then create a blob: protocol resource with createObjectURL, which makes the download attribute of the <a /> tag work.
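For illustration only, a minimal sketch of this in-memory approach might look like the following; the resource URL and the restored file name are placeholders:

const response = await fetch("https://cdn.example.com/8f3a9c"); // placeholder cross-domain resource
const blob = await response.blob(); // the entire file is held in memory here
const url = URL.createObjectURL(blob); // a blob: protocol resource
const anchor = document.createElement("a");
anchor.href = url;
anchor.download = "report.pdf"; // placeholder original resource name
anchor.click();
URL.revokeObjectURL(url);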

Another problem arises when downloading files this way: downloading the entire file and keeping it in memory may result in OOM. For modern browsers there is no fixed memory limit for a single Tab; memory is allocated dynamically based on system resources, but downloading a large enough file into memory can still trigger OOM and crash the browser page. In this case it is better to use a Service Worker as an intermediary that intercepts the download request, adds Content-Disposition to the response Header to support file renaming, and uses the Stream API to turn the download into a streaming one, thereby avoiding downloading the whole file into memory. To summarize, this approach solves two problems for us.

  • Downloading cross-origin resources: by hijacking the request and adding the appropriate headers we solve the renaming problem for cross-origin resources, and in doing so we directly schedule the browser's IO to carry out the download.
  • Avoiding memory overflow: with the Stream API, the data from the fetch request is written to the file slice by slice as a streaming download, avoiding writing the whole file into memory first.

Beyond downloading files from an object store, there are many other application scenarios for this kind of data processing. For example, when we need to download files in bulk and compress them, we can actively fetch them, read from the ReadableStream, pipe it through a browser-side compression implementation, and pipe the result onward to a WritableStream-like target to write the file in real time, which allows efficient file reading and writing without holding everything in memory. A hedged sketch of this pipeline follows.
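As a sketch of such a pipeline, assuming the built-in CompressionStream and the Chromium-only showSaveFilePicker of the File System Access API are available, and with placeholder file names:

const response = await fetch("./large-file.bin"); // placeholder resource
const compressed = response.body.pipeThrough(new CompressionStream("gzip"));
const handle = await window.showSaveFilePicker({ suggestedName: "large-file.bin.gz" });
const writable = await handle.createWritable(); // a WritableStream that writes to disk
await compressed.pipeTo(writable); // chunks flow to disk without all residing in memory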

It just so happens that we previously implemented LAN file transfer based on WebRTC, and files transferred over WebRTC face the same large file transfer problem; since WebRTC is not inherently an HTTP protocol, it naturally cannot carry Content-Disposition and other response headers. So our large file transfer also has to be intercepted with the help of a man in the middle: we simulate an HTTP request to generate a virtual download link, and since the transfer is itself chunked, we can easily implement streaming download with the help of the Stream API. This article therefore looks at the Service Worker-based large file transfer scheme used for WebRTC file transfer; all related implementations in this text live in /WindrunnerMax/FileTransfer.

Stream API

The browser-implemented Stream API consists of three stream types: ReadableStream, WritableStream, and TransformStream. ReadableStream is used to represent a readable stream, WritableStream is used to represent a writable stream, and TransformStream is used to represent a readable-and-writable (transform) stream. Since the Stream implementations landed in browsers at different times and through different mechanisms, ReadableStream's compatibility is essentially the same as the Fetch API's, while the compatibility of WritableStream and TransformStream is a little worse.

Data Flow

When I first came into contact with the Stream API, I had trouble understanding the data flow of the whole pipeline. Buffers and backpressure are not hard to understand in themselves, but when actually applying Stream I found I could not work out the direction of data flow in the stream model. In my initial understanding, the whole pipeline should start with a WritableStream used to write/produce data, the subsequent pipeline should use a ReadableStream to read/consume data, and the whole thing should be linked up via pipeTo.

const writable = new WritableStream();
const readable = new ReadableStream();
writable.pipeTo(readable); // TypeError: writable.pipeTo is not a function
const writer = writable.getWriter();
const reader = readable.getReader();
// ...
writer.write("xxx");
reader.read().then(({ value, done }) => {
  console.log(value, done);
});

Of course, this is a wrong example. To understand streams we should instead take Node.js's Stream module, that is, node:fs's createReadStream together with createWriteStream, as the reference; that makes the whole model much easier to understand. The Stream model starts with the ReadableStream: data production is driven by the runtime's own IO reading the file and writing the content into the ReadableStream's events, while we, as data processors, process the data in those events and then write the processed data to the WritableStream to be consumed; that is, the subsequent pipeline ends with the WritableStream.

const fs = require("node:fs");
const path = require("node:path");

const sourceFilePath = ("./");
const destFilePath = ("./");
const readStream = (sourceFilePath, { encoding: "UTF-8" });
const writeStream = (destFilePath, { encoding: "UTF-8" });

("data", chunk => {
  (chunk);
});
("end", () => {
  ();
});

In the browser, our Stream API likewise starts from a ReadableStream; the Fetch API's Response.body is a good example, where data also starts from IO-based network requests. Note that the browser's ReadableStream API differs a little from Node.js's: for example, the browser's ReadableStream Reader has no on("data", () => null)-style event listener. The previous example is only meant to help us better understand the whole stream model; from here on we of course focus on the browser's API.
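To make the correspondence concrete, here is a minimal browser-side sketch of the Node example above, with a placeholder resource path: fetch is the IO that produces data into the ReadableStream, and the reader loop plays the role of the data event.

const res = await fetch("./source.txt"); // placeholder resource
const reader = res.body.getReader();
const pump = ({ value, done }) => {
  if (done) return; // the counterpart of the "end" event
  console.log(value); // a Uint8Array chunk, the counterpart of the "data" event
  return reader.read().then(pump);
};
reader.read().then(pump);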

Having said so much about the Stream API, let's return to the question at hand. For data transferred over WebRTC, unlike fetch where the browser's own IO controls the ReadableStream's data production, our WebRTC is only a transport channel, so at the point of the pipeline's initial data production the ReadableStream is something we must control ourselves; that is why I first thought of the Writable -> Readable arrangement, which was meant to fit this part of the implementation. In fact this arrangement matches the TransformStream model better: its own capability is transforming data streams, and we can likewise use a TransformStream to read from and write to the stream.

const transformStream = new TransformStream<number, number>({
  transform(chunk, controller) {
    controller.enqueue(chunk + 1);
  },
});
const writer = transformStream.writable.getWriter();
const reader = transformStream.readable.getReader();
const process = (res: { value?: number; done: boolean }) => {
  const { value, done } = res;
  console.log(value, done);
  if (done) return;
  reader.read().then(process);
};
reader.read().then(process);
writer.write(1);
writer.write(2);
writer.close();

Here we can also realize the data processing for the ReadableStream directly. In our WebRTC-based data transfer, the DataChannel is itself the data stream, so we can use the ReadableStream's Controller to place data into the buffer queue as the way of writing data, while subsequent data consumption is done with the ReadableStream's Reader; this way we implement streaming data transfer on top of the buffer queue.

const readable = new ReadableStream<number>({
  start(controller) {
    controller.enqueue(1);
    controller.enqueue(2);
    controller.close();
  },
});
const reader = readable.getReader();
const process = (res: { value?: number; done: boolean }) => {
  const { value, done } = res;
  console.log(value, done);
  if (done) return;
  reader.read().then(process);
};
reader.read().then(process);

Back Pressure

Here is a question we can think about: if the DataChannel's data stream is transmitted very fast, that is, data is constantly being enqueued, while we consume it very slowly, for example because our hard disk writes slowly, then the data queue keeps growing, which may lead to a memory overflow. There is actually a specialized term for this problem: Back Pressure. In a ReadableStream we can use controller.desiredSize to get the remaining capacity of the current queue, and use it to control the rate of data production as a way to avoid data backlogs.

const readable = new ReadableStream<number>({
  start(controller) {
    console.log(controller.desiredSize); // 1
    controller.enqueue(1);
    console.log(controller.desiredSize); // 0
    controller.enqueue(2);
    console.log(controller.desiredSize); // -1
    controller.close();
  }
});

As for backpressure, we can simply understand it this way: when our data production rate is greater than our data consumption rate, data piles up. For both the ReadableStream and the WritableStream we can configure the relevant queuing strategy, and it is easy to see that the so-called pressure of backpressure comes from the unconsumed chunks in the buffer queue. Of course we could also preset a relatively large buffer queue length; although that avoids a negative desiredSize, it does not solve the backpressure problem (see the sketch after the list below).

  • For a ReadableStream, backpressure comes from chunks that have been enqueued but not yet read.
  • For a WritableStream, backpressure comes from chunks that have been written but not yet processed by the underlying sink.
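As a quick sketch of presetting a larger queue with CountQueuingStrategy (the numbers are arbitrary), the bookkeeping changes but the backlog does not:

const readable = new ReadableStream(
  {
    start(controller) {
      controller.enqueue(1);
      // with highWaterMark: 10 the desiredSize here is 9 instead of 0,
      // yet every unconsumed chunk still sits in memory
      console.log(controller.desiredSize); // 9
    },
  },
  new CountQueuingStrategy({ highWaterMark: 10 })
);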

In the earlier ReadableStream implementation we can clearly see that it does not itself carry a default backpressure handling mechanism. Even though we can use desiredSize to judge the pressure on the built-in queue, we get no clear feedback about how fast data is being produced; we would prefer event-driven control rather than polling the checks with something like setTimeout, though we can also use the pull method to passively control the amount of data in the queue. The WritableStream, on the other hand, has a built-in backpressure handling mechanism: the writer.ready promise lets us judge the pressure on the current queue, and with it we can control the rate of data production.

(async () => {
  // a sink that never finishes processing a chunk, so queued writes are never drained
  const writable = new WritableStream({
    write() {
      return new Promise(() => null);
    },
  });
  const writer = writable.getWriter();
  writer.write(1);
  writer.write(1);
  writer.write(1);
  console.log("written"); // written
  await writer.ready; // never resolves once the queue is over the high water mark
  await writer.write(1);
  console.log("written"); // never printed
})();

So in our WebRTC data transfer, backpressure can be handled quite conveniently: we write data through the TransformStream's writable end and consume it through the readable end, which gives us good control over the rate of data production. We can define the TransformStream up front, then postMessage its readable end, as a Transferable Object, to the Service Worker and consume it there.

// packages/webrtc/client/worker/
// Note: channel and writer below are static members of this class,
// maintained elsewhere in the actual implementation.
export class WorkerEvent {
  public static start(fileId: string, fileName: string, fileSize: number, fileTotal: number) {
    const ts = new TransformStream();
    WorkerEvent.channel?.port1.postMessage(
      {
        key: MESSAGE_TYPE.TRANSFER_START,
        id: fileId,
        readable: ts.readable,
      } as MessageType,
      [ts.readable]
    );
    WorkerEvent.writer.set(fileId, ts.writable.getWriter());
  }

  public static async post(fileId: string, data: ArrayBuffer) {
    const writer = WorkerEvent.writer.get(fileId);
    if (!writer) return void 0;
    await writer.ready;
    return writer.write(new Uint8Array(data));
  }

  public static close(fileId: string) {
    WorkerEvent.channel?.port1.postMessage({
      key: MESSAGE_TYPE.TRANSFER_CLOSE,
      id: fileId,
    } as MessageType);
    const writer = WorkerEvent.writer.get(fileId);
    writer?.close();
  }
}

Fetch

In the Fetch API's Response object there is a body property, which exposes the response's ReadableStream; this interface likewise represents a readable stream, just like the objects above. It allows us to read data as a stream and process it incrementally instead of reading all the data into memory at once; for example, when implementing SSE - Server-Sent Events over fetch, we can keep the long-lived connection open and use the ReadableStream to consume the response data.
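As an aside, a minimal sketch of consuming such a response incrementally, assuming a placeholder /sse endpoint that keeps the connection open:

const res = await fetch("/sse", { headers: { Accept: "text/event-stream" } });
const reader = res.body.getReader();
const decoder = new TextDecoder();
for (;;) {
  const { value, done } = await reader.read();
  if (done) break;
  console.log(decoder.decode(value, { stream: true })); // handle each event chunk as it arrives
}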

Regarding the Fetch method: before coming into contact with the Stream API, we probably handled responses mainly by calling methods such as res.json() to read the data; in fact these methods likewise implicitly read the data through the ReadableStream. And before the Stream API, if we wanted to process a resource such as a video or a text file, we had to download the entire file, wait for it to be deserialized into a suitable format, and then process all the data at once.

Therefore, one of the questions that puzzled me when I previously researched this scheme was: since the requested data still needs to be downloaded into memory in full, using it would still not let us stream data to the hard disk, and the browser Tab would still face the memory overflow problem. After learning about the Fetch API's Response.body property, the handling of the whole stream became clear: we can keep calling the read() method and pass the data to the Service Worker to schedule the download.

Scheduling the file download is thus roughly similar to the WebRTC transfer described above: once the Service Worker that acts as the man in the middle hijacking the data request is in place, we only need to initiate the fetch request in the main thread, initiate the hijacked download request with an Iframe, then read the data slice by slice through res.body.getReader() and keep writing it to the TransformStream's Writer; on top of this we can also implement effects such as download progress.

const fileId = "xxxxxx";
const worker = await ("./");
const channel = new MessageChannel();
({ type: "INIT_CHANNEL" }, [channel.port2]);
const ts = new TransformStream();
channel.(
  { key: "TRANSFER_START", id: fileId, readable: , },
  []
);
 const src = `/${fileId}` + `?X-File-Id=${fileId}` +
      `&X-File-Size=42373` + `&X-File-Total=1` + `&X-File-Name=`;
const iframe = ("iframe");
 = true;
 = src;
 = fileId;
(iframe);
const writer = ();
fetch("./").then(res => {
  const reader = ();
  const process = (res) => {
    const { value, done } = res;
    if (done) {
      ();
      return;
    }
    (value);
    ().then(process);
  };
  ().then(process);
});

Service Worker

The Service Worker, running as an independent background thread, can act as a proxy for network requests, intercepting, modifying, or even completely replacing requests and responses; this enables advanced capabilities such as cache management, performance enhancement, offline access, and fine-grained control and optimization of requests. Here we leverage the Service Worker to add Content-Disposition and other response headers to our request's response as the way to trigger the browser's download capability, using the browser's own IO to download large files.

Environment Setup

Before implementing the man in the middle that intercepts network requests via the Service Worker, let's look at the TS environment and Webpack configuration for the Service Worker. Our usual TS development environment configures lib as DOM and ESNext, but the global variables and methods held inside a Worker are different, so its lib environment needs to be changed to WebWorker and ESNext. Also, if a module neither actively imports nor exports anything, TS treats it as a global script, so even without actual imports and exports we should export an empty object by default, and take care to include the relevant modules in the tsconfig include.

// packages/webrtc/client/worker/
/// <reference lib="esnext" />
/// <reference lib="webworker" />
declare let self: ServiceWorkerGlobalScope;
export {};

The Service Worker itself, as an independent Js file, must run under the same-origin policy; if we need to pay attention to the routing of the deployment environment, it has to be configured with a separate route loading path. For our static resources we need to configure our Worker implementation as a separate entry file in the bundler, and to make it easy to handle SW registration and cache updates, we usually fix it to a definite file name as a way to guarantee its uniqueness in the cache.

// packages/webrtc/
/**
 * @type {import("@rspack/cli").Configuration}
 */
const Worker = {
  context: __dirname,
  entry: {
    worker: "./client/worker/",
  },
  devtool: isDev ? "source-map" : false,
  output: {
    clean: true,
    filename: "[name].js",
    path: path.resolve(__dirname, "build/static"),
  },
};

module.exports = [/** ... */, Worker];

In the Service Worker, the install event and the activate event handle the installation and activation logic respectively. Normally, after a new Service Worker finishes installing, it enters a waiting phase until the old Service Worker is completely unloaded before activating, so we can call skipWaiting in onInstall and, in the onActive event, use clients.claim to take over all client pages immediately upon activation without waiting for a page refresh, which is very useful when debugging our SW.

// packages/webrtc/client/worker/
self.addEventListener("install", () => {
  self.skipWaiting();
  console.log("Service Worker Installed");
});

self.addEventListener("activate", event => {
  event.waitUntil(self.clients.claim());
  console.log("Service Worker Activate");
});

Request Interception

Next let's look at the Service Worker's ability to intercept network requests. MDN has a detailed description of the Fetch Event, and the Fetch Event can only be used inside a Service Worker. Our interception of requests and responses here is very simple: we just take the relevant information, id, name, size, and total, from the request address, then construct a Response from the ReadableStream as the response. The main things to pay attention to here are the Content-Disposition and Content-Length response headers, which are our key configuration for triggering the download.

// packages/webrtc/client/worker/
self.onfetch = event => {
  const url = new URL(event.request.url);
  const search = url.searchParams;
  const fileId = search.get(HEADER_KEY.FILE_ID);
  const fileName = search.get(HEADER_KEY.FILE_NAME);
  const fileSize = search.get(HEADER_KEY.FILE_SIZE);
  const fileTotal = search.get(HEADER_KEY.FILE_TOTAL);
  if (!fileId || !fileName || !fileSize || !fileTotal) {
    return void 0;
  }
  const transfer = map.get(fileId);
  if (!transfer) {
    return event.respondWith(new Response(null, { status: 404 }));
  }
  const [readable] = transfer;
  const newFileName = encodeURIComponent(fileName).replace(/['()]/g, escape).replace(/\*/g, "%2A");
  const responseHeader = new Headers({
    [HEADER_KEY.FILE_ID]: fileId,
    [HEADER_KEY.FILE_SIZE]: fileSize,
    [HEADER_KEY.FILE_NAME]: newFileName,
    "Content-Type": "application/octet-stream; charset=utf-8",
    "Content-Security-Policy": "default-src 'none'",
    "X-Content-Security-Policy": "default-src 'none'",
    "X-WebKit-CSP": "default-src 'none'",
    "X-XSS-Protection": "1; mode=block",
    "Cross-Origin-Embedder-Policy": "require-corp",
    "Content-Disposition": "attachment; filename*=UTF-8''" + newFileName,
    "Content-Length": fileSize,
  });
  const response = new Response(readable, {
    headers: responseHeader,
  });
  return event.respondWith(response);
};

Another interesting thing here: in the implementation above we can see a check on the information obtained from the request address, and if the check fails we return undefined. This is actually a fairly common interception case, that is, we simply let requests pass through when they do not qualify. A question I had always wondered about before is that any request intercepted by the Service Worker gets an entry marked with a gear symbol in our Network console panel, that is, a request initiated from the Service Worker, which looks very confusing when debugging.

In fact this is simply a problem of how we use it; as the hint makes evident, such an entry is a request initiated from the Service Worker, and the main reason this request entry is triggered is that we called the fetch method. Whether we return fetch directly or call event.respondWith(fetch(event.request)), the entry will appear; so when intercepting a request, if our condition is not met we can simply return undefined.

// will initiate a new request entry
return fetch(event.request);
return event.respondWith(fetch(event.request));

// will not initiate a new request entry
return void 0;

Then we need to move on to the question of how the download should be triggered. The Service Worker merely intercepts requests, while the WebRTC transfer does not actually initiate any HTTP request, so we must trigger this request proactively. Thanks to the Service Worker being able to intercept almost any request, including static resources and network requests, we can simply create an Iframe whose address carries the agreed-upon field names to accomplish the download; this is exactly the rather strange link address we mentioned at the beginning.

// packages/webrtc/client/worker/
const src =
  `/${fileId}` +
  `?${HEADER_KEY.FILE_ID}=${fileId}` +
  `&${HEADER_KEY.FILE_SIZE}=${fileSize}` +
  `&${HEADER_KEY.FILE_TOTAL}=${fileTotal}` +
  `&${HEADER_KEY.FILE_NAME}=${fileName}`;
const iframe = ("iframe");
 = true;
 = src;
 = fileId;
(iframe);

Here we may wonder why our request information comes from the URL rather than directly constructing the relevant Header information and having the Service Worker forward the agreed-upon response headers; in other words, why use an Iframe rather than a fetch request carrying request headers to trigger the download. This is because even with a "Content-Disposition": "attachment; xxx" response header, a fetch request does not support directly initiating a download.

I actually did a bit more research here, and the implementation is equally interesting: its runtime environment is itself an Iframe acting as the man in the middle, so let's call that page B, and call our main thread page A. After registering the Service Worker under B, we can serve the pages with something like python3 -m http.server 9000 as the static service for the A address, and open the B address on the new port 9001, thereby ensuring that the two are cross-origin.

<!-- A: http://localhost:9000 -->
<iframe src="http://localhost:9001/" hidden></iframe>

<!-- B: http://localhost:9001 -->
<script>
    navigator.serviceWorker.register("./sw.js", { scope: "./" });
</script>

// B: sw.js (placeholder file name)
self.onfetch = (e) => {
  console.log(e.request.url);
  if (e.request.url.includes("ping")) {
    e.respondWith(new Response("pong"));
  }
};

At this point, if we create a new iframe in A with the address localhost:9001/ping, similar to the temporary download address created in the scheme above, we find that this address can surprisingly be listened to, that is, the request can be intercepted by the Service Worker. At the time I found this amazing, because in theory interception should be impossible across different domains, and I even thought I had discovered some iframe feature, before finally realizing that we are accessing a resource of the 9001 origin, which is equivalent to still being within the Service Worker's own origin; if we accessed a resource under 9000 at this point, the effect would no longer occur.

const iframe = ("iframe");
 = true;
 = "http://localhost:9001/ping";
(iframe);

In addition, if we open http://localhost:9001/ping directly in the browser's address bar, we can equally get the pong response, which means the Service Worker's interception scope is whatever falls within the registered scope. If necessary, we could actually implement an offline PWA application based on the SW this way, without relying on server-side routing and interfaces for the responses. This effect also exists in the SW of our WebRTC practice: when we click the download link again, we cannot get a response, because the check finds that the transfer no longer exists and it responds directly with a 404.

const transfer = map.get(fileId);
if (!transfer) {
  return event.respondWith(new Response(null, { status: 404 }));
}

Data Communication

Getting back to the point: next we need to implement the communication scheme with the Service Worker, and the implementation here is fairly conventional. First we register the Service Worker; within the same Scope only one Service Worker can be registered, and if multiple Service Workers are registered, the later-registered one overwrites the earlier one (this problem of course does not exist for a WebWorker). Here we use getRegistration and register, respectively, to get the currently active Service Worker and to register a new Service Worker.

// packages/webrtc/client/worker/
if (!navigator.serviceWorker) {
  console.warn("Service Worker Not Supported");
  return Promise.resolve(null);
}
try {
  const serviceWorker = await navigator.serviceWorker.getRegistration("./");
  if (serviceWorker) {
    WorkerEvent.worker = serviceWorker;
    return Promise.resolve(serviceWorker);
  }
  // PUBLIC_PATH / RANDOM_ID are assumed to be injected at build time
  const worker = await navigator.serviceWorker.register(
    process.env.PUBLIC_PATH + "?" + process.env.RANDOM_ID,
    { scope: "./" }
  );
  WorkerEvent.worker = worker;
  return worker;
} catch (error) {
  console.warn("Service Worker Register Error", error);
  return Promise.resolve(null);
}

For data communication with the Service Worker we can use a MessageChannel. A MessageChannel is a bidirectional communication channel that can pass messages between two different Contexts, for example between the main thread and a Worker thread for mutual data exchange. All we need to do is create a MessageChannel in the main thread, pass its port2 port to the Service Worker via postMessage, and the Service Worker can then get this port2 from event.ports[0]; after that the two ports can communicate directly.

Perhaps we'll ponder the question of why port2 can be passed into the Service Worker at all: in theory postMessage can only pass serializable objects, namely those covered by Structured Clone, such as strings and numbers, while port2 itself exists as a non-serializable object. This is where the concept of Transferable objects comes in: a transferable object is an object that owns resources which can be transferred from one context to another, with the guarantee that the resource is available in only one context at a time. After the transfer, the original object is no longer usable: it no longer points to the transferred resource, and any attempt to read or write the object will throw an exception.
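A quick way to observe this transfer of ownership is with an ArrayBuffer, whose detachment after transfer is easy to check; the worker path is a placeholder, and any context behaves the same:

const buffer = new Uint8Array([1, 2, 3]).buffer;
const worker = new Worker("./worker.js"); // placeholder worker script
worker.postMessage({ buffer }, [buffer]); // transferred rather than cloned
console.log(buffer.byteLength); // 0 -- the original buffer is detached after the transfer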

// packages/webrtc/client/worker/
if (!WorkerEvent.channel) {
  WorkerEvent.channel = new MessageChannel();
  WorkerEvent.channel.port1.onmessage = event => {
    console.log("WorkerEvent", event.data);
  };
  WorkerEvent.worker?.active?.postMessage({ type: MESSAGE_TYPE.INIT_CHANNEL }, [
    WorkerEvent.channel.port2,
  ]);
}

Because here we do not need to receive messages from the Service Worker, the handler on port1 simply prints the received message. When initializing the CHANNEL we place port2, as a transferable object, in the second parameter so that it can be received inside the Service Worker. Since all of our future messaging goes through the MessageChannel, the onmessage here on the worker's global scope serves only to receive the port2 object port.

// packages/webrtc/client/worker/
self.onmessage = event => {
  const port = event.ports[0];
  if (!port) return void 0;
};

Right after that we need to use the TransformStream to perform the data reads and writes. Since the TransformStream itself is also a transferable object, we can define it directly in the main thread, and when initializing the file download, transfer the readable end to the Service Worker, where the Response object for the download is constructed from that ReadableStream instance. The next step in the main thread is to create the iframe to trigger the download behavior, after which the Fetch Event can retrieve the readable from the map.

// packages/webrtc/client/worker/
const ts = new TransformStream();
WorkerEvent.channel.port1.postMessage(
  {
    key: MESSAGE_TYPE.TRANSFER_START,
    id: fileId,
    readable: ts.readable,
  } as MessageType,
  [ts.readable]
);
WorkerEvent.writer.set(fileId, ts.writable.getWriter());
// construct the iframe to trigger the download behavior
// ...

// packages/webrtc/client/worker/
port.onmessage = event => {
  const payload = event.data as MessageType;
  if (!payload) return void 0;
  if (payload.key === MESSAGE_TYPE.TRANSFER_START) {
    const { id, readable } = payload;
    map.set(id, [readable]);
  }
};
// after the download behavior is triggered, retrieve readable from the map
// ...

In the main thread we then only care about writing the content. As for backpressure control, since the TransformStream internally implements its own queue and backpressure handling, we do not need to worry much about problems caused by data production: the feedback link in our previous WebRTC download implementation is well established, and we only need to await to control the write speed. The interesting thing here is that even though the TransformStream's readable and writable ends now run in two different contexts, they are still able to read and write data and apply backpressure control.

// packages/webrtc/client/worker/
const writer = WorkerEvent.writer.get(fileId);
if (!writer) return void 0;
// to sense BackPressure we need to actively await ready
await writer.ready;
return writer.write(new Uint8Array(data));

Then, when the last chunk of the transfer completes, as determined by the number of data blocks, i.e. total, we need to tear down the whole transfer behavior. First, the TransformStream's writable end must be closed; this Writer's close method must be actively scheduled, otherwise the browser cannot sense the completion of the download and will stay in the waiting-for-completion state. Second, we need to remove the iframe we created from the body, and in the Service Worker we also need to clean the data out of the map to avoid problems such as previous links still being able to respond.

// packages/webrtc/client/worker/
const iframe = document.getElementById(fileId);
iframe && iframe.remove();
WorkerEvent.channel?.port1.postMessage({
  key: MESSAGE_TYPE.TRANSFER_CLOSE,
  id: fileId,
} as MessageType);
const writer = WorkerEvent.writer.get(fileId);
writer?.close();
WorkerEvent.writer.delete(fileId);

// packages/webrtc/client/worker/
port.onmessage = event => {
  const payload = event.data as MessageType;
  if (!payload) return void 0;
  if (payload.key === MESSAGE_TYPE.TRANSFER_CLOSE) {
    const { id } = payload;
    map.delete(id);
  }
};

Compatibility Considerations

In modern browsers the Service Worker, the Fetch API, and the Stream API are all already relatively well supported. The comparatively recent TransformStream we use here also has decent compatibility, being largely supported in browser versions released after 2022; however, if we look closely at the compatibility of TransformStream on MDN, we find that TransformStream as a transferable has so far not been supported in Safari.

So what problem does this create? We can note that in the earlier TRANSFER_START step we transfer the TransformStream's readable end to the Service Worker as a Transferable Object. Since Safari does not support this behavior, our ReadableStream naturally cannot be passed to the Service Worker, and our subsequent download cannot proceed; so if we need to be compatible with Safari, we must deal with this issue.
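A runtime feature check is one hedged way to choose between the two code paths; this sketch relies on postMessage throwing a DataCloneError synchronously when the transfer list contains an unsupported object:

const supportsTransferableStream = (() => {
  try {
    const ts = new TransformStream();
    const channel = new MessageChannel();
    channel.port1.postMessage(ts.readable, [ts.readable]); // throws where unsupported
    channel.port1.close();
    channel.port2.close();
    return true;
  } catch (error) {
    return false;
  }
})();
console.log(supportsTransferableStream);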

The cause of the problem is that we cannot transfer the ReadableStream's ownership into the Service Worker, so a simpler approach that comes to mind is to define the ReadableStream directly in the Service Worker. That is, when the transfer starts we instantiate the ReadableStream and keep its controller object; when data arrives we directly enqueue the chunk into the buffer queue, and when the transfer ends we simply call controller.close(). This readable object can then be used directly as the Response of the intercepted request to serve as the downloadable content.

let controller: ReadableStreamDefaultController | null = null;
const readable = new ReadableStream({
  start(ctr) {
    controller = ctr;
  },
  cancel(reason) {
    ("ReadableStream Aborted", reason);
  },
});
map.set(fileId, [readable, controller!, Number(fileTotal)]);

self.onmessage = event => {
  const data = event.data as BufferType;
  destructureChunk(data).then(({ id, series, data }) => {
    const stream = map.get(id);
    if (!stream) return void 0;
    const [, controller, size] = stream;
    controller.enqueue(new Uint8Array(data));
    if (series === size - 1) {
      controller.close();
      map.delete(id);
    }
  });
};

Then here we again run into the backpressure problem we talked about earlier: since here we have no feedback mechanism for backpressure and simply take every chunk coming from the main thread and enqueue it into the ReadableStream, when the data transfer rate exceeds the browser-controlled download IO speed it is easy to build up a data backlog. We therefore need to find a way to implement backpressure control, and we can fairly easily think of the following approaches.

  • When instantiating the ReadableStream object, create a large enough buffer with the help of CountQueuingStrategy, which is possible because during the transfer we already know the file size and the number of chunks. Of course, we probably do not need a buffer equal to the number of chunks; dividing by 2, or taking a logarithm, is fine, since the download itself is also consuming data by writing it to the hard disk.
  • When instantiating the ReadableStream, the underlyingSource object has, besides the start method, a pull method that is called repeatedly whenever the stream's internal chunk queue is not full, until it reaches the high water mark; we can use these calls as an event-driven mechanism to control the flow rate. Note that pull is called repeatedly only if at least one chunk is actually enqueued during the call; if no chunk is enqueued when the function runs, it will not be called again.

Let's first look at allocating a sufficiently large buffer queue. Thinking about it more deeply, even if a sufficiently large buffer is allocated, as long as we implement no feedback mechanism that actually slows data production down, the buffer, however large, does not solve our memory overflow problem, even though allocating a large buffer at instantiation does not immediately claim that much memory. In that case, even without allocating a large buffer, the queue implemented in default mode behaves exactly the same, except that its internal desiredSize becomes a relatively large negative value; the data is not actually lost, because the browser's stream implementation stores it in memory until a consumer reads it.

So let's look at the second approach. Through the pull method we do get feedback from the ReadableStream's buffer queue, so we can implement a simple flow control on top of it. Considering that we will have two states, production exceeding consumption and consumption exceeding production, we cannot rely on pull alone; we should implement another buffer queue of our own, and our event-driven data intake then has two parts. First, when data enters our buffer queue, we check whether the previous pull ended up waiting without being fulfilled; if so, we schedule the Promise that the previous pull left unfinished, which is the case of consumption exceeding production. Second, when the pull event fires, we directly check whether our buffer queue contains data; if so, we enqueue it straight away, which is the case of production exceeding consumption.

const pending = new WeakMap<ReadableStream, (stream: string) => void>();
const queue = ["1", "2", "3", "4"];
const strategy = new CountQueuingStrategy({ highWaterMark: 3 });

const underlyingSource: UnderlyingDefaultSource<string> = {
  pull(controller) {
    if (!queue.length) {
      console.log("Pull Pending");
      return new Promise<void>(resolve => {
        const handler = (stream: string) => {
          controller.enqueue(stream);
          pending.delete(readable);
          console.log("Pull Restore", stream);
          resolve();
        };
        pending.set(readable, handler);
      });
    }
    const next = queue.shift();
    controller.enqueue(next);
    console.log("Pull", next);
    return void 0;
  },
};

const readable = new ReadableStream<string>(underlyingSource, strategy);
const write = (stream: string) => {
  if (pending.has(readable)) {
    console.log("Write Pending Pull", stream);
    pending.get(readable)!(stream);
  } else {
    console.log("Write Queue", stream);
    queue.push(stream);
  }
};

// Delay the read task so that pull first fills the Readable's buffer queue
setTimeout(async () => {
  // at this point data still remains in queue; production is greater than consumption
  const reader = readable.getReader();
  console.log("Read Twice");
  // after this read the data in queue has been consumed; consumption equals production
  console.log("Read", await reader.read());
  // after this read queue is empty and the Readable's buffer queue is not full,
  // so the Readable keeps firing the pull event; consumption is greater than production
  console.log("Read", await reader.read());
  console.log("Write Twice");
  // this write resolves the pending pull task; consumption equals production
  write("5");
  // this write goes into the queue; production is greater than consumption
  write("6");
}, 100);

// Pull 1
// Pull 2
// Pull 3
// Read Twice
// Pull 4
// Read {value: '1', done: false}
// Pull Pending
// Read {value: '2', done: false}
// Write Twice
// Write Pending Pull 5
// Pull Restore 5
// Write Queue 6

It looks like we have implemented a pretty neat pull-based buffer queue control, but a closer look reveals we overlooked something: we have merely lifted the ReadableStream's built-in buffer queue to the outside. We still face the same memory pressure, except that the data backlog has moved from inside the ReadableStream into our own array; we have not actually solved the problem at all.

So let's think about where the problem is. When we use a TransformStream, our backpressure control seems to be merely an await of writer.ready. What does that actually mean? The obvious implication is a carrier for the feedback mechanism: when the stream considers its internal queue to be under pressure, it actively blocks the producer's data production. Our implementation, by contrast, has no feedback mechanism from the Service Worker back to the main thread, which is why we have no way to handle backpressure.

Looking at it a bit more fundamentally, our means of communication is postMessage, so what is the problem there? If we want to control backpressure in the main thread in the same await fashion, what we lack is clearly a way to get a response to the event we just posted, because postMessage is one-way communication: there is no way to do a postMessage().then()-style operation. But we can approximate one by recording a pending Promise immediately after the postMessage and waiting for the response data to resolve it; with that, a similar operation becomes possible.

This is not a complicated operation, so could we make it more general, similar to the fetch implementation: when we initiate a request/push, we use a Promise to wait, for a certain period or even indefinitely, for its corresponding response. And since our postMessage is a one-way data transfer, we need to attach an id flag at the data level so that we know exactly which pending Promise the current response should resolve.

With this in mind, we need to deal with the transmission of the data, since appending identifying information to the original data is not a trivial matter. In postMessage, if the payload is a string we can simply wrap another layer of object around it; but for ArrayBuffer data we would need to manipulate its underlying Buffer, which is obviously somewhat fiddly. So I wanted some easy serialization method that would let it be transferred as a string, and here I considered the BASE64, Uint8Array, and Uint32Array serialization methods.

Let's take the simplest case of 8 bytes as an example and compute the size after serializing to BASE64, Uint8Array, and Uint32Array respectively. If every bit of our data is 0, the encoded results are AAAAAAAAAAA=, [0,0,0,0,0,0,0,0], and [0,0], occupying 12, 17, and 5 characters respectively.

const buffer = new ArrayBuffer(8);
const input = new Uint8Array(buffer);
input.fill(0);

const binaryStr = String.fromCharCode.apply(null, input);
console.log("BASE64", btoa(binaryStr)); // AAAAAAAAAAA=
const uint8Array = new Uint8Array(input);
console.log("Uint8Array", uint8Array); // Uint8Array(8) [0, 0, 0, 0, 0, 0, 0, 0]
const uint32Array = new Uint32Array(buffer);
console.log("Uint32Array", uint32Array); // Uint32Array(2) [0, 0]

In the result above, Uint32Array appears to give the best serialization result; however, that was for the case where every bit is filled with 0, and actual transfers are certainly not that ideal. So let's take the counterexample and fill everything with 1 bits to test the effect. The results now become different: the encodings come out as //////////8=, [255,255,255,255,255,255,255,255], and [4294967295,4294967295], occupying 12, 33, and 23 characters respectively.

const buffer = new ArrayBuffer(8);
const input = new Uint8Array(buffer);
input.fill(255);

const binaryStr = String.fromCharCode.apply(null, input);
console.log("BASE64", btoa(binaryStr)); // //////////8=
const uint8Array = new Uint8Array(input);
console.log("Uint8Array", uint8Array); // Uint8Array(8) [255, 255, 255, 255, 255, 255, 255, 255]
const uint32Array = new Uint32Array(buffer);
console.log("Uint32Array", uint32Array); // Uint32Array(2) [4294967295, 4294967295]

So it seems BASE64's serialization result is more stable, which comes from its inherently bitwise encoding method: every 6 bits are encoded as one of 64 characters taken from the alphabet by index, so every 3 bytes, i.e. 24 bits, are encoded into 4 characters, i.e. 32 bits of output. Our 8 bytes, i.e. 64 bits, cannot be divided evenly by 24 bits, so the first 6 bytes are handled first: if every bit is 0, the first 8 characters are all A. That leaves 16 bits, which are padded with 8 more bits up to 24 bits and then encoded as 4 characters (the last 6 bits are filled by =), so the end result occupies 12 characters.

However, here I realized I was overthinking it: we do not actually need to consider serialization and encoding. In our RTC DataChannel the data must indeed be a plain string or ArrayBuffer and similar data, which cannot carry objects directly, but in postMessage the data we can pass is governed by The Structured Clone Algorithm, and ArrayBuffer objects are on its list; even without using the transfer capability to move ownership, the built-in serialization will be performed. In my actual tests, Chrome, Firefox, and Safari all support this direct data transfer; after all it happens within the same browser, so the data transfer can be a bit more relaxed.

<!-- index.html (placeholder file name) -->
<script>
    navigator.serviceWorker.register("./sw.js", { scope: "./" }).then(res => {
        window.sw = res;
    });
</script>

// sw.js (placeholder file name)
self.onmessage = (event) => {
  console.log("Message", event);
  self.message = event;
};

// Execute in the console to observe the SW receiving the data and its value
const buffer = new ArrayBuffer(8);
const input = new Uint8Array(buffer);
input.fill(255);
sw.active.postMessage({ id: "test", buffer });

So our requirement is clear, and the data response implementation from the Service Worker becomes much simpler: we can now just treat the payload as an ordinary object without thinking about any serialization issues. Here we take advantage of the Promise: when the postMessage response arrives, we look up the resolve corresponding to the current id in a global store and invoke it with the response data; we could also add a timeout mechanism to avoid a backlog of resolve callbacks. With this we can easily provide backpressure feedback.

// simulated onMessage methods
let onMainMessage: ((event: { id: string; payload: string }) => void) | null = null;
let onWorkerMessage: ((event: { id: string; payload: string }) => void) | null = null;

// simulated postMessage methods
const postToWorker = (id: string, payload: string) => {
  onWorkerMessage?.({ id, payload });
};
const postToMain = (id: string, payload: string) => {
  onMainMessage?.({ id, payload });
};

// Worker
(() => {
  onWorkerMessage = ({ id, payload }) => {
    ("Worker Receive", id, payload);
    setTimeout(() => {
      postToMain(id, "pong");
    }, 1000);
  };
})();

// Main
(() => {
  const map = new Map<string, (value: { id: string; payload: string }) => void>();
  onMainMessage = ({ id, payload }) => {
    const resolve = map.get(id);
    resolve?.({ id, payload });
    map.delete(id);
  };
  const post = (payload: string) => {
    const id = Math.random().toString(36).slice(2);
    return new Promise<{ id: string; payload: string }>(resolve => {
      map.set(id, resolve);
      postToWorker(id, payload);
    });
  };
  post("ping").then(res => {
    ("Main Receive", , );
  });
})();

Daily Question

/WindrunnerMax/EveryDay

References

/post/6844904029244358670
/jimmywarting/
/jimmywarting/native-file-system-adapter
/zh-CN/docs/Web/API/FetchEvent
/docs/latest/api/#types-of-streams
/en-US/docs/Web/API/TransformStream
/zh/oss/user-guide/map-custom-domain-names-5
/zh-CN/docs/Web/HTML/Element/a#download
/zh-CN/docs/Web/HTTP/Headers/Content-Disposition
/en-US/docs/Web/API/Streams_API/Concepts#backpressure
/en-US/docs/Web/API/Web_Workers_API/Transferable_objects
/en-US/docs/Web/API/Web_Workers_API/Structured_clone_algorithm