Project ReactorAs the core implementation framework of the responsive programming paradigm,Strictly follow the Reactive Streams specification system, its architectural design completely includes four core components defined by the specification: Publisher (data source), Subscriber (subscriber), Subscription (subscription relationship) and Processor (processor node). In this framework,FluxandMonoNot only does it realize the standard semantics of the Publisher interface, but it also builds a complete responsive data stream processing paradigm: establish production-consumption channels through subscription relationships, realize non-blocking data push based on the event-driven mechanism, and ensure the system's elastic communication through the backpressure protocol.
Basic Process
Understanding the working principle of Project Reactor as a whole can help us more clearly grasp the various concepts and operations in it and avoid losing our direction. In fact, from the overall perspective, the entire Reactor is based on the subscription-release model. Flux and Mono are the default Publishers in the system, simplifying our work on customizing Publishers. Flux and Mono integrate a large number of operators, and the existence of these operators reduces our need to customize Subscriber and Processor. With these combinations of operators, we can operate directly on data sources and elements without having to write additional Processors and Subscribers ourselves. Unless in special circumstances, it is not recommended to actively customize Subscriber and Processor.
Create data source (Flux, Mono) -> Convert and process data (map, filter...) -> subscribe to data source
1. Responsive data source:
1.1 Flux and Mono
As the core publisher of Project Reactor, the main differences between Flux and Mono are as follows:
-
Flux represents an asynchronous sequence of 0-N elements
-
Mono represents asynchronous operation of 0-1 result
// Create Flux
("1", "2", "3").subscribe(::println);
// Create Mono
("a").subscribe(::println);
1.2 Data source type
After understanding Flux and Mono, we learned how to simply create data sources. Flux and Mono also provide us with a lot of ways to create data sources, which are roughly divided into the following categories.
- Empty data source: Used to represent a completion signal without data (such as the result of a deletion operation).
- Dynamic Generation:
and
/
Allows manual control of element emission (synchronous or asynchronous).
- Async data source: from
Future
、Callable
orSupplier
Get data in it and supports non-blocking operations. - Time Driven:
Delayed transmission,
Periodic emission incremental value.
- Merge/Combination:
zip
Strictly align elements,merge
Unorderly merged,concat
Sequentially connected. - Backpressure adaptation: By
FluxSink
orMonoSink
Manually control backpressure and element emission.
Classification summary of how Mono and Flux data source creation
category | describe | Mono method example | Flux method example |
---|---|---|---|
Empty data source | Creates a data stream that does not emit any elements. | () |
() |
Single element | Emit a single static value or object. | (T) |
(T...) |
Multiple elements | Emit multiple static values or objects (onlyFlux support). |
N/A |
(T1, T2...) |
Collection/array | Generate elements from a collection or array. | N/A |
(List<T>) (T[])
|
Stream | From JavaStream Generate elements. |
N/A |
(Stream<T>) |
Dynamic generation | Generate elements dynamically through generator functions. | (sink -> {...}) |
(sink -> {...}) (sink -> {...})
|
Asynchronous data source | From asynchronous operation (e.g.Future 、Callable ) Get data. |
(Future) (Callable)
|
(Publisher) (Supplier<Stream>)
|
Error signal | Directly transmit the error signal. | (Throwable) |
(Throwable) |
Delay initialization | Lazy data generation (logic execution is performed when subscribed). |
(() -> ...) (Supplier)
|
(() -> ...) (Supplier<Stream>)
|
Time-driven | Generate data based on time (such as timing, delay). | (Duration) |
(Duration) |
Merge/combination | Merge multiple data sources. | (Mono1, Mono2...) |
(Flux1, Flux2...) (Flux1, Flux2...) (Flux1, Flux2...)
|
Back pressure adaptation | Adapt external backpressure mechanism (e.g.Sink Manual control). |
(MonoSink) |
(FluxSink) |
Conditional trigger | Generate data according to conditions (e.g.first 、takeUntil )。 |
(Mono1, Mono2) |
(Publisher...) (Predicate)
|
1.3 Data source publishing model
The publishing model of Project Reactor is the core mechanism of its responsive programming, mainly divided intoCold PublisherandHot Publisher. Their differences lie in the way data streams are generated, shared, and subscribers' consumption behavior. The following is a detailed explanation:
1.3.1. Cold Publisher
definition: Cold publishers generate for each subscriberIndependent data flow. Each subscriber triggers the complete generation process of the data source, even if other subscribers have subscribed.
Features:
- Data flow independent: Each subscriber consumes data from scratch.
- Delayed generation: Data is generated only when subscribed (lazy calculation).
- Resource Isolation: The data generation logic of different subscribers does not affect each other.
Applicable scenarios:
-
Each subscriber needs to obtain complete data (such as HTTP requests, database queries).
-
The generation cost of data sources is high, but subscribers need to be independent.
Code Example
// Create a cold publisher Flux<Integer> coldFlux = (1, 3).doOnNext(i -> ("Cold Publisher Published: " + i)); // The first subscriber (i -> ("Subscriber 1: " + i)); // The second subscriber (i -> ("Subscriber 2: " + i));
Output
Cold publisher sends: 1 Subscribers 1: 1 Cold publisher sends: 2 Subscribers 1: 2 Cold publisher sends: 1 Subscriber 2: 1 Cold publisher sends: 2 Subscriber 2: 2
1.3.2. Hot Publisher
-
definition: Hot publishers share oneUnified data flow, all subscribers consume the same data. The generation of data sources is not related to the subscriber's subscription time, and the subscriber who subscribes later may miss out on early data.
Features:
- Data flow sharing: All subscribers receive the same data source.
- Real-time: The generation of data sources is independent of subscription behavior.
- Resource reuse: Multiple subscribers share the same data generation logic.
Applicable scenarios:
- Real-time event push (such as sensor data, stock quotations).
- Data is required to avoid repeated generation of high-cost operations (such as WebSocket messages).
There are several ways to implement hot publishers:
-
ConnectableFlux (manual control)
passpublish()
Method willFlux
Convert toConnectableFlux
, need to be called manuallyconnect()
Start the data flow.
Code Example
// Create ConnectableFlux and convert to hot publisher
ConnectableFlux<Integer> hotFlux = (1, 3)
.doOnNext(i -> ("Hot Publisher Published: " + i))
.publish(); // Convert to ConnectableFlux
// Subscriber A
(i -> ("Subscriber A: " + i));
// Subscriber B
(i -> ("Subscriber B: " + i));
// Manually trigger the start of data flow
();
Output
Hot publisher Published: 1
Subscriber A: 1
Subscriber B: 1
Hot publisher Published: 2
Subscriber A: 2
Subscriber B: 2
Hot Publisher Published: 3
Subscriber A: 3
Subscriber B: 3
-
autoConnect()
(Automatic connection)
The data flow is automatically started when the specified number of subscribers is reached.
Flux<Integer> autoFlux = (1, 3)
.doOnNext(i -> ("Hot Publisher Published: " + i))
.publish()
.autoConnect(2);// Automatically start when there are 2 subscribers
(i -> ("Subscriber A: " + i));
(i -> ("Subscriber B: " + i));
Output
Hot publisher Published: 1
Subscriber A: 1
Subscriber B: 1
Hot publisher Published: 2
Subscriber A: 2
Subscriber B: 2
Hot Publisher Published: 3
Subscriber A: 3
Subscriber B: 3
-
share()
(Simplified hot publisher)
Equivalent topublish().refCount(1)
: Started when the first subscriber arrives and terminated when the last unsubscribe is terminated.
Flux<Long> sharedFlux = ((1))
.doOnNext(i -> ("Hot Publisher Published: " + i))
.take(5)
.share();
(i -> ("Subscriber A: " + i));
(2500);
(i -> ("Subscriber B: " + i)); // Subscriber B misses the first 2 data
Output
Hot publisher Published: 0
Subscriber A: 0
Hot publisher Published: 1
Subscriber A: 1
Hot publisher Published: 2
Subscriber A: 2
Subscriber B: 2
Hot Publisher Published: 3
Subscriber A: 3
Subscriber B: 3
Hot Publisher Published: 4
Subscriber A: 4
Subscriber B: 4
-
replay()
(Historical data cache)
Allow new subscribers to consume historical data before subscription (cache policy can be configurable).
ConnectableFlux<Integer> replayFlux = (1, 3)
.doOnNext(i -> ("Hot Publisher Published: " + i))
.replay(2);// Cache the last 2 data
(i -> ("Subscriber A: " + i));
();
(1000);
(i -> ("Subscriber B: " + i)); // Subscriber B receives the last 2 data
Output:
Hot publisher Published: 1
Subscriber A: 1
Hot publisher Published: 2
Subscriber A: 2
Hot Publisher Published: 3
Subscriber A: 3
Subscriber B: 2
Subscriber B: 3
Cold publisher and hot publisher comparison table
characteristic | Cold publisher | Hot Publisher |
---|---|---|
Data generation timing | Generate when subscribed | Generate in advance (or byconnect() trigger) |
Subscriber independence | Each subscriber consumes complete data independently | Share the same data stream |
Resource consumption | High (independently generated by each subscriber) | Low (shared generation logic) |
Typical scenarios | Database query, static data | Real-time events, broadcasts |
2. A powerful operator ecosystem
2.1 Core Operator Classification
category | Operator example | Function description |
---|---|---|
Conversion operator |
buffer , map , flatMap , window
|
Modify the element structure or content in the stream (such as grouping, mapping, flattening) |
Filter operator |
filter , take , skip
|
Filter elements by condition (such as retaining elements that meet the conditions, skipping the first N items) |
Combination operators |
merge , concat , zip
|
Merge multiple streams (such as sequential joining, parallel merge, and element pairing) |
Conditional operator |
any , all , hasElement
|
Determine whether the element in the stream meets the condition (such as whether there are elements that meet the condition) |
Mathematical operators |
count , sum , reduce
|
Perform aggregate calculations on elements (such as counting totals, summing, and accumulating) |
Error handling operator |
onErrorReturn , onErrorResume
|
Provide alternative values or switch to alternate streams in the event of an exception (such as returning static values, dynamic recovery logic) |
Tool operators |
delay , timeout , log , subscribe
|
Control flow life cycle (such as delayed sending, timeout interrupts, logging, triggering subscriptions) |
The entire data source operation |
doOnNext ,, ,doOnRequest ,doOnSubscribe ,doOnComplete wait |
Among them, starting with doOn can operate on different states of the entire data link. |
2.2 Common operations (similar to Java Stream)
//Conversion operators, filtering operations, conditions and mathematical operators. Java-like Stream will not be described too much here.
//map
(1, 2, 3).map(i -> i + 1).subscribe(::println);
//filter
("a", "b", "c").filter(s -> ("a")).subscribe(::println);
// flatMap
("a", "b", "c").flatMap(s -> (())).subscribe(::println);
//reduce
(1, 2, 3).reduce(0, (a, b) -> a + b).subscribe(::println);
//Window window usage
(1, 2, 3, 4, 5, 6).window(3, 1).flatMap(e -> (0, Integer::sum)).subscribe(::println);
//Buffer backpressure or batch processing will cache data
(1, 2, 3, 4, 5, 6).buffer(3, 1).subscribe(::println);
2.3 Combination operators
- zip
The zip operator can combine multiple (up to 8) streams into one stream. The way to merge is to correspond the elements in the two streams one by one in order, and then combine the two elements into one element. If the lengths of the two streams are inconsistent, the length of the final combined stream is the length of the shorter flow in the two streams.
Flux<String> flux1 = ("a", "b", "c");
Flux<String> flux2 = ("d", "e", "f");
Flux<String> flux3 = ("1", "2", "3");
(flux1, flux2, flux3).subscribe(::println);
//Output
[a,d,1]
[b,e,2]
[c,f,3]
- merge
The merge operator can merge two streams into one stream by placing the elements in the two streams alternately into the merged stream. Run simultaneously and run successively according to time.
Flux<Integer> flux3 = (1, 2, 3).delayElements((80));
Flux<Integer> flux4 = (4, 5, 6).delayElements((50));
(flux4).subscribe(::println);
//The output is processed according to time, so the result is likely to be like this, and it may be slightly different.
4
1
5
2
6
3
- concat
The concat operator can merge two streams into one stream, by merging the elements in the two streams into the merged stream in order. Run it in order, and then run flux2 after flux1 is completed.
Flux<Integer> flux1 = (1, 2, 3).delayElements((80));
Flux<Integer> flux2 = (4, 5, 6).delayElements((50));
(flux2).subscribe(::println);
//Output
1
2
3
4
5
6
2.4 The entire data source operation
Project Reactor provides a large number of methods starting with doOn, which are used to insert side effect logic (such as logging, monitoring, or resource management) during the life cycle of a data stream, Not modifying the data stream itself, only for observing or triggering behavior。
The usage method of each method is roughly the same. Here is a simple example using doOnRequest and doOnNext.
(1, 2, 3, 4, 5, 6).doOnNext(s -> ("doOnNext: " + s)).subscribe();
("-----------------------------------------------------------------------------------------------------------------------------
(1, 2, 3).doOnRequest(s -> ("doOnRequest: " + s)).subscribe(::println);
//Output
doOnNext: 1
doOnNext: 2
doOnNext: 3
doOnNext: 4
doOnNext: 5
doOnNext: 6
----------------
doOnRequest: 9223372036854775807
1
2
3
Below are the usage scenarios and triggering opportunities for each method.
method | Trigger timing | Parameter Type | Typical scenarios |
---|---|---|---|
doOnSubscribe |
When subscribe | Consumer<Subscription> |
Resource Initialization |
doOnNext |
When element push | Consumer<T> |
Logging, status update |
doOnError |
When an error occurs | Consumer<Throwable> |
Error monitoring, alarm |
doOnComplete |
When the stream ends normally | Runnable |
Completion notification |
doOnRequest |
When requesting data downstream | Consumer<Long> |
Back pressure debugging, request volume monitoring |
doOnCancel |
When unsubscribe | Runnable |
Resource release |
doOnEach |
When all events occur | Consumer<Signal<T>> |
Unified event handling |
doOnTerminate |
Before the stream terminates (before completion/error) | Runnable |
Cleanup logic before termination |
doAfterTerminate |
After the stream terminates (after completion/error) | Runnable |
Statistics after termination |
doOnDiscard |
When an element is discarded | Consumer<T> |
Resource recovery, data consistency check |
3. Execution control: subscription and scheduling
3.1 Subscription mechanism
The subscribe operator is used to subscribe to elements in the stream. When elements in the stream are not subscribed, all operations will not be triggered. Only when elements in the stream are subscribed, all operations will be triggered. Through reading the above content, I believe you have already learned a rough idea of Project Reactor's publishing subscription model. There are many examples of subscriptions above, so I won't go into details here.
3.2 Scheduler Policy
Schedulers
is the core tool for managing threads and concurrent tasks, which controls the execution context of responsive streams. By rationally selecting the scheduler, you can optimize resource utilization, avoid blockage, and improve application performance
Scheduler | Threading model | Applicable scenarios | Things to note |
---|---|---|---|
immediate |
Current thread | Lightweight synchronization operation | Avoid blockage |
single |
Single threaded | Strictly executed in sequence | Avoid long-term blockage |
boundedElastic |
Dynamic thread pool | Blocking I/O operations | Control the maximum number of threads and queue capacity |
parallel |
Fixed-size thread pool | Computation-intensive parallel tasks | The number of threads is equal to the number of CPU cores by default |
fromExecutorService |
Custom thread pool | Integrate existing thread pools | Manage your life cycle |
3.3 Default Scheduler
In Project Reactor, it is very convenient to switch the thread scheduler used by publishOn and subscribeOn.
(1, 10)
.publishOn(()) //Switch scheduler
.log("publish thread:")
.flatMap(n -> (() -> n).subscribeOn(())) //Switch scheduler
.log("subscribe thread:")
.subscribe();
3.4 Custom virtual thread scheduler
Of course, in JDK17 and the changed version, you can also combine virtual threads to further increase the concurrency.
Scheduler customSchedule = (());
(1, 10)
.publishOn(customSchedule)
.log("publish thread:")
.flatMap(n -> (() -> n).subscribeOn(()))
.log("subscribe thread:")
.subscribe();
4. Advanced control components
4.1 The relationship between Processor and Sink
In Project Reactor,Processor
was once a key component, but as Reactor 3.4+ evolved, the official gradually marked it as Deprecated, and recommend the use of more modern Sink
API Alternative. The following are the reasons for deprecation and the core differences between the two.
1. Thread safety
-
processor:most
Processor
Implementation (such asDirectProcessor
、UnicastProcessor
)Non-thread safe, call directlyonNext
、onComplete
Methods such as this need to be manually synchronized. -
Sink: Atomic operation:
Sink
supplytryEmitNext
、tryEmitError
etc. to ensure the security of multi-threaded data push.
2. Role positioning
- Processor:At the same time
Publisher
andSubscriber
Although this design is flexible, it leads to unclear responsibilities and is easy to misuse. - Sink:As a pure producer only (only generate data streams)
3. Backpressure treatment
-
processor
There are large differences in support for backpressure:
-
DirectProcessor
Backpressure is completely ignored (unbounded queue). -
UnicastProcessor
Supports backpressure for single subscribers, but requires manual configuration of buffers.
-
-
Sink: Built-in configuration, through
onBackpressureBuffer
、onBackpressureError
The chain method directly defines the backpressure behavior.
4. Complex life cycle management
- processor: Need to be explicitly called
onComplete
oronError
End the stream, if omissioned, may cause resource leakage or subscriber hang. -
Sink:pass
tryEmitComplete
andtryEmitError
Clearly end the flow and avoid resource leakage.
5. API Design
- processor:
Processor
The API is not optimized for modern responsive programming modes (such as lack of built-in support for retry and replay). -
Sink:Flexible and simple, through
of
multicast()
、unicast()
orreplay()
Quickly configure multi-subscriber behavior.
4.2 API usage example
Since the processor has been deprecated, it is not recommended to use it, so I will not introduce it too much here.
-
1: Send a single data
<String> sink = (); Mono<String> mono = (); ( value -> ("Received: " + value), error -> ("Error: " + error), () -> ("Completed") ); ("Hello"); // equivalent to tryEmitNext + tryEmitComplete
-
2: Send multiple data
// Create multicast Sink and design buffered pressed strategies <String> sink = ().multicast().onBackpressureBuffer(); //Convert to flux Flux<String> hotFlux = ().map(String::toUpperCase); // Subscriber A (i -> ("Subscriber A: " + i)); // Subscriber B (i -> ("Subscriber B: " + i)); // Send data ("hello"); ("world"); ();
-
3: Support historical data
// Create a replay Sink, retaining the last 2 elements
<String> sink = ().replay().limit(2);
("A");
("B");
// Subscriber 1 (receive historical data A, B)
().subscribe(s -> ("Sub1: " + s));
// Push new data
("C");
// Subscriber 2 (receive historical data B, C)
().subscribe(s -> ("Sub2: " + s));
//Output
Sub1: A
Sub1: B
Sub1: C
Sub2: B
Sub2: C
4.2 Backpressure
4.2.1 Backpressure Strategy
1.onBackpressureBuffer
(Buffering policy)
- Behavior: Store unconsumed data in a buffer and send it when waiting for downstream requests.
- Configuration Options
- Buffer size: You can specify bounded or unbounded (the default is unbounded and should be used with caution).
- Overflow strategy
-
ERROR
: Throwed when the buffer is fullIllegalStateException
。 -
DROP_LATEST
: Discard new data and retain old data. -
DROP_OLDEST
: Discard the oldest data and keep the new data.
-
- Applicable scenarios: Allow short-term speed mismatch, but memory usage needs to be controlled.
2. onBackpressureError
(Error policy)
- Behavior: When the buffer is full or downstream is not requested, Throw an error immediately(
IllegalStateException
)。 - Applicable scenarios: Strictly require real-time, tolerate data loss but requires rapid failure.
3. directBestEffort
(Try your best strategy)
- Behavior: No buffer, push data directly to the downstream. If the downstream does not request,Silently discard new data。
- Features: Avoid memory usage, but may lead to data loss.
- Applicable scenarios: Real-time event processing (such as logs, metric collection), allowing occasional loss.
4. replay
(Replay strategy)
-
Behavior
New subscribers replay historical data
-
It also supports real-time data push.
-
Can configure the buffer size for replay (such as keeping the most recent ones
N
element).
-
-
Backpressure treatment
- For new subscribers: Follow backpressure requests when replaying historical data.
- For real-time data:
onBackpressureBuffer
ordirectBestEffort
Strategy.
-
Applicable scenarios: Scenarios where new subscribers need to obtain historical data.
4.2.2. Default policy
-
multicast()
: Used by defaultdirectBestEffort
(No buffer). -
unicast()
: Used by defaultonBackpressureBuffer
(Unbounded buffer). -
replay()
: All historical data is retained by default (unbounded buffer).
5. Hooks and Context
5.1 Hooks
In Project Reactor,Hooks is a set of global callback mechanisms that allow the default behavior of the Reactor library Customized extension, used to debug, monitor or modify the execution logic of responsive streams.
1. The core uses of Hooks
- Global Error Handling: Catch exceptions that have not been processed downstream.
- Operator life cycle monitoring: Insert custom logic before and after operator execution.
- Debugging and tracking: Enhance stack trace information and locate asynchronous flow problems.
- Behavior modification: Implementation of dynamic replacement or wrapping operators.
2. Commonly used Hooks and functions
1. onOperatorError
-
effect: Capture the thrown during the execution of the operatorUnhandled exception。
-
Typical scenarios: Unified logging and conversion error types.
((error, context) -> { ("Global catch exception: " + error); return error; });
2. onNextDropped
-
effect: The treatment is caused by downstream unsubscribe, back pressure overflow, etc.Discarded
onNext
element。 -
Typical scenarios: Record missing data for audit or compensation.
(item -> ("Element is discarded: " + item) );
3. onErrorDropped
-
effect: The processing has been terminated due to downstream (if called
onComplete
) and was DiscardedonError
Signal。 -
Typical scenarios: Avoid silence ignoring errors.
(error -> ("Error discarded: " + error) );
4. onOperatorDebug
-
effect: EnableDebug mode, generate enhanced stack trace information (including subscription point location) for the asynchronous operator.
-
cost: Increase performance overhead,Only for development environment use。
(); // Enable debug mode
5. onEachOperator
/ onLastOperator
-
effectexistBefore and after each operator is executedInsert custom logic (such as logs, metric collection).
-
Typical scenarios: Performance monitoring, dynamic modification of data flow.
(operator -> { long start = (); return original -> (signal -> ("operator time-consuming: " + (() - start) + "ms") ); });
6. Reset Hooks
-
Restore default behavior
javaCopy (); (); ();
4.4 Context
In Project Reactor,Context
is used to pass between stages of a responsive streamContext data’s core mechanism. It solves traditionThreadLocal
Limitations in asynchronous, multithreaded environments allow data to be safely passed in operator chains. The following isContext
The detailed analysis of the design ideas, API usage and typical scenarios.
1. Why do you need a Context?
- question: In an asynchronous responsive stream, data may be processed by different threads.
ThreadLocal
Cannot pass across threads. - Solution:
Context
Provides an immutable key-value store bound to the subscription chain, ensuring that context data is securely accessible throughout the life of the stream.
2. Characteristics of Context
- Immutability: Each modification will generate a new instance to ensure thread safety.
- Subscription Chain Binding: Data is passed along with the subscription chain, not on threads (It should be noted that the Context is passed from the bottom to the top)。
- Key-value storagesimilar
Map
Structure, support type-safe keys (ContextKey
)。 -
From bottom to up (Downstream → Upstream)
- Write order: The call afterward
contextWrite
Will overwrite the first call. - Read order: Downstream (near the subscription point)
Context
Priority to access.
- Write order: The call afterward
passcontextWrite
The operator willContext
Write to the responsive stream, throughdeferContextual
Read Context in stream
//Note that since the ontext is passed from the bottom to the top, it must be written in the bottom (point A) before it can be read at (point B)
("A", "B", "C", "D")
//Record as point B. Splice the values in Context
.flatMap(s -> {
("ssss:" + s);
return (ctx -> (s + ("suffix")));
}
)
//Remember as point A and write to Context (key: must be called before the read operation)
.contextWrite(("suffix", "-ctx"))
// Subscribe to output results
.subscribe(::println);
Example of Context propagation from bottom up (Downstream → Upstream)
Due to the bottom-up propagation characteristic of Context, the Context isPoint BThe value ofPoint AValue of
("A", "B", "C", "D")
// Splice the values in the Context
.flatMap(s -> {
//Because ctx222 will overwrite ctx111, the splicing here is ctx222.
("ssss:" + s);
return (ctx -> (s + ("suffix")));
}
)
//Note as point B, writing to Context ctx222 will overwrite ctx111
.contextWrite(("suffix", "-ctx222"))
//Remember as point A and write to Context
.contextWrite(("suffix", "-ctx111"))
// Subscribe to output results
.subscribe(::println);
//Output
ssss:A
A-ctx222
ssss:B
B-ctx222
ssss:C
C-ctx222
ssss:D
D-ctx222
Conclusion
By deeply understanding the core concepts of Project Reactor, we can better master the responsive programming paradigm and build a more efficient and resilient distributed system. The combination with modern virtual threads provides a better solution for building a new generation of high concurrent applications. By rationally selecting scheduling strategies and optimizing thread models, we can keep the code concise while giving full play to the hardware performance.
The road is long and arduous, and I will search up and down