Service Worker-based WebRTC LAN large file transfer capability
A Service Worker is a script that runs in the background of the user's browser and can intercept and process network requests, enabling rich offline experiences, cache management, and network-efficiency optimizations. Request interception is one of its key features: by listening to the `fetch` event, a Service Worker can capture every request the page makes and handle it selectively, for example by serving a response from the cache or by modifying and redirecting the request, which enables reliable offline browsing and faster content loading.
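To make the interception model concrete, here is a minimal, generic sketch of such a listener (not this project's worker): it answers from the cache when a cached response exists and falls back to the network otherwise.

/// <reference lib="webworker" />
declare let self: ServiceWorkerGlobalScope;

// Minimal sketch: intercept fetch events and answer from the cache first.
self.addEventListener("fetch", event => {
  event.respondWith(
    caches.match(event.request).then(cached => {
      // Serve the cached response if present, otherwise go to the network.
      return cached || fetch(event.request);
    })
  );
});

export {};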
- Online experience: /
- Project address: /WindrunnerMax/FileTransfer
Description
Some time ago I saw a question raised in a group chat: when downloading a file from object storage, why does the link carry a GitHub Pages address rather than the object storage address? In theory we could simply click the link and download the content directly from the object store, yet there appears to be an intermediate step, as if the download has to be intercepted and relayed by GitHub Pages before it reaches the local disk; the link looks similar to the content below. If we click download on the download page and then open the browser's download manager, we can see that the download address actually turns into an even stranger address, and opening that address directly in the browser responds with a 404.
<!-- Download page -->
//examples/
<!-- Browser download manager -->
///712864/
From the link it is fairly obvious that `GitHub Pages` is being used as an intermediary for downloading files from the object store, and in the project's `README` we can see that this is a `Service Worker`-based large-file download solution. So some time ago I took the time to study its implementation. Usually, when we need to trigger a file download, we may simply open the target link in the browser through an `<a />` tag; however, this approach has three rather obvious problems.
- If the resource being opened is something the browser can parse directly, such as an image or a video, the browser will not trigger a download; it will simply preview the resource in place. In other words, the default `Content-Disposition` value is `inline`, which does not trigger the download behavior of `attachment`. Using the `download` attribute of the `<a />` tag can solve this, but the attribute only takes effect for same-origin URLs and the `blob:` and `data:` protocols.
- If the files we upload to the object store may collide on resource names, then to keep files from being overwritten we might generate a random resource name, append a timestamp to it, or even generate the file name as a bare `HASH` value without an extension. When the file is downloaded we then need to restore the original file name, but that still relies on the response's `attachment; filename=` header or on the `download` attribute of the `<a />` tag to rename the file.
- If the resource we are requesting requires permission checks before it can be downloaded, then requesting it directly through an `<a />` tag simply issues a plain `GET` request, and putting the key in the link address obviously performs no real verification. Issuing a temporary `Token` and returning a `GET` address is certainly feasible, but for more complex access control and audit trails a temporary download link may not meet stricter security requirements; a similar problem shows up even more clearly in the `EventSource`-based implementation of `SSE`.
Our project happens to carry exactly this historical baggage: resource files are stored in an `OSS - Object Storage Service` bucket, and to avoid name collisions the default policy is to drop the file extension entirely and use the `HASH` value directly as the file name. Moreover, the domain is an infrastructure-owned `CDN` acceleration domain that cannot be pointed at our site's domain through a `CNAME`, which means our resources inevitably have cross-origin issues; in other words, we hit every one of the limitations above.
In this situation we must rename the file back to its original resource name when it is downloaded; after all, the operating system cannot recognize the contents of a file without its extension. Our `CDN` resources carry neither a `Content-Disposition` response header nor the original resource name, and the file is not served from the same origin. So we need to implement cross-origin resource renaming to support the user's download: the solution we adopted was to first download the file into memory with `fetch`, then create a `blob:` protocol resource from it with `createObjectURL`, which makes the `download` attribute of the `<a />` tag usable.
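As a rough sketch of that approach (the URL and file name below are placeholders, not the project's code), the whole response is buffered into memory, wrapped in a `blob:` URL, and then downloaded through a temporary `<a />` element with the `download` attribute:

// Minimal sketch: rename a cross-origin resource by buffering it in memory first.
const downloadRenamed = async (url: string, fileName: string) => {
  const response = await fetch(url); // the entire body ends up in memory
  const blob = await response.blob();
  const objectUrl = URL.createObjectURL(blob);
  const anchor = document.createElement("a");
  anchor.href = objectUrl;
  anchor.download = fileName; // effective because blob: URLs count as same-origin
  document.body.appendChild(anchor);
  anchor.click();
  anchor.remove();
  URL.revokeObjectURL(objectUrl);
};

downloadRenamed("https://cdn.example.com/41b2c3d4", "report.pdf");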
Downloading files this way introduces another problem: keeping the entire file in memory can lead to `OOM`. Modern browsers do not have a clearly documented memory limit for a single `Tab`; it is allocated dynamically based on system resources, but downloading a large enough file into memory will still trigger `OOM` and crash the browser page. In this case we can have a `Service Worker` act as an intermediary that intercepts the download request, adds `Content-Disposition` to the response `Header` so the file can be renamed, and uses the `Stream API` to turn the download into a streaming operation, thereby avoiding loading the whole file into memory. To summarize, this approach solves two problems:
- Cross-origin resource downloads: by hijacking the request and appending the relevant headers we solve the renaming problem for cross-origin resources, and in doing so we let the browser's own `IO` drive the download.
- Memory overflow: with the `Stream API`, the data requested by `fetch` is written to the file in slices, giving us a streaming download instead of holding the entire file in memory.
Beyond downloading files from object storage, this kind of data processing has many other applications. For example, when we need to download files in bulk and compress them, we can actively `fetch` them, read from the resulting `ReadableStream`, `pipe` it into a compression implementation such as a browser-side compression scheme, and then `pipe` it on to something like a `WritableStream` to write the file in real time. This allows efficient reading and writing of files without keeping everything in memory.
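A rough sketch of that pipeline (assuming support for the built-in `CompressionStream` and the File System Access API's `showSaveFilePicker`; the names here are illustrative, not the project's code):

// Minimal sketch: stream a fetched body through gzip straight onto disk,
// without ever buffering the whole file in memory.
const streamCompressedDownload = async (url: string) => {
  const response = await fetch(url);
  if (!response.body) throw new Error("No readable body");
  // File System Access API; currently limited to Chromium-based browsers.
  const handle = await (window as any).showSaveFilePicker({ suggestedName: "archive.gz" });
  const writable: WritableStream<Uint8Array> = await handle.createWritable();
  await response.body
    .pipeThrough(new CompressionStream("gzip")) // ReadableStream -> TransformStream
    .pipeTo(writable); // chunks are written to the file as they arrive
};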
It so happens that we previously implemented LAN file transfer based on `WebRTC`, and files transferred over `WebRTC` face the same large-file problem. Because `WebRTC` is not the `HTTP` protocol, it naturally cannot carry response headers such as `Content-Disposition`. Our large-file transfer therefore has to be intercepted by a man-in-the-middle: we simulate an `HTTP` request to generate a virtual download link, and since the transfer is already chunked, we can easily implement streaming download on top of it with the `Stream API`. So this article looks at how to implement a `Service Worker`-based large-file transfer scheme on top of `WebRTC` file transfer; the relevant implementations discussed here are all in `/WindrunnerMax/FileTransfer`.
Stream API
The `Stream API` implemented in browsers has three stream types: `ReadableStream`, `WritableStream`, and `TransformStream`. `ReadableStream` represents a readable stream, `WritableStream` represents a writable stream, and `TransformStream` represents a stream that is both readable and writable. Because the types were implemented in browsers at different times and through different mechanisms, their compatibility differs: `ReadableStream` has roughly the same compatibility as the `Fetch API`, while `WritableStream` and `TransformStream` are somewhat less widely supported.
Data flow
When I first came into contact with the `Stream API`, I had trouble understanding how data flows through the whole pipeline. Buffers and backpressure are not hard to grasp by themselves, but when actually applying `Stream` I found I couldn't work out the direction of data flow in the model. My intuition was that the pipeline should start with a `WritableStream` used to write/produce data, that downstream stages should use a `ReadableStream` to read/consume data, and that the whole thing should be linked up with `pipeTo`.
const writable = new WritableStream();
const readable = new ReadableStream();
writable.pipeTo(readable); // TypeError: writable.pipeTo is not a function
const writer = writable.getWriter();
const reader = readable.getReader();
// ...
writer.write("xxx");
reader.read().then(({ value, done }) => {
  console.log(value, done);
});
This is, of course, a wrong example. To understand streams we should look at Node.js's `Stream` module, taking `createReadStream` and `createWriteStream` from `node:fs` as examples; that makes the model much easier to grasp. The `Stream` model starts from the `ReadableStream`: data production is driven by the runtime's own `IO`, which reads the file and writes its contents into the `ReadableStream`'s events. We, as data processors, transform the data and then write the processed data into a `WritableStream` to be consumed; in other words, the downstream end of the pipeline is the `WritableStream`.
const fs = require("node:fs");
const path = require("node:path");
const sourceFilePath = ("./");
const destFilePath = ("./");
const readStream = (sourceFilePath, { encoding: "UTF-8" });
const writeStream = (destFilePath, { encoding: "UTF-8" });
("data", chunk => {
(chunk);
});
("end", () => {
();
});
In the browser our `Stream API` works the same way, with the `ReadableStream` as the starting point. The `Fetch API`'s `Response.body` is a good example: its data likewise originates from `IO`, namely the network request. The browser's `ReadableStream` `API` is still a little different from Node.js's; for example, the browser `ReadableStream`'s `Reader` has no `on("data", () => null)`-style event listener. The earlier example was only meant to help us understand the overall stream model, and from here on we focus on the browser `API`.
After all this talk about the `Stream API`, let's return to the problem of data transferred over `WebRTC`. For something like `Fetch`, the browser's own `IO` drives the `ReadableStream`'s data production, whereas `WebRTC` is only a transmission channel: the initial data production for the pipeline is something we have to control ourselves. That is why the `Writable -> Readable` arrangement came to mind first; it is meant to fit exactly this part of the implementation. In fact this approach matches the `TransformStream` model better: its job is to transform a data stream, and we can likewise use a `TransformStream` to read from and write to the stream.
const transformStream = new TransformStream<number, number>({
  transform(chunk, controller) {
    controller.enqueue(chunk + 1);
  },
});
const writer = transformStream.writable.getWriter();
const reader = transformStream.readable.getReader();
const process = (res: { value?: number; done: boolean }) => {
  const { value, done } = res;
  console.log(value, done);
  if (done) return;
  reader.read().then(process);
};
reader.read().then(process);
writer.write(1);
writer.write(2);
writer.close();
Here we can also handle the data with a `ReadableStream` directly. In the `WebRTC`-based transfer, the `DataChannel` is itself the data stream, so we can use the `ReadableStream`'s `Controller` to place data into its buffer queue as our way of writing data, and the subsequent consumption can be done through the `ReadableStream`'s `Reader`. In this way we implement streaming transfer on top of the buffer queue.
const readable = new ReadableStream<number>({
  start(controller) {
    controller.enqueue(1);
    controller.enqueue(2);
    controller.close();
  },
});
const reader = readable.getReader();
const process = (res: { value?: number; done: boolean }) => {
  const { value, done } = res;
  console.log(value, done);
  if (done) return;
  reader.read().then(process);
};
reader.read().then(process);
Back pressure issues
Here is a question worth thinking about: if the `DataChannel` delivers data very quickly, that is, data keeps being `enqueue`d, but we consume it slowly, for example because the hard disk writes slowly, then the queue keeps growing and may eventually cause a memory overflow. There is a dedicated term for this problem: `Back Pressure`. In a `ReadableStream` we can read `controller.desiredSize` to get the remaining capacity of the current queue, and use it to control the rate of data production and avoid a data backlog.
const readable = new ReadableStream<number>({
  start(controller) {
    console.log(controller.desiredSize); // 1
    controller.enqueue(1);
    console.log(controller.desiredSize); // 0
    controller.enqueue(2);
    console.log(controller.desiredSize); // -1
    controller.close();
  }
});
For backpressure we can use a simple intuition: when the data production rate exceeds the consumption rate, data piles up. For both `ReadableStream` and `WritableStream` we can supply a queuing strategy, and it is easy to see that the so-called pressure comes from the unconsumed chunks sitting in the buffer queue. We could of course preset a fairly large buffer queue; that keeps `desiredSize` from going negative, but it does not actually solve the backpressure problem.
- For a `ReadableStream`, backpressure comes from chunks that have been enqueued but not yet read.
- For a `WritableStream`, backpressure comes from chunks that have been written but not yet processed by the underlying sink.
In the earlier `ReadableStream` implementation we can also see that it does not ship with a default mechanism for handling backpressure: we can check `desiredSize` to judge the pressure on the built-in queue, but we get no clear feedback for throttling data production, and we would rather drive this with events than poll with something like `setTimeout`. We can, of course, passively control the amount of data in the queue through the `pull` method. `WritableStream`, on the other hand, has a built-in backpressure mechanism, namely `writer.ready`: this promise lets us observe the pressure on the current queue and thereby control the production rate.
(async () => {
  const writable = new WritableStream();
  const writer = writable.getWriter();
  await writer.write(1);
  await writer.write(1);
  await writer.write(1);
  console.log("written"); // written
  await writer.ready;
  await writer.write(1);
  console.log("written"); // not printed
})();
So in our `WebRTC` data transfer, backpressure is handled conveniently through a `TransformStream`: data is written at its `writable` end and consumed at its `readable` end, which gives us good control over the production rate. After defining the `TransformStream` we can `postMessage` the `readable` end to the `Service Worker` as a `Transferable Object` and consume it there.
// packages/webrtc/client/worker/
export class WorkerEvent {
  public static channel: MessageChannel | null = null;
  public static writer: Map<string, WritableStreamDefaultWriter<Uint8Array>> = new Map();

  public static start(fileId: string, fileName: string, fileSize: number, fileTotal: number) {
    const ts = new TransformStream();
    WorkerEvent.channel?.port1.postMessage(
      {
        key: MESSAGE_TYPE.TRANSFER_START,
        id: fileId,
        readable: ts.readable,
      } as MessageType,
      [ts.readable]
    );
    WorkerEvent.writer.set(fileId, ts.writable.getWriter());
  }

  public static async post(fileId: string, data: ArrayBuffer) {
    const writer = WorkerEvent.writer.get(fileId);
    if (!writer) return void 0;
    await writer.ready;
    return writer.write(new Uint8Array(data));
  }

  public static close(fileId: string) {
    WorkerEvent.channel?.port1.postMessage({
      key: MESSAGE_TYPE.TRANSFER_CLOSE,
      id: fileId,
    } as MessageType);
    const writer = WorkerEvent.writer.get(fileId);
    writer?.close();
  }
}
Fetch
The `Fetch API`'s `Response` object has a `Response.body` property that exposes the response as a `ReadableStream`; like the objects above, this interface represents a readable stream. It lets us read data incrementally as a stream instead of loading everything into memory at once, which is useful, for example, when using `fetch` to implement an `SSE - Server-Sent Events` response: we can keep the long-lived connection open and process the response data through the `ReadableStream`.
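A minimal sketch of that `SSE` idea (the endpoint and message handling are placeholders): read `Response.body` chunk by chunk with a `TextDecoder` and handle each event as it arrives, without buffering the whole response.

// Minimal sketch: consume a text/event-stream endpoint through Response.body.
const listenSSE = async (url: string, onEvent: (block: string) => void) => {
  const response = await fetch(url, { headers: { Accept: "text/event-stream" } });
  if (!response.body) return;
  const reader = response.body.getReader();
  const decoder = new TextDecoder();
  let buffer = "";
  for (;;) {
    const { value, done } = await reader.read();
    if (done) break;
    buffer += decoder.decode(value, { stream: true });
    const blocks = buffer.split("\n\n"); // SSE events are separated by blank lines
    buffer = blocks.pop() || "";
    blocks.forEach(onEvent);
  }
};

listenSSE("/sse", block => console.log(block));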
For the `Fetch` method, before coming into contact with the `Stream API` we probably handled responses mainly by calling methods like `res.json()` to read the data; internally these methods implicitly call `ReadableStream.getReader()` to do the reading. Before the `Stream API` existed, if we wanted to process a resource such as a video or a text file, we had to download the entire file, wait for it to be deserialized into a suitable format, and then work on all the data at once.
So when studying this earlier, one of the questions that puzzled me was: if the requested data still has to be downloaded into memory in its entirety, then even with this scheme we would not be able to stream data onto the hard disk, and the browser `Tab` would still run into memory-overflow problems. After learning about the `Fetch API`'s `Response.body` attribute, the whole streaming flow became clear: we can keep calling the `read()` method and hand the data over to the `Service Worker` to schedule the download.
Scheduling a plain file download is roughly the same as the `WebRTC` transfer described above: once the `Service Worker` middleman that hijacks the data request is in place, we only need to initiate the `fetch` request on the main thread, use an `Iframe` to initiate the hijacked download request, then read the data slice by slice with `read()` and keep writing it into the `TransformStream`'s `Writer`. Along the way we can also implement things like download progress.
const fileId = "xxxxxx";
const worker = await ("./");
const channel = new MessageChannel();
({ type: "INIT_CHANNEL" }, [channel.port2]);
const ts = new TransformStream();
channel.(
{ key: "TRANSFER_START", id: fileId, readable: , },
[]
);
const src = `/${fileId}` + `?X-File-Id=${fileId}` +
`&X-File-Size=42373` + `&X-File-Total=1` + `&X-File-Name=`;
const iframe = ("iframe");
= true;
= src;
= fileId;
(iframe);
const writer = ();
fetch("./").then(res => {
const reader = ();
const process = (res) => {
const { value, done } = res;
if (done) {
();
return;
}
(value);
().then(process);
};
().then(process);
});
Service Worker
A `Service Worker` runs as an independent background thread and can act as a proxy for network requests, intercepting, modifying, or even completely replacing requests and responses, which enables advanced capabilities such as cache management, performance optimization, offline access, and fine-grained request control. Here we use the `Service Worker` to add `Content-Disposition` and other response headers to our intercepted response, which triggers the browser's download behavior and lets the browser's own `IO` handle downloading the large file.
Environment Setup
Before implementing the man-in-the-middle request interception with the `Service Worker`, let's take a look at the `TS` environment and the `Webpack` configuration for the `Service Worker` itself. In a typical `TS` development environment the `lib` is usually `dom`, `esnext`, and so on, but since the global variables and methods available inside a `Worker` are different, its `lib` needs to be changed to `WebWorker`, `ESNext`. Also, if a module neither imports nor exports anything, `TS` treats it as a global script, so even without any real imports or exports we should export an empty object by default, and remember to `include` the relevant modules in `tsconfig`.
// packages/webrtc/client/worker/
/// <reference lib="esnext" />
/// <reference lib="webworker" />
declare let self: ServiceWorkerGlobalScope;
export {};
The `Service Worker` itself is an independent `Js` file and must run under the same-origin policy, so we need to pay attention to the routing of the deployment environment and serve it from its own loading path. For the static resources, we configure the `Worker` implementation as a separate entry file in the bundler, and to make it easier to handle whether the `SW` is registered and how its updates are cached, we usually fix it to a definite file name to ensure its uniqueness in the cache.
// packages/webrtc/
/**
 * @type {import("@rspack/cli").Configuration}
 */
const Worker = {
  context: __dirname,
  entry: {
    worker: "./client/worker/",
  },
  devtool: isDev ? "source-map" : false,
  output: {
    clean: true,
    filename: "[name].js",
    path: path.resolve(__dirname, "build/static"),
  },
};
module.exports = [/** ... */, Worker];
In the `Service Worker`, the `install` and `activate` events handle the installation and activation logic respectively. Normally a new `Service Worker` enters a waiting phase after installation and is only activated once the old `Service Worker` has been fully unloaded, so we call `skipWaiting` in `onInstall`, and in the `onActive` event we use `self.clients.claim()` to take over all client pages immediately upon activation without waiting for a page refresh, which is very useful when debugging our `SW`.
// packages/webrtc/client/worker/
self.addEventListener("install", () => {
  self.skipWaiting();
  console.log("Service Worker Installed");
});
self.addEventListener("activate", event => {
  event.waitUntil(self.clients.claim());
  console.log("Service Worker Activate");
});
Request Interception
Next let's look at the `Service Worker`'s ability to intercept network requests. `MDN` has a detailed description of the `Fetch Event`, which can only be used inside a `Service Worker`. Our interception of requests and responses here is very simple: we just read the relevant information from the request address, namely `id`, `name`, `size`, `total`, and then construct a `Response` from a `ReadableStream` as the response. The main things to pay attention to are the `Content-Disposition` and `Content-Length` response headers, which are the key configuration for triggering the download.
// packages/webrtc/client/worker/
self.onfetch = event => {
  const url = new URL(event.request.url);
  const search = url.searchParams;
  const fileId = search.get(HEADER_KEY.FILE_ID);
  const fileName = search.get(HEADER_KEY.FILE_NAME);
  const fileSize = search.get(HEADER_KEY.FILE_SIZE);
  const fileTotal = search.get(HEADER_KEY.FILE_TOTAL);
  if (!fileId || !fileName || !fileSize || !fileTotal) {
    return void 0;
  }
  const transfer = map.get(fileId);
  if (!transfer) {
    return event.respondWith(new Response(null, { status: 404 }));
  }
  const [readable] = transfer;
  const newFileName = encodeURIComponent(fileName).replace(/['()]/g, escape).replace(/\*/g, "%2A");
  const responseHeader = new Headers({
    [HEADER_KEY.FILE_ID]: fileId,
    [HEADER_KEY.FILE_SIZE]: fileSize,
    [HEADER_KEY.FILE_NAME]: newFileName,
    "Content-Type": "application/octet-stream; charset=utf-8",
    "Content-Security-Policy": "default-src 'none'",
    "X-Content-Security-Policy": "default-src 'none'",
    "X-WebKit-CSP": "default-src 'none'",
    "X-XSS-Protection": "1; mode=block",
    "Cross-Origin-Embedder-Policy": "require-corp",
    "Content-Disposition": "attachment; filename*=UTF-8''" + newFileName,
    "Content-Length": fileSize,
  });
  const response = new Response(readable, {
    headers: responseHeader,
  });
  return event.respondWith(response);
};
Another interesting thing here: in the implementation above we check the information extracted from the request address, and if the check fails we return `undefined`. This is a very common interception `Case`: requests that don't qualify are simply let through. Something I used to wonder about is that any request intercepted by the `Service Worker` adds an extra gear-icon request to the `Network` console panel, i.e. a request initiated from the `Service Worker`, which looks quite confusing when debugging.
In fact this is just a matter of how we use it; the tooltip makes it clear that the request was initiated from the `Service Worker`. The main reason this extra request entry appears is that we called the `fetch` method: whether we return `fetch` directly or go through `event.respondWith(fetch(event.request))`, the entry is triggered. So when we intercept a request and the conditions are not met, we simply return `undefined` and let it pass.
// Will initiate the request again
return fetch(event.request);
return event.respondWith(fetch(event.request));
// Will not initiate the request again
return void 0;
Next we need to address how the download is actually triggered. The `Service Worker` merely intercepts requests, while the `WebRTC` transfer never actually issues any `HTTP` request, so we have to trigger this request ourselves. Since the `Service Worker` can intercept almost any request, including static resources and network requests, we can simply create an `Iframe` whose address carries the agreed-upon field names to start the download; this is exactly the rather strange link address we mentioned at the beginning.
// packages/webrtc/client/worker/
const src =
  `/${fileId}` +
  `?${HEADER_KEY.FILE_ID}=${fileId}` +
  `&${HEADER_KEY.FILE_SIZE}=${fileSize}` +
  `&${HEADER_KEY.FILE_TOTAL}=${fileTotal}` +
  `&${HEADER_KEY.FILE_NAME}=${fileName}`;
const iframe = document.createElement("iframe");
iframe.hidden = true;
iframe.src = src;
iframe.id = fileId;
document.body.appendChild(iframe);
We might wonder why the request information comes from the `URL` rather than from request headers, i.e. why we don't just construct the relevant `Header`s and have the `Service Worker` forward the agreed-upon response headers directly; in other words, why use an `Iframe` instead of a `fetch` request carrying request headers to trigger the download. The reason is that even if the response carries a `"Content-Disposition": "attachment; xxx"` header, a `fetch` request does not support triggering a download directly.
I actually did a bit more research here into that project's implementation, which is just as interesting: its runtime environment is itself an `Iframe`, so let's look at it the same way. Say our main thread page is `A`, and the `Service Worker` is registered under the origin of `B`. We then serve the resources with something like `python3 -m http.server 9000` and open the `A` address, and start another port `9001` as the `B` address, to make sure the two are cross-origin.
<!-- A: main page served at http://localhost:9000 -->
<iframe src="http://localhost:9001/" hidden></iframe>

<!-- B: iframe page served at http://localhost:9001 -->
<script>
  navigator.serviceWorker.register("./sw.js", { scope: "./" });
</script>

// sw.js registered under B
self.onfetch = (e) => {
  console.log(e.request.url);
  if (e.request.url.includes("ping")) {
    e.respondWith(new Response("pong"));
  }
};
Now, in page `A` we create a new `iframe` pointing at `localhost:9001/ping`, similar to the kind of temporary download address that scheme creates, and we find that, surprisingly, this address can be listened to, i.e. the `Service Worker` can intercept the request. At the time I found this amazing, because interception across different origins should be impossible, and I thought I had discovered some `iframe` feature; eventually I realized that we were accessing an address under the `9001` origin, which is still within the origin the worker was registered for. If we instead accessed a resource under `9000`, this effect would no longer occur.
const iframe = ("iframe");
= true;
= "http://localhost:9001/ping";
(iframe);
In addition, if we type `http://localhost:9001/ping` directly into the browser's address bar, we also get the `pong` response, which means the `Service Worker` intercepts everything within its registered `scope`. If we wanted, we could actually build an offline `PWA` application on top of the `SW` this way, without relying on the server to provide routes and interfaces. This effect also exists in the `SW` of our `WebRTC` implementation: when we click the download link a second time we get no file, because the check finds that the `transfer` no longer exists and a `404` is returned directly.
const transfer = map.get(fileId);
if (!transfer) {
  return event.respondWith(new Response(null, { status: 404 }));
}
Data communication
Getting back to the main topic: next we need to set up the communication scheme with the `Service Worker`, and the implementation here is fairly conventional. First we register the `Service Worker`. Within the same `Scope` only one `Service Worker` can be registered; if multiple are registered, the later registration overwrites the earlier one (a problem that does not exist for `WebWorker`). Here we use `getRegistration` and `register` respectively to get the currently active `Service Worker` and to register a new one.
// packages/webrtc/client/worker/
if (!navigator.serviceWorker) {
  console.warn("Service Worker Not Supported");
  return Promise.resolve(null);
}
try {
  const serviceWorker = await navigator.serviceWorker.getRegistration("./");
  if (serviceWorker) {
    WorkerEvent.worker = serviceWorker;
    return Promise.resolve(serviceWorker);
  }
  const worker = await navigator.serviceWorker.register(
    process.env.PUBLIC_PATH + "?" + process.env.RANDOM_ID,
    { scope: "./" }
  );
  WorkerEvent.worker = worker;
  return worker;
} catch (error) {
  console.warn("Service Worker Register Error", error);
  return Promise.resolve(null);
}
To communicate data with the `Service Worker` we can use a `MessageChannel`. A `MessageChannel` is a bi-directional communication channel that can pass messages between two different `Context`s, for example between the main thread and a `Worker` thread. We only need to create a `MessageChannel` in the main thread and send its `port2` to the `Service Worker` via `postMessage`; the `Service Worker` can then obtain this `port2` from `event.ports[0]`, and from then on the two `port`s can communicate directly.
We might wonder why we can send `port2` to the `Service Worker` at all: in theory `postMessage` can only pass objects that are serializable by the `Structured Clone` algorithm, such as strings, numbers, and similar data types, while `port2` itself is not serializable. This is where the concept of `Transferable objects` comes in: a transferable object owns resources that can be transferred from one context to another, with the guarantee that the resource is available in only one context at a time. After the transfer the original object is no longer usable; it no longer points to the transferred resource, and any attempt to read or write it will throw an exception.
// packages/webrtc/client/worker/
if (!WorkerEvent.channel) {
  WorkerEvent.channel = new MessageChannel();
  WorkerEvent.channel.port1.onmessage = event => {
    console.log("WorkerEvent", event.data);
  };
  WorkerEvent.worker?.active?.postMessage({ type: MESSAGE_TYPE.INIT_CHANNEL }, [
    WorkerEvent.channel.port2,
  ]);
}
Because we don't need to receive messages from the `Service Worker` here, `port1`'s handler simply prints whatever it receives. When initializing the `CHANNEL` we place `port2` in the second parameter as a transferable object so that the `Service Worker` can receive it; since all of our subsequent messaging goes through the `MessageChannel`, the `onmessage` below only serves to receive the `port2` port object.
// packages/webrtc/client/worker/
self.onmessage = event => {
  const port = event.ports[0];
  if (!port) return void 0;
  // ...
};
Right after that we need to use the `TransformStream` to read and write the data. Since the `TransformStream` itself is also transferable, we can define it directly in the main thread and, when initializing the file download, transfer its `readable` end to the `Service Worker`, where the downloaded `ReadableStream` instance is used to construct the `Response` object. Then, after the main thread creates the `iframe` and triggers the download, the `Fetch Event` can retrieve the `readable` from the `map`.
// packages/webrtc/client/worker/
const ts = new TransformStream();
WorkerEvent.channel?.port1.postMessage(
  {
    key: MESSAGE_TYPE.TRANSFER_START,
    id: fileId,
    readable: ts.readable,
  } as MessageType,
  [ts.readable]
);
WorkerEvent.writer.set(fileId, ts.writable.getWriter());
// Construct the iframe to trigger the download behavior
// ...

// packages/webrtc/client/worker/
port.onmessage = event => {
  const payload = event.data as MessageType;
  if (!payload) return void 0;
  if (payload.key === MESSAGE_TYPE.TRANSFER_START) {
    const { id, readable } = payload;
    map.set(id, [readable]);
  }
};
// After the download is triggered, retrieve the readable from the map
// ...
In the main thread we only care about writing the content; since the `TransformStream` has its own internal queue and backpressure control, we don't need to worry much about problems caused by data production, and because the feedback chain for downloads in our earlier `WebRTC` implementation is complete, we only need `await` to control the write speed. The interesting part is that even though the `TransformStream`'s `readable` and `writable` ends are now running in two different contexts, they can still read and write data and exercise backpressure control.
// packages/webrtc/client/worker/
const writer = WorkerEvent.writer.get(fileId);
if (!writer) return void 0;
// To sense the BackPressure we need to proactively await ready
await writer.ready;
return writer.write(new Uint8Array(data));
When the last chunk of the transfer, i.e. the `total`-th data block, has been transmitted, we need to clean up the whole transfer. First, the `TransformStream`'s `writable` end must be closed: the `Writer`'s close method has to be called explicitly, otherwise the browser cannot tell that the download has finished and will stay in a waiting state forever. Second, the `iframe` we created needs to be removed from the `body`, and on the `Service Worker` side we also need to clean the data out of the `map`, to avoid problems such as the previous link still being able to respond.
// packages/webrtc/client/worker/
const iframe = document.getElementById(fileId);
iframe && iframe.remove();
WorkerEvent.channel?.port1.postMessage({
  key: MESSAGE_TYPE.TRANSFER_CLOSE,
  id: fileId,
} as MessageType);
const writer = WorkerEvent.writer.get(fileId);
writer?.close();
WorkerEvent.writer.delete(fileId);

// packages/webrtc/client/worker/
port.onmessage = event => {
  const payload = event.data as MessageType;
  if (!payload) return void 0;
  if (payload.key === MESSAGE_TYPE.TRANSFER_CLOSE) {
    const { id } = payload;
    map.delete(id);
  }
};
Compatibility Considerations
In modern browsers, `Service Worker`, `Fetch API`, and `Stream API` are all fairly well supported. The relatively new feature we use here, `TransformStream`, also has decent compatibility: browser versions released after `2022` largely support it. However, if we look closely at the compatibility table for `TransformStream` on `MDN`, we find that `TransformStream` as a `transferable` object is still not supported in `Safari` to this day.
What problem does that cause? Recall that on `TRANSFER_START` we transfer the `TransformStream`'s `readable` end to the `Service Worker` as a `Transferable Object`. Since `Safari` does not support this behavior, our `ReadableStream` cannot be handed over to the `Service Worker`, and the subsequent download cannot proceed. So if we need to be compatible with `Safari`, we have to deal with this issue.
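One hedged way to pick a code path at runtime (not code from the project) is to probe whether a stream survives `postMessage` as a transferable; browsers without support throw a `DataCloneError`:

// Minimal sketch: detect whether ReadableStream can be transferred via postMessage.
const isStreamTransferable = (): boolean => {
  try {
    const { readable } = new TransformStream();
    const { port1 } = new MessageChannel();
    // Unsupported browsers (e.g. Safari) throw a DataCloneError synchronously here.
    port1.postMessage(readable, [readable]);
    port1.close();
    return true;
  } catch (error) {
    return false; // fall back to enqueueing chunks inside the Service Worker
  }
};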
Since the root cause is that we cannot transfer ownership of the `ReadableStream` to the `Service Worker`, a simpler approach comes to mind: define the `ReadableStream` directly inside the `Service Worker`. That is, when the transfer starts we instantiate a `ReadableStream` and hold on to its controller object; as data arrives we `enqueue` the chunks into its buffer queue directly; and when the transfer ends we simply call the `controller.close()` method. This `readable` object can then be used directly as the `Response` of the intercepted request, making it downloadable content.
let controller: ReadableStreamDefaultController | null = null;
const readable = new ReadableStream({
  start(ctr) {
    controller = ctr;
  },
  cancel(reason) {
    console.log("ReadableStream Aborted", reason);
  },
});
map.set(fileId, [readable, controller!, Number(fileTotal)]);

port.onmessage = event => {
  const data = event.data as BufferType;
  destructureChunk(data).then(({ id, series, data }) => {
    const stream = map.get(id);
    if (!stream) return void 0;
    const [, controller, size] = stream;
    controller.enqueue(new Uint8Array(data));
    if (series === size - 1) {
      controller.close();
      map.delete(id);
    }
  });
};
Here we run into the backpressure problem we discussed earlier: there is no feedback mechanism at all, we simply take every data block coming from the main thread and `enqueue` it into the `ReadableStream`. If the data arrives faster than the browser-controlled download `IO` can drain it, a backlog builds up easily. We therefore need a way to exercise backpressure, and two approaches come to mind fairly readily, as listed below (a small sketch of the first option follows the discussion after the list).
- When instantiating the `ReadableStream`, use `CountQueuingStrategy` to create a large enough buffer; this is feasible because we already know the file size and the number of chunks at transfer time. Of course we probably don't need a buffer as large as the total number of chunks; dividing by `2`, or taking a logarithm, would also work, since the download is simultaneously being consumed by writes to the hard disk.
- In the `underlyingSource` object passed when instantiating the `ReadableStream`, besides the `start` method there is also a `pull` method, which is called repeatedly whenever the stream's internal queue of chunks is not full, until the queue reaches its high water mark. We can use these calls as an event-driven mechanism to control the flow rate. Note that `pull` is only called repeatedly if at least one chunk is actually enqueued during the call; if no chunk is enqueued while it runs, it will not be called again.
Let's look at the first option, allocating a sufficiently large buffer queue. If we think it through, even with a very large buffer we still have no feedback mechanism to slow down data production, so the buffer, however large, does not solve our memory-overflow problem (although declaring a large buffer at instantiation does not immediately allocate that much memory). And even without allocating a large buffer, the default queue behaves exactly the same way, except that its internal `desiredSize` becomes a fairly large negative value; no data is lost, because the browser's stream implementation keeps the data in memory until the consumer reads it.
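A tiny sketch of that point (the numbers are arbitrary): a generous `highWaterMark` only changes what `desiredSize` reports, while every enqueued chunk still sits in memory until a reader drains it.

// Minimal sketch: a large highWaterMark keeps desiredSize positive,
// but the enqueued chunks are still held in memory until they are read.
const strategy = new CountQueuingStrategy({ highWaterMark: 1024 });
const readable = new ReadableStream<Uint8Array>(
  {
    start(controller) {
      for (let i = 0; i < 10; i++) {
        controller.enqueue(new Uint8Array(1024 * 1024)); // ~10MB now buffered
      }
      console.log(controller.desiredSize); // 1014 -- still "room", yet memory is used
    },
  },
  strategy
);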
Now for the second approach. Through the `pull` method we do get feedback from the `ReadableStream`'s buffer queue, so we can implement a simple flow-control scheme. Since there are two states, production faster than consumption and consumption faster than production, we cannot rely on `pull` alone: we maintain another buffer queue of our own, and the event-driven data entry has two parts. When data enters our buffer queue, we first check whether the previous `pull` is still waiting unfulfilled; if so, we resolve that pending `Promise` directly, which is the consumption-faster-than-production case. And when the `pull` event fires, we check our buffer queue for data and hand it over immediately if there is any, which is the production-faster-than-consumption case.
const pending = new WeakMap<ReadableStream, (stream: string) => void>();
const queue = ["1", "2", "3", "4"];
const strategy = new CountQueuingStrategy({ highWaterMark: 3 });
const underlyingSource: UnderlyingDefaultSource<string> = {
  pull(controller) {
    if (!queue.length) {
      console.log("Pull Pending");
      return new Promise<void>(resolve => {
        const handler = (stream: string) => {
          controller.enqueue(stream);
          pending.delete(readable);
          console.log("Pull Restore", stream);
          resolve();
        };
        pending.set(readable, handler);
      });
    }
    const next = queue.shift();
    controller.enqueue(next);
    console.log("Pull", next);
    return void 0;
  },
};
const readable = new ReadableStream<string>(underlyingSource, strategy);
const write = (stream: string) => {
  if (pending.has(readable)) {
    console.log("Write Pending Pull", stream);
    pending.get(readable)!(stream);
  } else {
    console.log("Write Queue", stream);
    queue.push(stream);
  }
};
// Delay the read tasks so that pull fills the Readable's buffer queue first
setTimeout(async () => {
  // At this point data remains in queue -> production is greater than consumption
  const reader = readable.getReader();
  console.log("Read Twice");
  // After reading, the data in queue has been drained -> consumption equals production
  console.log("Read", await reader.read());
  // After reading, queue is empty and the Readable buffer queue is not full,
  // so Readable keeps firing pull -> consumption is greater than production
  console.log("Read", await reader.read());
  console.log("Write Twice");
  // Write resolves the pending pull task -> consumption equals production
  write("5");
  // Write goes into the queue -> production is greater than consumption
  write("6");
}, 100);

// Pull 1
// Pull 2
// Pull 3
// Read Twice
// Pull 4
// Read {value: '1', done: false}
// Pull Pending
// Read {value: '2', done: false}
// Write Twice
// Write Pending Pull 5
// Pull Restore 5
// Write Queue 6
It looks like we have built a rather elegant `pull`-based buffer-queue control, but a closer look shows we have overlooked something: haven't we merely lifted the `ReadableStream`'s built-in buffer queue out into our own code? We still face the same memory pressure, except that the backlog now sits in our own array instead of inside the `ReadableStream`; we haven't really solved anything.
So let's think about where the real problem lies. When we used the `TransformStream`, our backpressure control seemed to be nothing more than `await writer.ready`. What does that actually mean? Clearly it is a feedback mechanism from the carrier itself: when it decides its internal queue is under pressure, it actively blocks the producer from producing more data. Our own implementation has no such feedback from the `Service Worker` back to the main thread, and that is exactly why we cannot handle backpressure.
Looking at it more fundamentally, our communication channel is `postMessage`. The problem is: if we want the main thread to control backpressure with a plain `await`, what we lack is a way to get a response for the event we just sent. Because `postMessage` is one-way communication, we cannot do something like `postMessage().then()`; but we can, immediately after calling `postMessage`, record a pending `ready` `Promise`, wait for the response data, and `resolve` it, which achieves an equivalent effect.
This is not a complicated operation, so we might as well generalize it into something similar to `fetch`: whenever we initiate a request/push, we use a `Promise` to wait for its corresponding response, for a bounded time or even indefinitely. Because `postMessage` is a one-way transfer, we need to add an `id` to the data so that we know exactly which pending `Promise` the current response should `resolve`.
With that in mind we need to deal with how the data itself is transmitted, because appending identification information to the original data is not entirely trivial. In `postMessage`, if the payload is a string we can simply wrap it in another object; but if it is `ArrayBuffer` data, we would have to manipulate the underlying `Buffer` itself, which is obviously a bit fiddly. So I wanted a simple way to serialize it so it could be transferred as a string, and I considered the `BASE64`, `Uint8Array`, and `Uint32Array` serialization methods.
Take the simplest case of `8` bytes and compute the serialized size for `BASE64`, `Uint8Array`, and `Uint32Array` separately. If every bit of the data is `0`, the encoded results are `AAAAAAAAAAA=`, `[0,0,0,0,0,0,0,0]`, and `[0,0]` respectively, occupying `12`, `17`, and `5` characters.
const buffer = new ArrayBuffer(8);
const input = new Uint8Array(buffer);
input.fill(0);
const binaryStr = String.fromCharCode.apply(null, input);
console.log("BASE64", btoa(binaryStr)); // AAAAAAAAAAA=
const uint8Array = new Uint8Array(input);
console.log("Uint8Array", uint8Array); // Uint8Array(8) [0, 0, 0, 0, 0, 0, 0, 0]
const uint32Array = new Uint32Array(buffer);
console.log("Uint32Array", uint32Array); // Uint32Array(2) [0, 0]
From the result above, `Uint32Array` appears to give the best serialization, but that is only because every bit was filled with `0`; actual transfers will certainly not be that ideal. So let's take the opposite extreme and fill every bit with `1` to test the effect. The results now look different: the encodings become `//////////8=`, `[255,255,255,255,255,255,255,255]`, and `[4294967295,4294967295]`, occupying `12`, `33`, and `23` characters respectively.
const buffer = new ArrayBuffer(8);
const input = new Uint8Array(buffer);
input.fill(255);
const binaryStr = String.fromCharCode.apply(null, input);
console.log("BASE64", btoa(binaryStr)); // //////////8=
const uint8Array = new Uint8Array(input);
console.log("Uint8Array", uint8Array); // Uint8Array(8) [255, 255, 255, 255, 255, 255, 255, 255]
const uint32Array = new Uint32Array(buffer);
console.log("Uint32Array", uint32Array); // Uint32Array(2) [4294967295, 4294967295]
So it seems `BASE64` serialization is the more stable choice, because it is an inherently bit-oriented encoding: every `6 bits` are encoded by indexing into a table of `64` characters, so every `3` bytes, i.e. `24 bits`, become `4` characters, i.e. `32 bits`. With `8` bytes, i.e. `64 bits`, which is not evenly divisible by `24 bits`, the first `6` bytes are handled first; if every bit is `0`, the first `8` characters are all `A`. That leaves `16 bits`, which are padded by `8 bits` up to `24 bits` and encoded as `4` characters (the final `6 bits` are left to `=` padding), so the end result is `12` characters.
At this point I realized I was overthinking it: we don't actually need to worry about serialization at all. In the `RTC DataChannel` the payload does indeed have to be a plain string, `ArrayBuffer`, or similar data, and objects cannot be transferred directly; but the data we can pass through `postMessage` is governed by `The Structured Clone Algorithm`, and `ArrayBuffer` objects are on its list, cloned with the built-in serialization even without using `transfer` to move ownership. In my tests, `Chrome`, `Firefox`, and `Safari` all support passing the data directly this way; after all, the transfer happens inside the same browser, so it can afford to be a bit more relaxed.
<!-- page that registers the Service Worker -->
<script>
  navigator.serviceWorker.register("./sw.js", { scope: "./" }).then(res => {
    window.sw = res;
  });
</script>

// sw.js
self.onmessage = (event) => {
  console.log("Message", event);
  self.message = event; // keep a reference so the value can be inspected
};

// Execute in the console to observe the data received by the SW
const buffer = new ArrayBuffer(8);
const input = new Uint8Array(buffer);
input.fill(255);
sw.active.postMessage({ id: "test", buffer });
With that settled, the implementation of responding with data from the `Service Worker` becomes much simpler: we can now treat the payload as an ordinary object and no longer need to think about serialization. We then rely on the nature of `Promise`: when the `postMessage` response arrives, we look up the `resolve` corresponding to the current `id` in a global store and call it, which gives us backpressure feedback quite easily. We can also add a timeout mechanism to avoid a backlog of unresolved `resolve` entries.
// Simulated onMessage methods
let onMainMessage: ((event: { id: string; payload: string }) => void) | null = null;
let onWorkerMessage: ((event: { id: string; payload: string }) => void) | null = null;
// Simulated postMessage methods
const postToWorker = (id: string, payload: string) => {
  onWorkerMessage?.({ id, payload });
};
const postToMain = (id: string, payload: string) => {
  onMainMessage?.({ id, payload });
};
// Worker
(() => {
  onWorkerMessage = ({ id, payload }) => {
    console.log("Worker Receive", id, payload);
    setTimeout(() => {
      postToMain(id, "pong");
    }, 1000);
  };
})();
// Main
(() => {
  const map = new Map<string, (value: { id: string; payload: string }) => void>();
  onMainMessage = ({ id, payload }) => {
    const resolve = map.get(id);
    resolve?.({ id, payload });
    map.delete(id);
  };
  const post = (payload: string) => {
    const id = Math.random().toString(36).slice(2);
    return new Promise<{ id: string; payload: string }>(resolve => {
      map.set(id, resolve);
      postToWorker(id, payload);
    });
  };
  post("ping").then(res => {
    console.log("Main Receive", res.id, res.payload);
  });
})();
Daily question
/WindrunnerMax/EveryDay
References
/post/6844904029244358670
/jimmywarting/
/jimmywarting/native-file-system-adapter
/zh-CN/docs/Web/API/FetchEvent
/docs/latest/api/#types-of-streams
/en-US/docs/Web/API/TransformStream
/zh/oss/user-guide/map-custom-domain-names-5
/zh-CN/docs/Web/HTML/Element/a#download
/zh-CN/docs/Web/HTTP/Headers/Content-Disposition
/en-US/docs/Web/API/Streams_API/Concepts#backpressure
/en-US/docs/Web/API/Web_Workers_API/Transferable_objects
/en-US/docs/Web/API/Web_Workers_API/Structured_clone_algorithm