
Project Reactor for Reactive Programming


Project Reactor, as the core implementation of the reactive programming paradigm, strictly follows the Reactive Streams specification. Its architecture includes all four core components defined by that specification: Publisher (data source), Subscriber, Subscription, and Processor. Within this framework, Flux and Mono not only implement the standard semantics of the Publisher interface, they also build a complete reactive stream-processing model: they establish production-consumption channels through subscriptions, push data without blocking via an event-driven mechanism, and keep the system resilient through the backpressure protocol.

Basic Process

Understanding how Project Reactor works as a whole helps us grasp its individual concepts and operations without losing our bearings. Viewed from a distance, the entire Reactor is built on a publish-subscribe model. Flux and Mono are the built-in Publishers, which spares us from writing custom Publishers. They also integrate a large number of operators, and these operators largely remove the need to write custom Subscribers and Processors: with combinations of them we can act directly on data sources and elements. Unless circumstances are truly special, actively implementing your own Subscriber or Processor is not recommended.

Create data source (Flux, Mono) -> Convert and process data (map, filter...) -> subscribe to data source
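
A minimal end-to-end sketch of this flow (assuming reactor-core is on the classpath; class and method names are the standard Reactor ones):

import reactor.core.publisher.Flux;

public class BasicFlowDemo {
    public static void main(String[] args) {
        Flux.just(1, 2, 3, 4)                    // 1. create the data source
                .map(i -> i * 10)                // 2. transform each element
                .filter(i -> i > 10)             // 3. keep only matching elements
                .subscribe(System.out::println); // 4. nothing runs until this subscribe
    }
}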

1. Reactive data sources

1.1 Flux and Mono

As the core publishers of Project Reactor, Flux and Mono differ mainly as follows:

  • Flux represents an asynchronous sequence of 0..N elements

  • Mono represents an asynchronous result of 0..1 element

// Create a Flux
 Flux.just("1", "2", "3").subscribe(System.out::println);

 // Create a Mono
 Mono.just("a").subscribe(System.out::println);

1.2 Data source type

Having met Flux and Mono, we know how to create a simple data source. Flux and Mono also provide many other ways to create data sources, roughly divided into the following categories.

  1. Empty data source: represents a completion signal with no data (such as the result of a delete operation).
  2. Dynamic generation: generate and create allow manual control of element emission (synchronously and asynchronously, respectively).
  3. Asynchronous data source: obtains data from a Future, Callable, or Supplier, supporting non-blocking operation.
  4. Time driven: delay emits after a delay; interval periodically emits incrementing values.
  5. Merge/combination: zip strictly aligns elements, merge interleaves them without ordering guarantees, concat connects streams sequentially.
  6. Backpressure adaptation: manually control backpressure and element emission through FluxSink or MonoSink.

A summary of the Mono and Flux data-source creation methods, by category (the method names below are the standard reactor-core factory methods):

Category | Description | Mono example | Flux example
Empty source | Creates a stream that emits no elements. | Mono.empty() | Flux.empty()
Single element | Emits a single static value or object. | Mono.just(T) | Flux.just(T)
Multiple elements | Emits multiple static values or objects (Flux only). | N/A | Flux.just(T1, T2...)
Collection/array | Generates elements from a collection or array. | N/A | Flux.fromIterable(List<T>), Flux.fromArray(T[])
Stream | Generates elements from a Java Stream. | N/A | Flux.fromStream(Stream<T>)
Dynamic generation | Generates elements dynamically via generator functions. | Mono.create(sink -> {...}) | Flux.create(sink -> {...}), Flux.generate(sink -> {...})
Asynchronous source | Gets data from an asynchronous operation (e.g. Future, Callable). | Mono.fromFuture(CompletableFuture), Mono.fromCallable(Callable) | Flux.from(Publisher), Flux.fromStream(Supplier<Stream>)
Error signal | Emits an error signal directly. | Mono.error(Throwable) | Flux.error(Throwable)
Lazy initialization | Defers data generation until subscription. | Mono.defer(() -> ...), Mono.fromSupplier(Supplier) | Flux.defer(() -> ...), Flux.fromStream(Supplier<Stream>)
Time driven | Generates data based on time (delays, periodic ticks). | Mono.delay(Duration) | Flux.interval(Duration)
Merge/combination | Merges multiple sources. | Mono.zip(Mono1, Mono2...) | Flux.zip(...), Flux.merge(...), Flux.concat(...)
Backpressure adaptation | Adapts an external backpressure mechanism (manual Sink control). | Mono.create(MonoSink) | Flux.create(FluxSink)
Conditional trigger | Generates data according to conditions (e.g. first, takeUntil). | Mono.firstWithSignal(Mono1, Mono2) | Flux.firstWithSignal(Publisher...), Flux.takeUntil(Predicate)
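
To make the table concrete, here is a small sketch touching several of these categories (a sketch under the assumption of plain reactor-core, with imports from java.time and java.util):

// Empty source: completes immediately without emitting
Flux<Object> empty = Flux.empty();
// Collection source
Flux<Integer> fromList = Flux.fromIterable(List.of(1, 2, 3));
// Lazy initialization: the supplier runs at subscribe time
Mono<String> lazy = Mono.defer(() -> Mono.just("computed at subscribe time"));
// Dynamic generation: synchronous, one element per round, with explicit completion
Flux<String> generated = Flux.generate(
        () -> 0,                        // initial state
        (state, sink) -> {
            sink.next("value " + state);
            if (state == 2) sink.complete();
            return state + 1;
        });
// Time driven: emits 0, 1, 2 at 100 ms intervals
Flux<Long> ticks = Flux.interval(Duration.ofMillis(100)).take(3);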

1.3 Data source publishing model

The publishing model is at the heart of Project Reactor's reactive programming and comes in two flavors: the Cold Publisher and the Hot Publisher. They differ in how data streams are generated and shared, and in how subscribers consume them. A detailed explanation follows:

1.3.1. Cold Publisher

Definition: a cold publisher generates an independent data stream for each subscriber. Every subscriber triggers the complete generation process of the data source, even if other subscribers have already subscribed.

Features:

  1. Independent data flow: each subscriber consumes the data from scratch.
  2. Lazy generation: data is generated only upon subscription (lazy evaluation).
  3. Resource isolation: the generation logic for different subscribers does not interfere.

Applicable scenarios:

  • Each subscriber needs the complete data (such as HTTP requests or database queries).

  • Data generation is costly, yet each subscriber needs its own independent stream.

    Code Example

    // Create a cold publisher
     Flux<Integer> coldFlux = Flux.range(1, 3).doOnNext(i -> System.out.println("Cold Publisher Published: " + i));
     // The first subscriber
     coldFlux.subscribe(i -> System.out.println("Subscriber 1: " + i));
     // The second subscriber
     coldFlux.subscribe(i -> System.out.println("Subscriber 2: " + i));

    Output

    Cold Publisher Published: 1
     Subscriber 1: 1
     Cold Publisher Published: 2
     Subscriber 1: 2
     Cold Publisher Published: 3
     Subscriber 1: 3
     Cold Publisher Published: 1
     Subscriber 2: 1
     Cold Publisher Published: 2
     Subscriber 2: 2
     Cold Publisher Published: 3
     Subscriber 2: 3

1.3.2. Hot Publisher

  • Definition: a hot publisher shares one unified data stream; all subscribers consume the same data. Data generation is unrelated to when subscribers subscribe, so a subscriber that subscribes late may miss earlier data.

    Features:

    1. Shared data flow: all subscribers receive data from the same source.
    2. Real-time: data generation is independent of subscription behavior.
    3. Resource reuse: multiple subscribers share the same data-generation logic.

    Applicable scenarios:

    • Real-time event push (such as sensor data or stock quotes).
    • High-cost data that should not be regenerated for every subscriber (such as WebSocket messages).

There are several ways to implement hot publishers:

  1. ConnectableFlux (manual control)

The publish() method converts a Flux into a ConnectableFlux; the data flow starts only when connect() is called manually.

Code Example

// Create a ConnectableFlux, i.e. a hot publisher
         ConnectableFlux<Integer> hotFlux = Flux.range(1, 3)
                 .doOnNext(i -> System.out.println("Hot Publisher Published: " + i))
                 .publish(); // convert to ConnectableFlux
         // Subscriber A
         hotFlux.subscribe(i -> System.out.println("Subscriber A: " + i));
         // Subscriber B
         hotFlux.subscribe(i -> System.out.println("Subscriber B: " + i));
         // Manually start the data flow
         hotFlux.connect();

Output

Hot Publisher Published: 1
 Subscriber A: 1
 Subscriber B: 1
 Hot Publisher Published: 2
 Subscriber A: 2
 Subscriber B: 2
 Hot Publisher Published: 3
 Subscriber A: 3
 Subscriber B: 3
  2. autoConnect() (automatic connection)

The data flow starts automatically once the specified number of subscribers is reached.

Flux<Integer> autoFlux = Flux.range(1, 3)
                 .doOnNext(i -> System.out.println("Hot Publisher Published: " + i))
                 .publish()
                 .autoConnect(2); // starts automatically once there are 2 subscribers
         autoFlux.subscribe(i -> System.out.println("Subscriber A: " + i));
         autoFlux.subscribe(i -> System.out.println("Subscriber B: " + i));

Output

Hot Publisher Published: 1
 Subscriber A: 1
 Subscriber B: 1
 Hot Publisher Published: 2
 Subscriber A: 2
 Subscriber B: 2
 Hot Publisher Published: 3
 Subscriber A: 3
 Subscriber B: 3
  3. share() (simplified hot publisher)

Equivalent to publish().refCount(1): the stream starts when the first subscriber arrives and terminates when the last subscriber cancels.

Flux<Long> sharedFlux = Flux.interval(Duration.ofSeconds(1))
                 .doOnNext(i -> System.out.println("Hot Publisher Published: " + i))
                 .take(5)
                 .share();
         sharedFlux.subscribe(i -> System.out.println("Subscriber A: " + i));
         Thread.sleep(2500);
         sharedFlux.subscribe(i -> System.out.println("Subscriber B: " + i)); // Subscriber B misses the first 2 elements
         Thread.sleep(3000); // keep the JVM alive until the stream completes

Output

Hot Publisher Published: 0
 Subscriber A: 0
 Hot Publisher Published: 1
 Subscriber A: 1
 Hot Publisher Published: 2
 Subscriber A: 2
 Subscriber B: 2
 Hot Publisher Published: 3
 Subscriber A: 3
 Subscriber B: 3
 Hot Publisher Published: 4
 Subscriber A: 4
 Subscriber B: 4

  4. replay() (historical data cache)

Allows new subscribers to consume data emitted before they subscribed (the cache policy is configurable).

ConnectableFlux<Integer> replayFlux = Flux.range(1, 3)
                 .doOnNext(i -> System.out.println("Hot Publisher Published: " + i))
                 .replay(2); // cache the last 2 elements

         replayFlux.subscribe(i -> System.out.println("Subscriber A: " + i));
         replayFlux.connect();
         Thread.sleep(1000);
         replayFlux.subscribe(i -> System.out.println("Subscriber B: " + i)); // Subscriber B receives the last 2 elements

Output:

Hot Publisher Published: 1
 Subscriber A: 1
 Hot Publisher Published: 2
 Subscriber A: 2
 Hot Publisher Published: 3
 Subscriber A: 3
 Subscriber B: 2
 Subscriber B: 3

Comparison of cold and hot publishers

Characteristic | Cold publisher | Hot publisher
Data generation timing | Generated upon subscription | Generated in advance (or triggered by connect())
Subscriber independence | Each subscriber independently consumes the complete data | All subscribers share the same stream
Resource consumption | Higher (generated separately per subscriber) | Lower (shared generation logic)
Typical scenarios | Database queries, static data | Real-time events, broadcasts
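
As a side note, Flux.cache() offers a middle ground between the two models: it is roughly equivalent to replay().autoConnect(), generating the data once and replaying it to later subscribers. A minimal sketch:

Flux<Integer> cached = Flux.range(1, 3)
        .doOnNext(i -> System.out.println("generated: " + i))
        .cache(); // generate once, replay the cached elements to later subscribers

cached.subscribe(i -> System.out.println("Subscriber 1: " + i)); // triggers generation
cached.subscribe(i -> System.out.println("Subscriber 2: " + i)); // replays 1, 2, 3 without regenerating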

2. A powerful operator ecosystem

2.1 Core Operator Classification

Category | Example operators | Description
Transformation | buffer, map, flatMap, window | Modify the structure or content of elements in the stream (grouping, mapping, flattening)
Filtering | filter, take, skip | Filter elements by condition (keep matches, skip the first N, ...)
Combination | merge, concat, zip | Merge multiple streams (sequential joining, interleaved merging, element pairing)
Conditional | any, all, hasElement | Test whether elements in the stream meet a condition
Mathematical | count, sum, reduce | Aggregate calculations over elements (counting, summing, accumulating)
Error handling | onErrorReturn, onErrorResume | Provide fallback values or switch to a backup stream on error (static fallback, dynamic recovery)
Utility | delay, timeout, log, subscribe | Control the stream life cycle (delayed emission, timeout interruption, logging, triggering subscription)
Life-cycle callbacks | doOnNext, doOnRequest, doOnSubscribe, doOnComplete, etc. | The doOn* family observes the various states of the whole data pipeline

2.2 Common operations (similar to Java Stream)

// Transformation, filtering, conditional, and mathematical operators behave much like Java Streams,
 // so they are not described in detail here.
 // map
 Flux.just(1, 2, 3).map(i -> i + 1).subscribe(System.out::println);
 // filter
 Flux.just("a", "b", "c").filter(s -> s.equals("a")).subscribe(System.out::println);
 // flatMap
 Flux.just("a", "b", "c").flatMap(s -> Flux.just(s.toUpperCase())).subscribe(System.out::println);
 // reduce
 Flux.just(1, 2, 3).reduce(0, (a, b) -> a + b).subscribe(System.out::println);
 // window: splits the stream into sub-fluxes (size 3, step 1), then sums each window
 Flux.just(1, 2, 3, 4, 5, 6).window(3, 1).flatMap(w -> w.reduce(0, Integer::sum)).subscribe(System.out::println);
 // buffer: collects elements into lists (size 3, step 1); useful for batching or buffering under backpressure
 Flux.just(1, 2, 3, 4, 5, 6).buffer(3, 1).subscribe(System.out::println);

2.3 Combination operators

  • zip

The zip operator combines multiple streams (up to 8) into one by pairing elements position by position and wrapping each group into a tuple. If the input streams have different lengths, the combined stream is only as long as the shortest input.

Flux<String> flux1 = Flux.just("a", "b", "c");
 Flux<String> flux2 = Flux.just("d", "e", "f");
 Flux<String> flux3 = Flux.just("1", "2", "3");
 Flux.zip(flux1, flux2, flux3).subscribe(System.out::println);

 //Output
 [a,d,1]
 [b,e,2]
 [c,f,3]
  • merge

The merge operator merges two streams into one by interleaving their elements: both sources run simultaneously, and elements are emitted in the order they arrive in time.

Flux<Integer> flux3 = Flux.just(1, 2, 3).delayElements(Duration.ofMillis(80));
 Flux<Integer> flux4 = Flux.just(4, 5, 6).delayElements(Duration.ofMillis(50));
 flux3.mergeWith(flux4).subscribe(System.out::println);
 Thread.sleep(1000); // delayElements is asynchronous, so keep the main thread alive

 //The output depends on timing, so it will likely (but not necessarily) look like this:
 4
 1
 5
 2
 6
 3
  • concat

The concat operator merges two streams into one by appending their elements strictly in order: flux2 runs only after flux1 has completed.

Flux<Integer> flux1 = Flux.just(1, 2, 3).delayElements(Duration.ofMillis(80));
 Flux<Integer> flux2 = Flux.just(4, 5, 6).delayElements(Duration.ofMillis(50));
 flux1.concatWith(flux2).subscribe(System.out::println);
 Thread.sleep(1000); // keep the main thread alive
 //Output
 1
 2
 3
 4
 5
 6

2.4 Life-cycle callbacks (doOn*)

Project Reactor provides a large family of doOn* methods for inserting side-effect logic (such as logging, monitoring, or resource management) at points in the data stream's life cycle. They do not modify the data stream itself; they only observe it or trigger behavior.

All of these methods are used in much the same way; here is a simple example using doOnNext and doOnRequest.

Flux.just(1, 2, 3, 4, 5, 6).doOnNext(s -> System.out.println("doOnNext: " + s)).subscribe();
 System.out.println("----------------");
 Flux.just(1, 2, 3).doOnRequest(s -> System.out.println("doOnRequest: " + s)).subscribe(System.out::println);
 //Output
 doOnNext: 1
 doOnNext: 2
 doOnNext: 3
 doOnNext: 4
 doOnNext: 5
 doOnNext: 6
 ----------------
 doOnRequest: 9223372036854775807
 1
 2
 3

Below are the trigger timings and typical usage scenarios for each method.

Method | Trigger timing | Parameter type | Typical scenarios
doOnSubscribe | On subscription | Consumer<Subscription> | Resource initialization
doOnNext | On each element | Consumer<T> | Logging, state updates
doOnError | On error | Consumer<Throwable> | Error monitoring, alerting
doOnComplete | On normal completion | Runnable | Completion notification
doOnRequest | When downstream requests data | Consumer<Long> | Backpressure debugging, request-volume monitoring
doOnCancel | On cancellation | Runnable | Resource release
doOnEach | On every signal | Consumer<Signal<T>> | Unified event handling
doOnTerminate | Just before termination (completion/error) | Runnable | Pre-termination cleanup
doAfterTerminate | After termination (completion/error) | Runnable | Post-termination statistics
doOnDiscard | When an element is discarded | Consumer<T> | Resource recovery, consistency checks
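
A small sketch combining several of the callbacks from the table on one pipeline (doFinally is included as a closely related operator):

Flux.just(1, 2, 3)
        .doOnSubscribe(sub -> System.out.println("subscribed"))        // on subscription
        .doOnNext(i -> System.out.println("next: " + i))               // per element
        .doOnComplete(() -> System.out.println("completed"))           // normal completion
        .doOnError(e -> System.err.println("error: " + e))             // error signal
        .doFinally(signal -> System.out.println("finally: " + signal)) // after terminate or cancel
        .subscribe();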

3. Execution control: subscription and scheduling

3.1 Subscription mechanism

The subscribe operator subscribes to the elements in a stream. Until a stream is subscribed to, none of its operations are triggered; only subscription sets the pipeline in motion, as the sketch below shows. Having read this far, you should already have a rough picture of Project Reactor's publish-subscribe model, and there are plenty of subscription examples above, so they are not repeated here.
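
A quick sketch of that laziness, assuming plain reactor-core: assembling the pipeline alone executes nothing.

Flux<Integer> pipeline = Flux.range(1, 3)
        .map(i -> {
            System.out.println("mapping " + i); // not printed during assembly
            return i * 2;
        });
System.out.println("pipeline assembled, nothing has executed yet");
pipeline.subscribe(i -> System.out.println("received: " + i)); // now the whole chain runs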

3.2 Scheduler Policy

Schedulers are Reactor's core tool for managing threads and concurrent tasks; they control the execution context of a reactive stream. Choosing the right scheduler optimizes resource utilization, avoids blocking, and improves application performance.

Scheduler | Threading model | Applicable scenarios | Notes
Schedulers.immediate() | Current thread | Lightweight synchronous operations | Avoid blocking
Schedulers.single() | Single thread | Strictly sequential execution | Avoid long-running blocking work
Schedulers.boundedElastic() | Dynamic thread pool | Blocking I/O operations | Caps the maximum thread count and queue capacity
Schedulers.parallel() | Fixed-size thread pool | CPU-intensive parallel tasks | Thread count defaults to the number of CPU cores
Schedulers.fromExecutorService(...) | Custom thread pool | Integrating an existing thread pool | You manage its life cycle yourself

3.3 Switching schedulers

In Project Reactor, switching the scheduler is as simple as calling publishOn or subscribeOn.

Flux.range(1, 10)
         .publishOn(Schedulers.boundedElastic()) // switch the downstream execution context
         .log("publish thread:")
         .flatMap(n -> Mono.fromCallable(() -> n).subscribeOn(Schedulers.parallel())) // switch the subscription context
         .log("subscribe thread:")
         .subscribe();

3.4 Custom virtual thread scheduler

Of course, on newer JDKs (virtual threads are final as of JDK 21), you can also combine Reactor with virtual threads to push concurrency further.

Scheduler customSchedule = Schedulers.fromExecutorService(Executors.newVirtualThreadPerTaskExecutor());
Flux.range(1, 10)
        .publishOn(customSchedule)
        .log("publish thread:")
        .flatMap(n -> Mono.fromCallable(() -> n).subscribeOn(Schedulers.parallel()))
        .log("subscribe thread:")
        .subscribe();

4. Advanced control components

4.1 The relationship between Processor and Sink

In Project Reactor, Processor was once a key component, but from Reactor 3.4 onward it has been progressively marked Deprecated, with the more modern Sinks API recommended as the replacement. The reasons for the deprecation, and the core differences between the two, follow.

1. Thread safety

  • Processor: most Processor implementations (such as DirectProcessor and UnicastProcessor) are not thread-safe; calling onNext, onComplete, and similar methods directly requires manual synchronization.
  • Sink: atomic operations; Sinks provide tryEmitNext, tryEmitError, and friends to guarantee safe multi-threaded data pushing.

2. Role positioning

  • Processor: acts as both Publisher and Subscriber. This design is flexible, but it muddles responsibilities and invites misuse.
  • Sink: acts purely as a producer (it only generates data streams).

3. Backpressure handling

  • Processor

    Support for backpressure varies widely:

    • DirectProcessor ignores backpressure entirely (unbounded).
    • UnicastProcessor supports backpressure for a single subscriber, but buffers must be configured manually.
  • Sink: built-in configuration; chain methods such as onBackpressureBuffer and onBackpressureError define the backpressure behavior directly.

4. Life-cycle management

  • Processor: you must explicitly call onComplete or onError to end the stream; omitting this can leak resources or leave subscribers hanging.
  • Sink: tryEmitComplete and tryEmitError end the stream explicitly and help avoid resource leaks.

5. API design

  • Processor: the API was never optimized for modern reactive patterns (no built-in support for retry or replay, for example).
  • Sink: flexible and simple; multicast(), unicast(), and replay() configure multi-subscriber behavior quickly.
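
One practical consequence of the Sinks design described above: every tryEmit* call returns a Sinks.EmitResult that the caller should check rather than assume success. A minimal sketch:

Sinks.Many<String> sink = Sinks.many().multicast().onBackpressureBuffer();
Sinks.EmitResult result = sink.tryEmitNext("event");
if (result.isFailure()) {
    // e.g. FAIL_OVERFLOW when the buffer is full, FAIL_TERMINATED after complete/error
    System.err.println("emit failed: " + result);
}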

4.2 API usage example

Since Processor is deprecated and its use is not recommended, it is not covered further here.

  • Example 1: emit a single value

    Sinks.One<String> sink = Sinks.one();
     Mono<String> mono = sink.asMono();
     mono.subscribe(
             value -> System.out.println("Received: " + value),
             error -> System.err.println("Error: " + error),
             () -> System.out.println("Completed")
     );
     sink.tryEmitValue("Hello"); // emits the value and completes the Mono in one step
  • Example 2: emit multiple values

    // Create a multicast Sink with a buffering backpressure strategy
     Sinks.Many<String> sink = Sinks.many().multicast().onBackpressureBuffer();
     // Expose it as a Flux
     Flux<String> hotFlux = sink.asFlux().map(String::toUpperCase);

     // Subscriber A
     hotFlux.subscribe(i -> System.out.println("Subscriber A: " + i));

     // Subscriber B
     hotFlux.subscribe(i -> System.out.println("Subscriber B: " + i));

     // Emit data
     sink.tryEmitNext("hello");
     sink.tryEmitNext("world");
     sink.tryEmitComplete();
  • Example 3: replaying historical data

// Create a replay Sink retaining the last 2 elements
 Sinks.Many<String> sink = Sinks.many().replay().limit(2);

 sink.tryEmitNext("A");
 sink.tryEmitNext("B");

 // Subscriber 1 (receives the history A, B)
 sink.asFlux().subscribe(s -> System.out.println("Sub1: " + s));

 // Push new data
 sink.tryEmitNext("C");

 // Subscriber 2 (receives the history B, C)
 sink.asFlux().subscribe(s -> System.out.println("Sub2: " + s));

 //Output
 Sub1: A
 Sub1: B
 Sub1: C
 Sub2: B
 Sub2: C

4.3 Backpressure

4.3.1 Backpressure strategies

1. onBackpressureBuffer (buffering strategy)

  • Behavior: stores unconsumed data in a buffer and emits it when downstream requests arrive.
  • Configuration options:
    • Buffer size: bounded or unbounded (the default is unbounded; use with caution).
    • Overflow strategy:
      • ERROR: throws an IllegalStateException when the buffer is full.
      • DROP_LATEST: discards new data, retaining old data.
      • DROP_OLDEST: discards the oldest data, retaining new data.
  • Applicable scenarios: short-term speed mismatches are acceptable, but memory usage must be kept under control.

2. onBackpressureError (error strategy)

  • Behavior: when the buffer is full or downstream has made no request, an error (IllegalStateException) is raised immediately.
  • Applicable scenarios: strict real-time requirements where data loss is tolerable but failure must be fast.

3. directBestEffort (best-effort strategy)

  • Behavior: no buffer; data is pushed directly downstream. If downstream has not requested anything, new data is silently discarded.
  • Features: avoids memory usage, but may lose data.
  • Applicable scenarios: real-time event processing (such as logs or metrics collection) where occasional loss is acceptable.

4. replay (replay strategy)

  • Behavior:

    • Replays historical data to new subscribers.

    • Also supports real-time data push.

    • The replay buffer size is configurable (e.g. keep the most recent N elements).

  • Backpressure handling:

    • For new subscribers: replaying historical data honors backpressure requests.
    • For real-time data: follows the onBackpressureBuffer or directBestEffort strategy.
  • Applicable scenarios: new subscribers need access to historical data.

4.3.2 Default policies

  • multicast(): uses directBestEffort by default (no buffer).
  • unicast(): uses onBackpressureBuffer by default (unbounded buffer).
  • replay(): retains all historical data by default (unbounded buffer).
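
Because the unicast and replay defaults are unbounded, it is usually safer to configure explicit bounds. A minimal sketch (the two-argument overload takes the buffer size and an autoCancel flag):

// Multicast sink with a bounded buffer: at most 16 undelivered elements,
// after which tryEmitNext returns FAIL_OVERFLOW
Sinks.Many<Integer> bounded = Sinks.many()
        .multicast()
        .onBackpressureBuffer(16, false); // bufferSize, autoCancel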

5. Hooks and Context

5.1 Hooks

In Project Reactor, Hooks are a set of global callback mechanisms that allow the default behavior of the Reactor library to be customized and extended, for debugging, monitoring, or modifying the execution logic of reactive streams.

1. The core uses of Hooks

  1. Global error handling: catch exceptions not handled downstream.
  2. Operator life-cycle monitoring: insert custom logic around operator execution.
  3. Debugging and tracing: enhance stack-trace information to locate problems in asynchronous flows.
  4. Behavior modification: dynamically replace or wrap operators.

2. Commonly used Hooks and functions

1. onOperatorError

  • Effect: captures unhandled exceptions thrown during operator execution.

  • Typical scenarios: unified logging, converting error types.

    Hooks.onOperatorError((error, context) -> {
        System.err.println("Global catch exception: " + error);
        return error;
    });

2. onNextDropped

  • Effect: handles onNext elements discarded because of downstream cancellation, backpressure overflow, and similar causes.

  • Typical scenarios: record lost data for auditing or compensation.

    Hooks.onNextDropped(item ->
        System.out.println("Element is discarded: " + item)
    );

3. onErrorDropped

  • Effect: handles onError signals discarded because downstream has already terminated (e.g. onComplete was already called).

  • Typical scenarios: avoid silently swallowing errors.

    Hooks.onErrorDropped(error ->
        System.err.println("Error discarded: " + error)
    );

4. onOperatorDebug

  • Effect: enables debug mode, generating enhanced stack-trace information (including the subscription point) for asynchronous operators.

  • Cost: adds performance overhead; use in development environments only.

    Hooks.onOperatorDebug(); // enable debug mode

5. onEachOperator / onLastOperator

  • Effect: inserts custom logic as each operator (or only the last one) is assembled, e.g. for logging or metrics collection.

  • Typical scenarios: performance monitoring, dynamically modifying the data flow.

    // Runs once per operator at assembly time; must return a (possibly wrapped) Publisher
    Hooks.onEachOperator(publisher -> {
        System.out.println("operator assembled: " + publisher.getClass().getSimpleName());
        return publisher;
    });

6. Resetting Hooks

  • Restore the default behavior:

    Hooks.resetOnOperatorError();
    Hooks.resetOnNextDropped();
    Hooks.resetOnErrorDropped();
    Hooks.resetOnOperatorDebug();
    

5.2 Context

In Project Reactor, Context is the core mechanism for passing contextual data between the stages of a reactive stream. It addresses the limitations of the traditional ThreadLocal in asynchronous, multi-threaded environments by letting data travel safely along the operator chain. Below is a breakdown of the Context's design ideas, API usage, and typical scenarios.

1. Why is a Context needed?
  • Problem: in an asynchronous reactive stream, data may be processed on different threads, and ThreadLocal values cannot cross threads.
  • Solution: Context provides an immutable key-value store bound to the subscription chain, ensuring context data is safely accessible throughout the life of the stream.
2. Characteristics of Context
  • Immutability: every modification produces a new instance, guaranteeing thread safety.
  • Subscription-chain binding: data travels along the subscription chain, not with threads (note that the Context propagates from bottom to top).
  • Key-value storage: a Map-like structure.
  • Bottom-up propagation (downstream → upstream):
    • Write order: when several contextWrite calls target the same key, the one closer to the top of the chain is applied last and therefore wins for the operators above it.
    • Read order: each operator sees the Context assembled by the contextWrite calls below it (nearer the subscription point).

The contextWrite operator writes entries into the Context of a reactive stream; deferContextual reads the Context from within the stream.

// Note: since the Context propagates from bottom to top, it must be written downstream (point A)
 // before it can be read upstream (point B)
 Flux.just("A", "B", "C", "D")
         // point B: splice in the value from the Context
         .flatMap(s -> {
                     System.out.println("ssss:" + s);
                     return Mono.deferContextual(ctx -> Mono.just(s + ctx.get("suffix")));
                 }
         )
         // point A: write to the Context (must sit below the read)
         .contextWrite(Context.of("suffix", "-ctx"))
         // subscribe and print the results
         .subscribe(System.out::println);

An example of bottom-up (downstream → upstream) Context propagation:

Because the Context propagates from bottom to top, the value written at point B overrides the value written at point A.

("A", "B", "C", "D")
         // Splice the values ​​in the Context
         .flatMap(s -> {
                     //Because ctx222 will overwrite ctx111, the splicing here is ctx222.
                     ("ssss:" + s);
                     return (ctx -> (s + ("suffix")));
                 }
         )
         //Note as point B, writing to Context ctx222 will overwrite ctx111
         .contextWrite(("suffix", "-ctx222"))
         //Remember as point A and write to Context
         .contextWrite(("suffix", "-ctx111"))
         // Subscribe to output results
         .subscribe(::println);



 //Output
 ssss:A
 A-ctx222
 ssss:B
 B-ctx222
 ssss:C
 C-ctx222
 ssss:D
 D-ctx222

Conclusion

A deep understanding of Project Reactor's core concepts helps us master the reactive programming paradigm and build more efficient, resilient distributed systems. Combined with modern virtual threads, it offers a strong foundation for a new generation of highly concurrent applications. By choosing scheduling strategies sensibly and optimizing the threading model, we can keep the code concise while getting the most out of the hardware.

The road ahead is long and arduous; I will keep searching, high and low.