
Netty source code: thread model (part 2)

2025-03-21 23:10:54

Outline

Overall execution framework

Reactor thread performs one event poll

Reactor thread processes the Channels that produced IO events

Reactor thread task queue: adding tasks

Reactor thread task queue: executing tasks

Summary

 

Overall execution framework

(1) Three things the Reactor thread does

(2) Tasks run for as long as IO event processing took

(3) Execution flow of the NioEventLoop.run() method

 

(1) Three things the Reactor thread does

The run() method of NioEventLoop contains an infinite for loop, and each iteration of that loop performs the three things the Reactor thread needs to do.

 

1. First, call the select() method to perform one event poll

Since each NioEventLoop corresponds to one Selector, the select() method polls the IO events of all Channels registered to the Selector of this Reactor thread. Note that the select() method also contains an infinite for loop, but that loop is exited when certain conditions are met.

 

2. Then call the processSelectedKeys() method to handle the polled IO events

 

3. Finally, call the runAllTasks() method to run the tasks that external threads have placed into the TaskQueue.

//SingleThreadEventLoop implementation which register the Channel's to a Selector and so does the multi-plexing of these in the event loop.
 public final class NioEventLoop extends SingleThreadEventLoop {
     private volatile int ioRatio = 50;
     ...
     @Override
     protected void run() {
         for (;;) {
             ...
             //1. Call the select() method to perform one event poll
             select(wakenUp.getAndSet(false));
             if (wakenUp.get()) {
                 selector.wakeup();
             }
             ...
             //2. Process the Channels that produced IO events
             processSelectedKeys();
             ...
             //3. Run the tasks that external threads put into the TaskQueue
             runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
         }
     }

     private void select(boolean oldWakenUp) throws IOException {
         for(;;) {
             //1. A scheduled task's deadline is almost due, so this poll is ended
             //2. A task was added during polling, so this poll is ended
             //3. Blocking select operation: selector.select(timeoutMillis)
             //4. Work around the JDK empty polling bug
         }
     }
     ...
 }

(2) Tasks run for as long as IO event processing took

In the run() method of NioEventLoop, ioRatio defaults to 50, which means the time spent processing IO events and the time spent executing tasks are in a 1:1 ratio. In other words, however long processSelectedKeys() took, runAllTasks() is given the same amount of time. When ioRatio is 100, runAllTasks() is called without a time limit.

//SingleThreadEventLoop implementation which register the Channel's to a Selector and so does the multi-plexing of these in the event loop.
 public final class NioEventLoop extends SingleThreadEventLoop {
     private volatile int ioRatio = 50;
     ...
     @Override
     protected void run() {
         for (;;) {
             ...
             //1. Call the select() method to perform one event poll
             select(wakenUp.getAndSet(false));
             if (wakenUp.get()) {
                 selector.wakeup();
             }
             ...
             final int ioRatio = this.ioRatio;
             if (ioRatio == 100) {
                 try {
                     processSelectedKeys();
                 } finally {
                     // Ensure we always run tasks.
                     runAllTasks();
                 }
             } else {
                 final long ioStartTime = System.nanoTime();
                 try {
                     processSelectedKeys();
                 } finally {
                     // Ensure we always run tasks.
                     final long ioTime = System.nanoTime() - ioStartTime;
                     runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
                 }
             }
             ...
         }
     }
     ...
 }
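
To make the ioRatio arithmetic concrete, here is a minimal sketch that only reproduces the expression ioTime * (100 - ioRatio) / ioRatio from the listing above. The class name IoRatioBudget and the method taskBudgetNanos are hypothetical helpers for illustration and are not part of Netty. The sketch rejects ioRatio == 100 because the listing handles that case separately, without a time limit.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical helper, not part of Netty: reproduces the time-budget arithmetic only.
final class IoRatioBudget {

    /** Returns the time budget for task execution, given the time spent on IO. */
    static long taskBudgetNanos(long ioTimeNanos, int ioRatio) {
        if (ioRatio <= 0 || ioRatio >= 100) {
            // ioRatio == 100 is handled by a separate branch in the article's listing.
            throw new IllegalArgumentException("ioRatio must be in (0, 100): " + ioRatio);
        }
        return ioTimeNanos * (100 - ioRatio) / ioRatio;
    }

    public static void main(String[] args) {
        long ioTime = TimeUnit.MILLISECONDS.toNanos(10); // assume IO handling took 10 ms
        for (int ratio : new int[] {20, 50, 80}) {
            long budget = taskBudgetNanos(ioTime, ratio);
            System.out.println("ioRatio=" + ratio + " -> task budget = " + budget + " ns");
        }
    }
}
```

With 10 ms of IO time, an ioRatio of 50 gives tasks the same 10 ms, a lower ioRatio gives tasks more time than IO, and a higher ioRatio gives them less.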

(3) Execution flow of the NioEventLoop.run() method

NioEventLoop.run() -> for(;;)
   select() //Perform one event poll to check whether there are IO events
   processSelectedKeys() //Process the Channels that produced IO events
   runAllTasks() //Process the asynchronous task queue
 //These 3 steps are handled in one thread to save threads, because IO events and asynchronous tasks are not always present.
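
The three-step loop can be imitated with plain JDK NIO. The following is a rough sketch under simplifying assumptions, not Netty's implementation: MiniReactor, execute() and shutdown() are hypothetical names, there is no ioRatio budgeting and no scheduled task queue, and step 2 only clears the ready keys instead of dispatching them to handlers.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.util.Iterator;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical, heavily simplified reactor loop: select -> process keys -> run tasks.
final class MiniReactor implements Runnable {
    private final Selector selector;
    private final Queue<Runnable> taskQueue = new ConcurrentLinkedQueue<>();
    private volatile boolean running = true;

    MiniReactor() throws IOException {
        selector = Selector.open();
    }

    /** Called by external threads: enqueue the task, then wake the blocked select. */
    void execute(Runnable task) {
        taskQueue.add(task);
        selector.wakeup();
    }

    void shutdown() {
        running = false;
        selector.wakeup();
    }

    @Override
    public void run() {
        try {
            while (running) {
                // 1. Poll for IO events, blocking for at most one second.
                selector.select(1000);
                // 2. Process ready keys (this sketch only removes them from the set).
                Iterator<SelectionKey> it = selector.selectedKeys().iterator();
                while (it.hasNext()) {
                    it.next();
                    it.remove();
                }
                // 3. Run every task that external threads have queued.
                Runnable task;
                while ((task = taskQueue.poll()) != null) {
                    task.run();
                }
            }
            selector.close();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) throws Exception {
        MiniReactor reactor = new MiniReactor();
        Thread thread = new Thread(reactor, "mini-reactor");
        thread.start();
        reactor.execute(() ->
                System.out.println("task ran on " + Thread.currentThread().getName()));
        reactor.execute(reactor::shutdown);
        thread.join();
    }
}
```

Because all three steps share one thread, a task submitted from outside only runs after the select call returns, which is why execute() calls selector.wakeup() right after enqueueing.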

 

Reactor thread performs one event poll

(1) Reset the wakenUp variable before executing the select operation

(2) If a scheduled task is about to start, this poll is ended

(3) If a task is found to have been added during polling, this poll is ended

(4) Perform the blocking select operation

(5) Work around the JDK's empty polling bug

(6) Summary of performing one event poll

 

(1) Reset the wakenUp variable before executing the select operation

NioEventLoop has a wakenUp member variable that indicates whether a blocking select operation should be woken up. Each time the run() method of NioEventLoop is about to call select() for a new iteration, it sets wakenUp to false to mark the start of a new poll.

//SingleThreadEventLoop implementation which register the Channel's to a Selector and so does the multi-plexing of these in the event loop.
 public final class NioEventLoop extends SingleThreadEventLoop {
     //Boolean that controls determines if a blocked should break out of its selection process.
     //In our case we use a timeout for the select method and the select method will block for that time unless waken up.
     private final AtomicBoolean wakenUp = new AtomicBoolean();
     ...
     @Override
     protected void run() {
         for (;;) {
             ...
             //1. Call the select() method to perform one event poll
             select(wakenUp.getAndSet(false));
             if (wakenUp.get()) {
                 selector.wakeup();
             }
             ...
         }
     }
     ...
 }

The following is the execution logic of NioEventLoop's select() method, which consists of four parts.

//SingleThreadEventLoop implementation which register the Channel's to a Selector and so does the multi-plexing of these in the event loop.
 public final class NioEventLoop extends SingleThreadEventLoop {
     Selector selector;
     ...
     private void select(boolean oldWakenUp) throws IOException {
         Selector selector = this.selector;
         for(;;) {
             //1. A scheduled task's deadline is almost due, so this poll is ended
             //2. A task was added during polling, so this poll is ended
             //3. Blocking select operation: selector.select(timeoutMillis)
             //4. Work around the JDK empty polling bug
         }
     }
     ...
 }

(2) If a scheduled task is about to start, this poll is ended

The select operation of the Reactor thread in NioEventLoop is itself a for loop.

In the first step of the for loop, if the deadline of a task in the scheduled task queue is almost due (less than 0.5 ms away), the loop is exited. Before exiting, if no select operation has been performed so far, selectNow() is called once to perform a non-blocking select.

The scheduled task queue in Netty is ordered by deadline from earliest to latest, so the value returned by delayNanos() is the delay until the earliest deadline.

//SingleThreadEventLoop implementation which register the Channel's to a Selector and so does the multi-plexing of these in the event loop.
 public final class NioEventLoop extends SingleThreadEventLoop {
     Selector selector;
     ...
     private void select(boolean oldWakenUp) throws IOException {
         Selector selector = this.selector;
         int selectCnt = 0;
         long currentTimeNanos = System.nanoTime();//Current time
         long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos);//Current time + delay until the earliest scheduled task deadline
         for(;;) {
             //1. A scheduled task's deadline is almost due, so this poll is ended
             long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L;
             if (timeoutMillis <= 0) {
                 if (selectCnt == 0) {
                     selector.selectNow();//Perform a non-blocking select operation
                     selectCnt = 1;
                 }
                 break;
             }
             ...
         }
     }
     ...
 }

 //Abstract base class for OrderedEventExecutor's that execute all its submitted tasks in a single thread.
 public abstract class SingleThreadEventExecutor extends AbstractScheduledEventExecutor implements OrderedEventExecutor {
     ...
     protected long delayNanos(long currentTimeNanos) {
         ScheduledFutureTask<?> scheduledTask = peekScheduledTask();
         if (scheduledTask == null) {
             return SCHEDULE_PURGE_INTERVAL;
         }
         return scheduledTask.delayNanos(currentTimeNanos);
     }
     ...
 }

 //Abstract base class for EventExecutors that want to support scheduling.
 public abstract class AbstractScheduledEventExecutor extends AbstractEventExecutor {
     Queue<ScheduledFutureTask<?>> scheduledTaskQueue;//Scheduled Task Queue
     ...
     final ScheduledFutureTask<?> peekScheduledTask() {
         Queue<ScheduledFutureTask<?>> scheduledTaskQueue = this.scheduledTaskQueue;
         if (scheduledTaskQueue == null) {
             return null;
         }
         return scheduledTaskQueue.peek();
     }
     ...
 }

 final class ScheduledFutureTask<V> extends PromiseTask<V> implements ScheduledFuture<V> {
     ...
     public long delayNanos(long currentTimeNanos) {
         return Math.max(0, deadlineNanos() - (currentTimeNanos - START_TIME));
     }
     public long deadlineNanos() {
         return deadlineNanos;
     }
     ...
 }
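
The 0.5 ms threshold comes from the rounding in the timeoutMillis expression: adding 500000 ns before the integer division rounds the remaining delay to the nearest millisecond. The sketch below isolates that expression; the class name SelectTimeout and the method timeoutMillis are hypothetical and used only for illustration.

```java
// Hypothetical helper: isolates the rounding used to compute the select timeout.
final class SelectTimeout {

    /** Remaining delay in milliseconds, rounded half up, as in the select() listing. */
    static long timeoutMillis(long selectDeadLineNanos, long currentTimeNanos) {
        return (selectDeadLineNanos - currentTimeNanos + 500_000L) / 1_000_000L;
    }

    public static void main(String[] args) {
        long now = 0L;
        long[] delaysNanos = {400_000L, 500_000L, 1_499_999L, 1_500_000L};
        for (long delay : delaysNanos) {
            System.out.println(delay + " ns -> " + timeoutMillis(now + delay, now) + " ms");
        }
    }
}
```

A delay below 500000 ns rounds to 0 ms, which satisfies the timeoutMillis <= 0 check and ends the poll; a delay of exactly 500000 ns rounds to 1 ms, so the loop continues to the blocking select.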

(3) If a task is found to have been added during polling, this poll is ended

Note: Netty's task queues include an ordinary task queue and a scheduled task queue. The poll has to be ended when a scheduled task is about to start, and it also has to be ended when the ordinary task queue is not empty.

To make sure that tasks in the ordinary task queue are executed promptly, Netty checks whether the ordinary task queue is empty before calling selector.select(timeoutMillis) for the blocking select operation. If the queue is not empty, Netty calls selector.selectNow() to perform a non-blocking select and then exits the loop.

//SingleThreadEventLoop implementation which register the Channel's to a Selector and so does the multi-plexing of these in the event loop.
 public final class NioEventLoop extends SingleThreadEventLoop {
     Selector selector;
     ...
     private void select(boolean oldWakenUp) throws IOException {
         Selector selector = this.selector;
         int selectCnt = 0;
         ...
         for(;;) {
             ...
             //2. A task was added during polling, so this poll is ended
             if (hasTasks() && wakenUp.compareAndSet(false, true)) {
                 selector.selectNow();//Perform a non-blocking select operation
                 selectCnt = 1;
                 break;
             }
             ...
         }
     }
     ...
 }

 //Abstract base class for EventLoops that execute all its submitted tasks in a single thread.
 public abstract class SingleThreadEventLoop extends SingleThreadEventExecutor implements EventLoop {
     private final Queue<Runnable> tailTasks;
     ...
     @Override
     protected boolean hasTasks() {
         return super.hasTasks() || !tailTasks.isEmpty();
     }
     ...
 }

 //Abstract base class for OrderedEventExecutor's that execute all its submitted tasks in a single thread.
 public abstract class SingleThreadEventExecutor extends AbstractScheduledEventExecutor implements OrderedEventExecutor {
     private final Queue<Runnable> taskQueue;//Normal task queue
     ...
     protected boolean hasTasks() {
         assert inEventLoop();
         return !taskQueue.isEmpty();
     }
     ...
 }

(4) Perform the blocking select operation

1. Block at most until the start time of the first scheduled task

2. External threads submit tasks to wake up the Reactor thread

3. Conditions for ending this poll

1. Block at most until the start time of the first scheduled task

Reaching this step means that Netty's ordinary task queue is empty and that no scheduled task is due yet (the earliest one is more than 0.5 ms away). A blocking select operation is therefore performed, blocking at most until the start time of the first scheduled task; that is, timeoutMillis is passed as the argument to selector.select().

//SingleThreadEventLoop implementation which register the Channel's to a Selector and so does the multi-plexing of these in the event loop.
 public final class NioEventLoop extends SingleThreadEventLoop {
     Selector selector;
     ...
     private void select(boolean oldWakenUp) throws IOException {
         Selector selector = this.selector;
         ...
         for(;;) {
             ...
             //3. Blocking select operation: selector.select(timeoutMillis), blocks for at most timeoutMillis
             int selectedKeys = selector.select(timeoutMillis);
             selectCnt ++;
             if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || hasTasks() || hasScheduledTasks()) {
                 // - Selected something,
                 // - waken up by user, or
                 // - the task queue has a pending task.
                 // - a scheduled task is ready for processing
                 break;
             }
             ...
         }
     }
     ...
 }

2. External threads submit tasks to wake up the Reactor thread

If the delay of the first scheduled task is very long, for example one hour, the thread could stay blocked in the select operation for that long (it still returns as soon as an IO event is selected). However, as soon as a new task is added during this period, the block is released.

For example, when an external thread calls the execute() method of NioEventLoop to add a task, the wakeup() method of NioEventLoop is called, which uses selector.wakeup() to wake up the thread blocked in selector.select(timeoutMillis).

//SingleThreadEventLoop implementation which register the Channel's to a Selector and so does the multi-plexing of these in the event loop.
 public final class NioEventLoop extends SingleThreadEventLoop {
     //Boolean that controls determines if a blocked should break out of its selection process.
     //In our case we use a timeout for the select method and the select method will block for that time unless waken up.
     private final AtomicBoolean wakenUp = new AtomicBoolean();
     Selector selector;
     ...
     @Override
     protected void wakeup(boolean inEventLoop) {
         if (!inEventLoop && wakenUp.compareAndSet(false, true)) {
             selector.wakeup();
         }
     }
 }

 //Abstract base class for EventLoops that execute all its submitted tasks in a single thread.
 public abstract class SingleThreadEventLoop extends SingleThreadEventExecutor implements EventLoop {
     ...
 }

 //Abstract base class for OrderedEventExecutor's that execute all its submitted tasks in a single thread.
 public abstract class SingleThreadEventExecutor extends AbstractScheduledEventExecutor implements OrderedEventExecutor {
     ...
     @Override
     public void execute(Runnable task) {
         if (task == null) {
             throw new NullPointerException("task");
         }

         boolean inEventLoop = inEventLoop();
         if (inEventLoop) {
             addTask(task);
         } else {
             startThread();
             addTask(task);
             if (isShutdown() && removeTask(task)) {
                 reject();
             }
         }

         if (!addTaskWakesUp && wakesUpForTask(task)) {
             //Call selector.wakeup() to wake up the thread blocked in selector.select(timeoutMillis)
             wakeup(inEventLoop);
         }
     }
     ...
 }
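
The wake-up mechanism can be tried out with the JDK alone. The sketch below is an illustration under assumptions, not Netty code: WakeupDemo, blockingSelect() and close() are hypothetical names. It combines an AtomicBoolean guard with Selector.wakeup() in the same way as the wakeup(boolean) listing above, so that only the first caller in a poll cycle pays for the wakeup call.

```java
import java.io.IOException;
import java.nio.channels.Selector;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical demo class: an external thread cuts a long blocking select short.
final class WakeupDemo {
    private final Selector selector;
    private final AtomicBoolean wakenUp = new AtomicBoolean();

    WakeupDemo() throws IOException {
        selector = Selector.open();
    }

    /** Returns true if this call won the CAS and actually invoked selector.wakeup(). */
    boolean wakeup() {
        if (wakenUp.compareAndSet(false, true)) {
            selector.wakeup();
            return true;
        }
        return false;
    }

    /** Resets the flag, blocks for at most timeoutMillis, returns the elapsed milliseconds. */
    long blockingSelect(long timeoutMillis) throws IOException {
        wakenUp.set(false); // marks the start of a new poll cycle
        long start = System.nanoTime();
        selector.select(timeoutMillis);
        return (System.nanoTime() - start) / 1_000_000L;
    }

    void close() throws IOException {
        selector.close();
    }

    public static void main(String[] args) throws Exception {
        WakeupDemo demo = new WakeupDemo();
        Thread external = new Thread(() -> {
            try {
                Thread.sleep(200); // simulate an external thread submitting a task later
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            demo.wakeup();
        });
        external.start();
        long elapsed = demo.blockingSelect(10_000);
        external.join();
        System.out.println("select returned after about " + elapsed + " ms");
        demo.close();
    }
}
```

Selector.wakeup() also takes effect when it is called before the select begins: the next selection operation then returns immediately, so a wake-up that races with the start of a poll is not lost.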

3. Conditions for ending this poll

After the blocking select operation returns, Netty checks a series of conditions to decide whether to end this poll. The poll is ended if any of the following conditions is met:

Condition 1: An IO event was detected

Condition 2: The thread was woken up explicitly by the user

Condition 3: The ordinary task queue contains tasks that need to be executed

Condition 4: A scheduled task is ready to be executed

//SingleThreadEventLoop implementation which register the Channel's to a Selector and so does the multi-plexing of these in the event loop.
 public final class NioEventLoop extends SingleThreadEventLoop {
     Selector selector;
     ...
     private void select(boolean oldWakenUp) throws IOException {
         Selector selector = this.selector;
         ...
         for(;;) {
             ...
             //3. Blocking select operation: selector.select(timeoutMillis), blocks for at most timeoutMillis
             int selectedKeys = selector.select(timeoutMillis);
             selectCnt ++;
             //After the blocking select operation returns, Netty checks a series of conditions to decide whether to end this poll
             if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || hasTasks() || hasScheduledTasks()) {
                 // - Selected something: an IO event was detected
                 // - waken up by user: woken up explicitly by the user
                 // - the task queue has a pending task: the ordinary task queue contains tasks to execute
                 // - a scheduled task is ready for processing: a scheduled task is ready to be executed
                 break;
             }
             ...
         }
     }
     ...
 }

(5) Work around the JDK's empty polling bug

The JDK empty polling bug makes the Selector return from select() immediately over and over even though nothing is ready, so the thread spins and CPU utilization eventually reaches 100%.

1. How Netty works around JDK empty polling

First, the start time is recorded before each call to selector.select(timeoutMillis), and the end time is recorded after the blocking select operation returns.

Then Netty checks whether the blocking select operation lasted at least timeoutMillis. If it did, this was a valid poll, so selectCnt is reset to 1. If it lasted less than timeoutMillis, the JDK's empty polling bug may have been triggered, so selectCnt keeps its incremented value. When selectCnt, the number of consecutive short select operations, reaches 512, the Selector is rebuilt.

//SingleThreadEventLoop implementation which register the Channel's to a Selector and so does the multi-plexing of these in the event loop.
 public final class NioEventLoop extends SingleThreadEventLoop {
     Selector selector;
     private static final int SELECTOR_AUTO_REBUILD_THRESHOLD = 512;
     ...
     private void select(boolean oldWakenUp) throws IOException {
         Selector selector = this.selector;
         int selectCnt = 0;
         long currentTimeNanos = System.nanoTime();//Record the start time
         ...
         for(;;) {
             ...
             int selectedKeys = selector.select(timeoutMillis);//Perform the blocking select operation
             selectCnt++;//Count this select; the counter only keeps growing while selects return early
             long time = System.nanoTime();//Record the end time
             if (time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos) {
                 //The select operation lasted at least timeoutMillis, so this was a valid poll; reset selectCnt to 1
                 selectCnt = 1;
             } else if (selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {
                 //The number of short select operations has reached 512, so rebuild the Selector
                 rebuildSelector();//Rebuild Selector
                 selector = this.selector;
                 selector.selectNow();
                 selectCnt = 1;
                 break;
             }
             currentTimeNanos = time;
             ...
         }
     }
     ...
 }
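
The counting logic can be separated from the Selector and exercised on its own. The following sketch is an illustration only: EmptyPollCounter and afterSelect() are hypothetical names, and timestamps are passed in as parameters so that the behaviour is deterministic. It follows the same two branches as the listing above.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical helper: the selectCnt bookkeeping from select(), without a real Selector.
final class EmptyPollCounter {
    // Same value as SELECTOR_AUTO_REBUILD_THRESHOLD in the listing above.
    static final int REBUILD_THRESHOLD = 512;

    private int selectCnt;
    private long currentTimeNanos;

    EmptyPollCounter(long startNanos) {
        this.currentTimeNanos = startNanos;
    }

    /**
     * Call after each blocking select returned with nothing to do.
     * Returns true when the Selector should be rebuilt.
     */
    boolean afterSelect(long nowNanos, long timeoutMillis) {
        selectCnt++;
        boolean rebuild = false;
        if (nowNanos - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos) {
            selectCnt = 1; // the select really blocked for the full timeout
        } else if (selectCnt >= REBUILD_THRESHOLD) {
            selectCnt = 1; // too many premature returns in a row
            rebuild = true;
        }
        currentTimeNanos = nowNanos;
        return rebuild;
    }

    int selectCnt() {
        return selectCnt;
    }

    public static void main(String[] args) {
        EmptyPollCounter counter = new EmptyPollCounter(0L);
        int calls = 0;
        boolean rebuild = false;
        while (!rebuild) {
            rebuild = counter.afterSelect(0L, 1000L); // select "returned" instantly
            calls++;
        }
        System.out.println("rebuild requested after " + calls + " premature returns");
    }
}
```

Running main prints that the rebuild is requested after 512 premature returns. A single select that blocks for the full timeout resets the counter to 1, so only an uninterrupted run of short selects triggers the rebuild.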

2. Logic for rebuilding the Selector

Rebuilding the Selector works as follows: a new Selector is created through the openSelector() method, and then an infinite for loop transfers the registrations. Whenever a ConcurrentModificationException on the SelectionKeys occurs during the transfer, the transfer is restarted, until it completes.

The transfer steps are: first get a valid key and cancel its event registration on the old Selector. Then register the Channel corresponding to the key with the new Selector, and finally bind the new key to the Channel.

//SingleThreadEventLoop implementation which register the Channel's to a Selector and so does the multi-plexing of these in the event loop.
 public final class NioEventLoop extends SingleThreadEventLoop {
     Selector selector;
     ...
     //Replaces the current Selector of this event loop with newly created Selectors to work around the infamous epoll 100% CPU bug.
     public void rebuildSelector() {
         final Selector oldSelector = selector;
         final Selector newSelector = openSelector();
         int nChannels = 0;
         for (;;) {
             try {
                 for (SelectionKey key: oldSelector.keys()) {
                     Object a = key.attachment();
                     //1. Get a valid key
                     if (!key.isValid() || key.channel().keyFor(newSelector) != null) {
                         continue;
                     }

                     int interestOps = key.interestOps();
                     //2. Cancel the event registration of the key on the old Selector
                     key.cancel();
                     //3. Register the Channel corresponding to the key to the new Selector
                     SelectionKey newKey = key.channel().register(newSelector, interestOps, a);
                     if (a instanceof AbstractNioChannel) {
                         //4. Rebind Channel and new key
                         ((AbstractNioChannel) a).selectionKey = newKey;
                     }
                     nChannels++;
                 }
                 break;
             } catch(ConcurrentModificationException e) {
                 continue;
             }
         }
         selector = newSelector;
         oldSelector.close();
     }
     ...
 }
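
The same key transfer can be reproduced with JDK NIO classes only. This is a simplified sketch, not Netty's rebuildSelector(): SelectorMigration and migrate() are hypothetical names, the retry loop for ConcurrentModificationException is left out because the sketch iterates over a snapshot of the key set on a single thread, and there is no AbstractNioChannel to rebind.

```java
import java.io.IOException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: moves all valid registrations from one Selector to a new one.
final class SelectorMigration {

    /** Re-registers every valid key with a newly opened Selector and closes the old one. */
    static Selector migrate(Selector oldSelector) throws IOException {
        Selector newSelector = Selector.open();
        // Iterate over a snapshot of the registered keys.
        List<SelectionKey> keys = new ArrayList<>(oldSelector.keys());
        for (SelectionKey key : keys) {
            // 1. Skip invalid keys and channels already registered with the new Selector.
            if (!key.isValid() || key.channel().keyFor(newSelector) != null) {
                continue;
            }
            int interestOps = key.interestOps();
            Object attachment = key.attachment();
            // 2. Cancel the registration on the old Selector.
            key.cancel();
            // 3. Register the same channel, ops and attachment with the new Selector.
            key.channel().register(newSelector, interestOps, attachment);
        }
        oldSelector.close();
        return newSelector;
    }

    public static void main(String[] args) throws IOException {
        Selector oldSelector = Selector.open();
        try (ServerSocketChannel channel = ServerSocketChannel.open()) {
            channel.configureBlocking(false);
            channel.register(oldSelector, SelectionKey.OP_ACCEPT, "attachment");
            Selector newSelector = migrate(oldSelector);
            SelectionKey newKey = channel.keyFor(newSelector);
            System.out.println("old selector open: " + oldSelector.isOpen());
            System.out.println("new key valid: " + newKey.isValid());
            newSelector.close();
        }
    }
}
```

The interest set and the attachment travel with the registration, which is what lets the event loop continue on the new Selector without the channels noticing the swap.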

(6) Summary of performing one event poll

What the select operation of the Reactor thread does:

In short:

It keeps polling for IO events and, while polling, keeps checking whether there are tasks that need to be executed, so that all tasks in Netty's task queues are executed promptly. It also uses a counter during polling to work around the JDK's empty polling bug.

In detail:

The select() method of NioEventLoop first checks whether a scheduled task is almost due and whether the ordinary task queue taskQueue contains a task. If either is the case, it calls selector.selectNow() for a non-blocking select; otherwise it calls selector.select(timeoutMillis) for a blocking select. After the blocking select returns, it checks whether the select really blocked for timeoutMillis. If not, the JDK's empty polling bug may have been triggered. It then checks whether the number of such suspected empty polls has reached 512; if so, it works around the bug by replacing the original Selector with a new one.

 

Reactor thread processes the Channels that produced IO events

(1) Key logic for handling IO events

(2) Netty's optimization of selectedKeys

(3) Notes on the IO event handling process

(4) Summary of handling IO events

 

(1) Key logic for handling IO events

The first step of the Reactor thread is to poll the IO events registered on the Selector, and the second step is to handle those IO events.

The key logic of processSelectedKeys() has two parts:

1. The optimization of selectedKeys

2. The processSelectedKeysOptimized() method, which does the actual handling of IO events

//SingleThreadEventLoop implementation which register the Channel's to a Selector and so does the multi-plexing of these in the event loop.
 public final class NioEventLoop extends SingleThreadEventLoop {
     Selector selector;
     private SelectedSelectionKeySet selectedKeys;
     ...
     @Override
     protected void run() {
         for (;;) {
             ...
             //1. Call the select() method to perform one event poll
             select(wakenUp.getAndSet(false));
             if (wakenUp.get()) {
                 selector.wakeup();
             }
             ...
             //2. Process the Channels that produced IO events
             processSelectedKeys();
             ...
             //3. Run the tasks that external threads put into the TaskQueue
             runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
         }
     }

     private void processSelectedKeys() {
         if (selectedKeys != null) {
             //selectedKeys.flip() returns an array
             processSelectedKeysOptimized(selectedKeys.flip());
         } else {
             processSelectedKeysPlain(selector.selectedKeys());
         }
     }
     ...
 }

(2) Netty's optimization of selectedKeys

All of Netty's optimizations of selectedKeys are implemented in the openSelector() method of NioEventLoop.

The optimization is this:

A selector.select() operation adds the SelectionKeys whose IO events are ready to two HashSet-backed member variables inside the Selector. Through reflection, Netty replaces the HashSet that the Selector uses to store SelectionKeys with an array-backed set, so the cost of adding a SelectionKey drops from HashSet's worst-case O(n) to O(1) for appending to an array.

In detail:

The member variable selectedKeys of NioEventLoop is a SelectedSelectionKeySet object, which is created in the openSelector() method of NioEventLoop. The openSelector() method then uses reflection to assign selectedKeys to the two member variables of the Selector. SelectedSelectionKeySet extends AbstractSet, but internally it stores SelectionKeys in arrays.

//SingleThreadEventLoop implementation which register the Channel's to a Selector and so does the multi-plexing of these in the event loop.
 public final class NioEventLoop extends SingleThreadEventLoop {
     Selector selector;
     private SelectedSelectionKeySet selectedKeys;
     ...
     private Selector openSelector() {
         final Selector selector = provider.openSelector();
         final SelectedSelectionKeySet selectedKeySet = new SelectedSelectionKeySet();
         ...
         //The selectorImplClass below corresponds to sun.nio.ch.SelectorImpl
         Field selectedKeysField = selectorImplClass.getDeclaredField("selectedKeys");
         Field publicSelectedKeysField = selectorImplClass.getDeclaredField("publicSelectedKeys");
         selectedKeysField.setAccessible(true);
         publicSelectedKeysField.setAccessible(true);
         //These two set() calls are the heart of the optimization. In short:
         //Replace the HashSet that Selector uses to store SelectionKeys with an array-backed set, so that add() is O(1)
         selectedKeysField.set(selector, selectedKeySet);
         publicSelectedKeysField.set(selector, selectedKeySet);
         ...
         selectedKeys = selectedKeySet;
         ...
     }
     ...
 }

 //SelectedSelectionKeySet extends AbstractSet, so it can be used as a Set, but internally it uses two arrays alternately.
 //The add() method first decides which array to use, and then performs the following 3 steps on that array:
 //Step 1: Put the SelectionKey at the end of the array
 //Step 2: Increase the logical length of the array by 1
 //Step 3: If the logical length of the array equals its physical length, expand the array

 //Once the program has run for a while and the array has grown large enough, every time NIO events are polled,
 //calling add() here needs only O(1) time to put the SelectionKey into the Set.
 //The add() method of the HashSet used by the JDK is O(1) on average and O(n) in the worst case.
 //Another advantage of replacing HashSet with an array is that traversal is very efficient
 final class SelectedSelectionKeySet extends AbstractSet<SelectionKey> {
     private SelectionKey[] keysA;
     private int keysASize;
     private SelectionKey[] keysB;
     private int keysBSize;
     private boolean isA = true;

     SelectedSelectionKeySet() {
         keysA = new SelectionKey[1024];
         keysB = keysA.clone();
     }

     @Override
     public boolean add(SelectionKey o) {
         if (o == null) {
             return false;
         }
         if (isA) {
             int size = keysASize;
             keysA[size ++] = o;
             keysASize = size;
             if (size == keysA.length) {
                 doubleCapacityA();
             }
         } else {
             int size = keysBSize;
             keysB[size ++] = o;
             keysBSize = size;
             if (size == keysB.length) {
                 doubleCapacityB();
             }
         }
         return true;
     }
    
     //Return an array
     SelectionKey[] flip() {
         if (isA) {
             isA = false;
             keysA[keysASize] = null;
             keysBSize = 0;
             return keysA;
         } else {
             isA = true;
             keysB[keysBSize] = null;
             keysASize = 0;
             return keysB;
         }
     }
     ...
 }

 //You can see that the two member variables of SelectorImpl are selectedKeys and keys, both HashSet
 public abstract class SelectorImpl extends AbstractSelector {
     protected Set<SelectionKey> selectedKeys = new HashSet();
     protected HashSet<SelectionKey> keys = new HashSet();
     private Set<SelectionKey> publicKeys;
     private Set<SelectionKey> publicSelectedKeys;

     protected SelectorImpl(SelectorProvider var1) {
         super(var1);
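         if (Util.atBugLevel("1.4")) {
             this.publicKeys = this.keys;
             this.publicSelectedKeys = this.selectedKeys;
         } else {
             this.publicKeys = Collections.unmodifiableSet(this.keys);
             this.publicSelectedKeys = Util.ungrowableSet(this.selectedKeys);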
         }
     }
     ...
 }
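
The idea of an array-backed Set can be shown in a few lines. The class below is a simplified sketch and not Netty's SelectedSelectionKeySet: ArrayBackedSet and reset() are hypothetical names, it uses a single array instead of two alternating ones, and, like the class in the listing, it does not check for duplicates, so it relies on the caller adding each element only once per cycle.

```java
import java.util.AbstractSet;
import java.util.Arrays;
import java.util.Iterator;
import java.util.NoSuchElementException;

// Hypothetical simplified set: add() appends to an array instead of hashing.
final class ArrayBackedSet<E> extends AbstractSet<E> {
    private Object[] elements = new Object[4];
    private int size;

    @Override
    public boolean add(E e) {
        if (e == null) {
            return false;
        }
        if (size == elements.length) {
            // Double the capacity when the array is full.
            elements = Arrays.copyOf(elements, size * 2);
        }
        elements[size++] = e; // append at the end: no hashing, no collision handling
        return true;
    }

    @Override
    public int size() {
        return size;
    }

    @Override
    public Iterator<E> iterator() {
        return new Iterator<E>() {
            private int index;

            @Override
            public boolean hasNext() {
                return index < size;
            }

            @Override
            @SuppressWarnings("unchecked")
            public E next() {
                if (index >= size) {
                    throw new NoSuchElementException();
                }
                return (E) elements[index++];
            }
        };
    }

    /** Clears the used slots so the stored objects can be garbage collected. */
    void reset() {
        Arrays.fill(elements, 0, size, null);
        size = 0;
    }

    public static void main(String[] args) {
        ArrayBackedSet<String> keys = new ArrayBackedSet<>();
        for (int i = 0; i < 10; i++) {
            keys.add("key-" + i);
        }
        System.out.println(keys.size() + " keys, first = " + keys.iterator().next());
        keys.reset();
        System.out.println("after reset: " + keys.size());
    }
}
```

Traversal walks the array in insertion order, and reset() nulls the used slots for the same reason the event loop sets selectedKeys[i] to null: stale references would otherwise keep the keys and their attachments reachable.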

(3) Description of the process of handling IO events

Note 1:

First remove the IO event. The IO event is taken from selectedKeys in the form of an array, and its corresponding channel is returned by the attachment() method of the SelectionKey.

 

At this time, you can appreciate the benefits of optimized selectedKeys. Because the array is traversed during traversal, the efficiency is improved compared to the JDK native HashSet.

 

After getting the current SelectionKey, set selectedKeys[i] to null, which is done to facilitate GC. Because assuming that a NioEventLoop polls N IO events per average and 3N events are polled during peak periods, then the physical length of selectedKeys must be greater than or equal to 3N. If you do not set selectedKeys[i] to null each time you process these keys, then once the peak period passes, the SelectionKeys corresponding to selectedKeys[i] stored at the tail of the array will not be recycled. Although the corresponding object of the SelectionKey may not be large, the associated attachment may be large. If these objects survive and cannot be recycled, memory leaks may occur.

 

Note 2:

Then get the attachment corresponding to the current SelectionKey. This attachment is the Channel corresponding to the IO event retrieved, so the Channel can be processed next.

 

Since when Netty registers the server Channel, the SelectableChannel object inside the AbstractNioChannel will be registered with the Selector object, and the AbstractNioChannel is attached as an attachment to the SelectableChannel object. Therefore, when JDK polls a SelectableChannel with an IO event, you can directly take out the AbstractNioChannel through the attachment() method and operate it.

 

Description Three:

Then the processSelectedKey() method will be called to process the SelectionKey and AbstractNioChannel. Netty has two major categories of Channels: one is NioServerSocketChannel, which is handled by bossGroup. The other is NioSocketChannel, handled by the workerGroup. For boss's NioEventLoop, the connection event is polled. For worker's NioEventLoop, the read and write events are polled.

 

Note 4:

Finally, decide whether to poll again. The run() method of NioEventLoop sets needsToSelectAgain to false on every iteration before it processes the polled IO events. needsToSelectAgain is set to true only when Channels are removed from the Selector, that is, when the cancel() method of NioEventLoop has been called 256 times (CLEANUP_INTERVAL). When needsToSelectAgain is true, the selectAgain() method is called to poll again.

//SingleThreadEventLoop implementation which register the Channel's to a Selector and so does the multi-plexing of these in the event loop.
 public final class NioEventLoop extends SingleThreadEventLoop {
     Selector selector;
     private SelectedSelectionKeySet selectedKeys;
     private boolean needsToSelectAgain;
     private int cancelledKeys;
     private static final int CLEANUP_INTERVAL = 256;
     ...
     @Override
     protected void run() {
         for (;;) {
             ...
             //1. Call the select() method to execute an event poll
             select(wakenUp.getAndSet(false));
             if (wakenUp.get()) {
                 selector.wakeup();
             }
             ...
             //2. Handle Channel that generates IO events
             needsToSelectAgain = false;
             processSelectedKeys();
             ...
             //3. Execute the task of putting the external thread into the TaskQueue
             runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
         }
     }

     private void processSelectedKeys() {
         if (selectedKeys != null) {
             //selectedKeys.flip() will return an array
             processSelectedKeysOptimized(selectedKeys.flip());
         } else {
             processSelectedKeysPlain(selector.selectedKeys());
         }
     }

     private void processSelectedKeysOptimized(SelectionKey[] selectedKeys) {
         for (int i = 0;; i ++) {
             //1.First remove the IO event
             final SelectionKey k = selectedKeys[i];
             if (k == null) {
                 break;
             }
             selectedKeys[i] = null;//Help GC
             //2. Then get the corresponding channel and process the Channel
             //By default, a is an AbstractNioChannel, that is, the Channel wrapped by Netty when it was registered
             final Object a = k.attachment();
             if (a instanceof AbstractNioChannel) {
                 // Handling of network events
                 processSelectedKey(k, (AbstractNioChannel) a);
             } else {
                 //NioTask is mainly used for some tasks executed when a SelectableChannel is registered with the Selector
                 NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
                 processSelectedKey(k, task);
             }
             //3. Finally, determine whether another poll should be conducted
             if (needsToSelectAgain) {
                 for (;;) {
                     i++;
                     if (selectedKeys[i] == null) {
                         break;
                     }
                     selectedKeys[i] = null;
                 }
                 selectAgain();
                 //this.selectedKeys.flip() will return an array
                 selectedKeys = this.selectedKeys.flip();
                 i = -1;
             }
         }
     }

     private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
         final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
         if (!k.isValid()) {
             final EventLoop eventLoop;
             try {
                 eventLoop = ch.eventLoop();
             } catch (Throwable ignored) {
                 //If the channel implementation throws an exception because there is no event loop,
                 //we ignore this because we are only trying to determine if ch is registered to this event loop and thus has authority to close ch.
                 return;
             }
             //Only close ch if ch is still registered to this EventLoop.
             //ch could have deregistered from the event loop and thus the SelectionKey could be cancelled as part of the deregistration process,
             //but the channel is still healthy and should not be closed.
             if (eventLoop != this || eventLoop == null) {
                 return;
             }
             //close the channel if the key is not valid anymore
             unsafe.close(unsafe.voidPromise());
             return;
         }

         try {
             int readyOps = k.readyOps();
             //We first need to call finishConnect() before try to trigger a read(...) or write(...) as otherwise
             //the NIO JDK channel implementation may throw a NotYetConnectedException.
             if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
                 //remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking
                 int ops = k.interestOps();
                 ops &= ~SelectionKey.OP_CONNECT;
                 k.interestOps(ops);
                 unsafe.finishConnect();
             }

             //Process OP_WRITE first as we may be able to write some queued buffers and so free memory.
             if ((readyOps & SelectionKey.OP_WRITE) != 0) {
                 //Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write
                 ch.unsafe().forceFlush();
             }

             //Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead to a spin loop
             if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
                 unsafe.read();
                 if (!ch.isOpen()) {
                     //Connection already closed - no need to handle write.
                     return;
                 }
             }
         } catch (CancelledKeyException ignored) {
             unsafe.close(unsafe.voidPromise());
         }
     }

     void cancel(SelectionKey key) {
         key.cancel();
         cancelledKeys ++;
         if (cancelledKeys >= CLEANUP_INTERVAL) {
             cancelledKeys = 0;
             needsToSelectAgain = true;
         }
     }
     ...
 }

(4) Summary of handling IO events

By default, Netty uses reflection to replace the two HashSets that the underlying Selector uses to store SelectionKeys with an array, which makes handling IO events more efficient.

 

When each SelectionKey is processed, its attachment is retrieved. This attachment is the AbstractNioChannel that was bound when the Channel was registered with the Selector. So for every SelectionKey, the corresponding AbstractNioChannel can be found, and processing is then passed serially through the Pipeline to the ChannelHandlers, which call back into user code.

 

Thread processing task queue addition task

(1) The process of Reactor thread performing an event poll

(2) Task classification and addition instructions

(3) Adding ordinary tasks

(4) Adding timed tasks

(5) Supplementary of Netty's timing task mechanism

 

(1) The process of Reactor thread performing an event poll

On each iteration of the run() method of NioEventLoop, the Reactor thread first calls the select() method to try to detect IO events, and then calls the processSelectedKeys() method to process the detected IO events. These IO events are mainly new-connection events and data read and write events on existing connections. Finally, the runAllTasks() method is called to handle the asynchronous tasks in the task queue.

 

(2) Task classification and addition instructions

The Task in the runAllTasks() method includes ordinary tasks and timed tasks, which are stored in different queues of NioEventLoop. One is the ordinary task queue MpscQueue, and the other is the timed task queue PriorityQueue.

 

The ordinary task queue MpscQueue is created when the NioEventLoop is created. When an external thread calls the execute() method of NioEventLoop, the addTask() method is called to save the Task to the ordinary task queue.

 

The timed task queue PriorityQueue is created when the first timed task is added. When an external thread calls the schedule() method of NioEventLoop, the scheduledTaskQueue().add() method is called to save the Task to the timed task queue.

 

(3) Adding ordinary tasks

Scenario 1: User-defined normal tasks

Scenario 2: External thread calls Channel method

 

Whether an ordinary task is submitted through ctx.channel().eventLoop().execute(...), or a non-Reactor (external) thread calls one of the Channel's methods, the execute() method of SingleThreadEventExecutor is what runs in the end.

 

Scenario 1: User-defined normal tasks

Whether an external thread or the Reactor thread calls the execute() method of NioEventLoop, the addTask() method of NioEventLoop is called, which in turn calls the offerTask() method. The offerTask() method saves the Task in taskQueue. This taskQueue is an MPSC (multi-producer, single-consumer) queue, and every NioEventLoop has one.

 

Netty uses the MPSC queue to gather the asynchronous tasks of external threads in one place, and then executes them in batches on the single Reactor thread to improve performance. The same task execution pattern can be borrowed for applications that aggregate data from multiple threads and report it periodically.
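A minimal sketch of that aggregation pattern follows, under stated assumptions: AggregatingReporterSketch is a hypothetical class, and the JDK's ConcurrentLinkedQueue stands in for Netty's MPSC queue (it also supports many producers with one consumer, but it is not the same implementation).

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical sketch: many producer threads record values, one consumer drains them in a batch.
final class AggregatingReporterSketch {
    // Stand-in for an MPSC queue; safe for concurrent offer() calls.
    private final Queue<Integer> queue = new ConcurrentLinkedQueue<>();

    // May be called from any producer thread.
    void record(int value) {
        queue.offer(value);
    }

    // Intended to be called from the single consumer thread only:
    // drains everything queued so far and returns the aggregated sum.
    long drainAndSum() {
        long sum = 0;
        Integer v;
        while ((v = queue.poll()) != null) {
            sum += v;
        }
        return sum;
    }

    // Starts the producers, waits for them, then drains once on the calling thread.
    static long demo(int producers, int perProducer) {
        AggregatingReporterSketch reporter = new AggregatingReporterSketch();
        Thread[] threads = new Thread[producers];
        for (int p = 0; p < producers; p++) {
            threads[p] = new Thread(() -> {
                for (int i = 0; i < perProducer; i++) {
                    reporter.record(1);
                }
            });
            threads[p].start();
        }
        try {
            for (Thread t : threads) {
                t.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return reporter.drainAndSum();
    }

    public static void main(String[] args) {
        System.out.println("aggregated=" + demo(4, 1000));
    }
}
```

Because only one thread ever drains the queue, the aggregation step itself needs no locking; the queue is the only shared structure.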

//Scenario 1: User-defined normal tasks
 ctx.channel().eventLoop().execute(new Runnable() {
     @Override
     public void run() {
         ...
     }
 });

 public final class NioEventLoop extends SingleThreadEventLoop {
     ...
 }

 public abstract class SingleThreadEventLoop extends SingleThreadEventExecutor implements EventLoop {
     ...
 }

 public abstract class SingleThreadEventExecutor extends AbstractScheduledEventExecutor implements OrderedEventExecutor {
     //Each NioEventLoop will have an MPSC queue
     private final Queue<Runnable> taskQueue;
     protected SingleThreadEventExecutor(EventExecutorGroup parent, Executor executor,
           boolean addTaskWakesUp, int maxPendingTasks, RejectedExecutionHandler rejectedHandler) {
         super(parent);
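         this.addTaskWakesUp = addTaskWakesUp;
         this.maxPendingTasks = Math.max(16, maxPendingTasks);
         this.executor = ObjectUtil.checkNotNull(executor, "executor");
         //Create ordinary task queue MpscQueue
         taskQueue = newTaskQueue();
         rejectedExecutionHandler = ObjectUtil.checkNotNull(rejectedHandler, "rejectedHandler");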
     }
     ...
    
     @Override
     public void execute(Runnable task) {
         if (task == null) throw new NullPointerException("task");
         boolean inEventLoop = inEventLoop();
         // Whether an external thread or the Reactor thread calls execute() of NioEventLoop, the addTask() method of NioEventLoop is called
         if (inEventLoop) {
             addTask(task);
         } else {
             startThread();
             addTask(task);
             if (isShutdown() && removeTask(task)) {
                 reject();
             }
         }
         if (!addTaskWakesUp && wakesUpForTask(task)) wakeup(inEventLoop);
     }
    
     //Add a task to the task queue, or throws a RejectedExecutionException if this instance was shutdown before.
     protected void addTask(Runnable task) {
         if (task == null) throw new NullPointerException("task");
         if (!offerTask(task)) reject(task);
     }

     final boolean offerTask(Runnable task) {
         if (isShutdown()) reject();
         return taskQueue.offer(task);
     }
     ...
 }

Scenario 2: External thread calls Channel method

In this scenario, a business thread looks up the Channel that corresponds to a user's identity and then calls the Channel's write() method to push a message to that user.

 

When an external thread calls the write() method of Channel, executor.inEventLoop() returns false. The write operation is therefore wrapped in a WriteTask, and the safeExecute() method is called to execute it. By default, the NIO thread bound to the Channel is obtained and passed as a parameter into safeExecute(), which ensures that the task is executed by that NIO thread; thread safety is achieved through single-threaded execution.

//Scenario 2: The current thread is a business thread
 channel.write(...)

 abstract class AbstractChannelHandlerContext extends DefaultAttributeMap implements ChannelHandlerContext, ResourceLeakHint {
     final EventExecutor executor;// Generally, the default is null when initializing
     AbstractChannelHandlerContext(DefaultChannelPipeline pipeline, EventExecutor executor, String name, Class<? extends ChannelHandler> handlerClass) {
         this.pipeline = pipeline;
         this.executor = executor;
         ...
     }
        
     ...
     private void write(Object msg, boolean flush, ChannelPromise promise) {
         AbstractChannelHandlerContext next = findContextOutbound();
         final Object m = pipeline.touch(msg, next);
         EventExecutor executor = next.executor();
         // When an external thread calls the write() method of Channel, executor.inEventLoop() will return false
         if (executor.inEventLoop()) {
             if (flush) {
                 next.invokeWriteAndFlush(m, promise);
             } else {
                 next.invokeWrite(m, promise);
             }
         } else {
             //Wrap the write operation in a WriteTask
             AbstractWriteTask task;
             if (flush) {
                 task = WriteAndFlushTask.newInstance(next, m, promise);
             } else {
                 task = WriteTask.newInstance(next, m, promise);
             }
             //Call safeExecute() to execute
             safeExecute(executor, task, promise, m);
         }
     }
    
     @Override
     public EventExecutor executor() {
         if (executor == null) {
             return channel().eventLoop();
         } else {
             return executor;
         }
     }
    
     private AbstractChannelHandlerContext findContextOutbound(int mask) {
         AbstractChannelHandlerContext ctx = this;
         EventExecutor currentExecutor = executor();
         do {
             ctx = ctx.prev;
         } while (skipContext(ctx, currentExecutor, mask, MASK_ONLY_OUTBOUND));
         return ctx;
     }
    
     private static void safeExecute(EventExecutor executor, Runnable runnable, ChannelPromise promise, Object msg) {
         try {
             //Call the SingleThreadEventExecutor.execute() method
             executor.execute(runnable);
         } catch (Throwable cause) {
             try {
                 promise.setFailure(cause);
             } finally {
                 if (msg != null) {
                     ReferenceCountUtil.release(msg);
                 }
             }
         }
     }
     ...
 }

 public class DefaultChannelPipeline implements ChannelPipeline {
     ...
     final class HeadContext extends AbstractChannelHandlerContext implements ChannelOutboundHandler, ChannelInboundHandler {
         private final Unsafe unsafe;
         HeadContext(DefaultChannelPipeline pipeline) {
             //The incoming executor is null
             super(pipeline, null, HEAD_NAME, HeadContext.class);
             unsafe = pipeline.channel().unsafe();
             setAddComplete();
         }
         ...
     }
    
     final class TailContext extends AbstractChannelHandlerContext implements ChannelInboundHandler {
         TailContext(DefaultChannelPipeline pipeline) {
             //The incoming executor is null
             super(pipeline, null, TAIL_NAME, TailContext.class);
             setAddComplete();
         }
         ...
     }
     ...
 }

 public abstract class SingleThreadEventExecutor extends AbstractScheduledEventExecutor implements OrderedEventExecutor {
     //Each NioEventLoop will have an MPSC queue
     private final Queue<Runnable> taskQueue;
     protected SingleThreadEventExecutor(EventExecutorGroup parent, Executor executor,
           boolean addTaskWakesUp, int maxPendingTasks, RejectedExecutionHandler rejectedHandler) {
         super(parent);
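         this.addTaskWakesUp = addTaskWakesUp;
         this.maxPendingTasks = Math.max(16, maxPendingTasks);
         this.executor = ObjectUtil.checkNotNull(executor, "executor");
         //Create ordinary task queue MpscQueue
         taskQueue = newTaskQueue();
         rejectedExecutionHandler = ObjectUtil.checkNotNull(rejectedHandler, "rejectedHandler");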
     }
  
     ...
     @Override
     public void execute(Runnable task) {
         if (task == null) throw new NullPointerException("task");
         boolean inEventLoop = inEventLoop();
         // Whether an external thread or the Reactor thread calls execute() of NioEventLoop, the addTask() method of NioEventLoop is called
         if (inEventLoop) {
             addTask(task);
         } else {
             startThread();
             addTask(task);
             if (isShutdown() && removeTask(task)) {
                 reject();
             }
         }
         if (!addTaskWakesUp && wakesUpForTask(task)) wakeup(inEventLoop);
     }
    
     //Add a task to the task queue, or throws a RejectedExecutionException if this instance was shutdown before.
     protected void addTask(Runnable task) {
         if (task == null) throw new NullPointerException("task");
         if (!offerTask(task)) reject(task);
     }

     final boolean offerTask(Runnable task) {
         if (isShutdown()) reject();
         return taskQueue.offer(task);
     }
     ...
 }
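The hand-off described in Scenario 2 can be sketched without Netty. Everything here is an assumption made for illustration: HandOffSketch is a hypothetical class, a JDK single-thread executor plays the role of the Channel's NioEventLoop, and a StringBuilder stands in for the Channel state that only the loop thread may touch.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch: every write ends up on one "loop" thread, whoever calls write().
final class HandOffSketch {
    private volatile Thread loopThread;
    private final ExecutorService loop = Executors.newSingleThreadExecutor(r -> {
        Thread t = new Thread(r, "loop-thread");
        loopThread = t; // remember the thread so inEventLoop() can compare against it
        return t;
    });
    // Only ever touched by the loop thread, so it needs no lock.
    private final StringBuilder log = new StringBuilder();

    boolean inEventLoop() {
        return Thread.currentThread() == loopThread;
    }

    // Callable from any thread: runs directly on the loop thread, otherwise is queued as a task.
    void write(String msg) {
        if (inEventLoop()) {
            doWrite(msg);
        } else {
            loop.execute(() -> doWrite(msg));
        }
    }

    private void doWrite(String msg) {
        log.append(msg).append('@').append(Thread.currentThread().getName()).append(';');
    }

    // Stops the loop after queued tasks have run and returns what was written.
    String shutdownAndGetLog() {
        loop.shutdown();
        try {
            loop.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return log.toString();
    }

    public static void main(String[] args) {
        HandOffSketch channel = new HandOffSketch();
        channel.write("a"); // called from the main (external) thread
        channel.write("b");
        System.out.println(channel.shutdownAndGetLog());
    }
}
```

The caller never touches the shared state directly; it only enqueues work, which is the same reason the write path above needs no locks.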

(4) Adding timed tasks

A timed task is usually submitted through ctx.channel().eventLoop().schedule(..). The schedule() method adds the timed task through scheduledTaskQueue().add(task): the scheduledTaskQueue() method returns a priority queue, and the timed task object is added to it through the queue's add() method.

 

Note why a priority queue can be used here without worrying about multi-threaded concurrency. If an external thread calls the schedule() method to add a timed task, Netty wraps the logic of adding the timed task in an ordinary task. In other words, what the external thread submits is an ordinary task whose job is "add this timed task", rather than the timed task itself. Access to the priority queue therefore becomes single-threaded: only the Reactor thread ever touches it, so there is no multi-thread concurrency problem.

//Scenario 3: User-defined timed tasks, which is also the most commonly used method
 ctx.channel().eventLoop().schedule(new Runnable() {
     @Override
     public void run() {
         ...
     }
 }, 60, TimeUnit.SECONDS);

 //Abstract base class for EventExecutors that want to support scheduling.
 public abstract class AbstractScheduledEventExecutor extends AbstractEventExecutor {
     Queue<ScheduledFutureTask<?>> scheduledTaskQueue;
     ...
     //Add a timed task
     <V> ScheduledFuture<V> schedule(final ScheduledFutureTask<V> task) {
         if (inEventLoop()) {
             //If the current thread is a Reactor thread, add tasks directly to PriorityQueue
             scheduledTaskQueue().add(task);
         } else {
             //If it is an external thread, call the SingleThreadEventExecutor.execute() method
             //The action of adding a timed task is also encapsulated into a normal task
             execute(new Runnable() {
                 @Override
                 public void run() {
                     scheduledTaskQueue().add(task);
                 }
             });
         }
         return task;
     }
    
     Queue<ScheduledFutureTask<?>> scheduledTaskQueue() {
         if (scheduledTaskQueue == null) {
             //Create a timed task queue PriorityQueue
             scheduledTaskQueue = new PriorityQueue<ScheduledFutureTask<?>>();
         }
         return scheduledTaskQueue;
     }
     ...
 }

(5) Supplementary of Netty's timing task mechanism

1. How to ensure priority execution of tasks with the nearest deadline

Why are timed tasks stored in a priority queue? A priority queue keeps its elements in a defined order, which requires that the elements be comparable. Every element in this queue is a timed task, so timed tasks must be comparable too. The comparison logic is: first compare the deadlines of the two timed tasks; when the deadlines are equal, compare the order in which the tasks were added, that is, their IDs.
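The ordering rule can be tried out with a JDK PriorityQueue. This is a sketch under assumptions: TimedTaskSketch is a hypothetical class, and it uses Long.compare for the deadline comparison, whereas the Netty listing in this article subtracts the two deadlines.

```java
import java.util.PriorityQueue;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical sketch of a timed task ordered by deadline first, then by insertion ID.
final class TimedTaskSketch implements Comparable<TimedTaskSketch> {
    private static final AtomicLong NEXT_ID = new AtomicLong();

    final long id = NEXT_ID.getAndIncrement(); // creation order
    final long deadlineNanos;
    final String name;

    TimedTaskSketch(String name, long deadlineNanos) {
        this.name = name;
        this.deadlineNanos = deadlineNanos;
    }

    @Override
    public int compareTo(TimedTaskSketch that) {
        if (this == that) {
            return 0;
        }
        int byDeadline = Long.compare(deadlineNanos, that.deadlineNanos);
        // Equal deadlines: the task created earlier (smaller ID) comes first.
        return byDeadline != 0 ? byDeadline : Long.compare(id, that.id);
    }

    // Adds three tasks out of order and returns the names in the order poll() yields them.
    static String pollOrder() {
        PriorityQueue<TimedTaskSketch> queue = new PriorityQueue<>();
        queue.add(new TimedTaskSketch("late", 300));
        queue.add(new TimedTaskSketch("early-first", 100));
        queue.add(new TimedTaskSketch("early-second", 100));
        StringBuilder order = new StringBuilder();
        while (!queue.isEmpty()) {
            order.append(queue.poll().name).append(' ');
        }
        return order.toString().trim();
    }

    public static void main(String[] args) {
        System.out.println(pollOrder());
    }
}
```

The head of the queue is always the task with the nearest deadline, so the consumer only needs to peek at one element to know whether anything is due.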

 

2. Netty's timing tasks have three ways to execute

Method 1: The timed task runs once and is not repeated

ctx.channel().eventLoop().schedule(), the passed periodNanos is 0.

Method 2: Run at a fixed rate

ctx.channel().eventLoop().scheduleAtFixedRate(), the passed periodNanos is a positive number.

Method 3: Run again a fixed delay after each run finishes

ctx.channel().eventLoop().scheduleWithFixedDelay(), the passed periodNanos is a negative number.

 

All three kinds of timed task are implemented by adjusting the task's next deadline. After a repeating task runs, its deadline for the next execution is updated first, and then the task is added to the queue again, which ensures that it runs at the appropriate time.

final class ScheduledFutureTask<V> extends PromiseTask<V> implements ScheduledFuture<V> {
     //Each timing task has a unique ID
     private static final AtomicLong nextTaskId = new AtomicLong();
     private final long id = nextTaskId.getAndIncrement();
     private long deadlineNanos;
     //Identify whether a task is executed repeatedly and in what way
     private final long periodNanos;
     ...
    
     @Override
     public int compareTo(Delayed o) {
         if (this == o) {
             return 0;
         }

         ScheduledFutureTask<?> that = (ScheduledFutureTask<?>) o;
         long d = deadlineNanos() - that.deadlineNanos();
         if (d < 0) {
             return -1;
         } else if (d > 0) {
             return 1;
         } else if (id < that.id) {
             return -1;
         } else if (id == that.id) {
             throw new Error();
         } else {
             return 1;
         }
     }

     public long deadlineNanos() {
         return deadlineNanos;
     }
    
     @Override
     public void run() {
         ...
         if (periodNanos == 0) {
             //1. Corresponding to schedule() method, representing a one-time task
             V result = task.call();
             setSuccessInternal(result);
         } else {
             task.call();
             long p = periodNanos;
             if (p > 0) {
                 //2. Corresponding to scheduleAtFixedRate() method, indicating that tasks are executed at a fixed rate
                 deadlineNanos += p;
             } else {
                 //3. Corresponding to scheduleWithFixedDelay() method, indicating that tasks are executed with fixed delays
                 deadlineNanos = nanoTime() - p;
             }
             scheduledTaskQueue.add(this);
         }
         ...
     }
     ...
 }
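The deadline adjustment in run() above reduces to a small function of periodNanos. The sketch below isolates it; PeriodSketch and nextDeadline are hypothetical names, and the current time is passed in as a parameter so the arithmetic can be checked by hand.

```java
import java.util.OptionalLong;

// Hypothetical sketch of how the sign of periodNanos selects the rescheduling rule.
final class PeriodSketch {
    // Returns the next deadline, or empty when the task is one-shot (periodNanos == 0).
    static OptionalLong nextDeadline(long deadlineNanos, long periodNanos, long nowNanos) {
        if (periodNanos == 0) {
            return OptionalLong.empty();                      // schedule(): run once
        }
        if (periodNanos > 0) {
            return OptionalLong.of(deadlineNanos + periodNanos); // fixed rate: previous deadline + period
        }
        return OptionalLong.of(nowNanos - periodNanos);          // fixed delay: now + |period|
    }

    public static void main(String[] args) {
        // A task with deadline 1000 that finished running at time 1500.
        System.out.println(nextDeadline(1000, 200, 1500));
        System.out.println(nextDeadline(1000, -200, 1500));
    }
}
```

With a deadline of 1000 and a run that ends at 1500, a period of 200 gives 1200 under fixed rate (already in the past, so the task is due again immediately) and 1700 under fixed delay.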

 

Thread processing task execution tasks in task queue

(1) The runAllTasks() method needs to pass in timeout time

(2) Steps for Reactor thread to execute tasks

(3) Batch strategy for Netty performance optimization

(4) Summary of task execution in the NioEventLoop.run() method

 

(1) The runAllTasks() method needs to pass in timeout time

The runAllTasks() method of SingleThreadEventExecutor takes a timeoutNanos parameter, which means tasks should be taken out and executed only within the timeoutNanos budget. If the Reactor thread spends too long executing tasks, IO events pile up and cannot be processed in time, and a large number of client requests end up blocked. Netty therefore controls the execution time of the internal task queue carefully.

 

(2) Steps for Reactor thread to execute tasks

1. Task aggregation

Transfer the timed tasks whose deadline has arrived from the timed task queue to the MPSC queue.

 

2. Time calculation

Calculate the deadline for this round of task execution. At this time, all timing tasks whose deadline has reached have been filled into the ordinary task queue (MPSC queue).

 

3. Task execution

First, execute each task synchronously in a way that does not let exceptions escape. Then increment the count of tasks executed so far. Every 64 tasks, check whether the current time has passed the deadline. Finally, check whether all tasks for this round have been executed.

//Abstract base class for OrderedEventExecutor's that execute all its submitted tasks in a single thread.
 public abstract class SingleThreadEventExecutor extends AbstractScheduledEventExecutor implements OrderedEventExecutor {
     //Each NioEventLoop will have an MPSC queue
     private final Queue<Runnable> taskQueue;
     ...
    
     //Poll all tasks from the task queue and run them via Runnable#run() method.
     //This method stops running the tasks in the task queue and returns if it ran longer than timeoutNanos.
     protected boolean runAllTasks(long timeoutNanos) {
         //1. Transfer the timing task to the MPSC queue, that is, task aggregation
         fetchFromScheduledTaskQueue();
         //Get tasks from ordinary task queue (MPSC queue)
         Runnable task = pollTask();
         if (task == null) {
             afterRunningAllTasks();
             return false;
         }
         //2. Calculate the deadline for the execution of this round of tasks
         final long deadline = ScheduledFutureTask.nanoTime() + timeoutNanos;
         long runTasks = 0;
         long lastExecutionTime;
         //3. Execute tasks, and execute the tasks retrieved by pollTask() one by one through a for loop
         for (;;) {
             //3.1 Execute tasks without throwing exceptions (synchronous blocking) to ensure that tasks can be executed safely
             safeExecute(task);
             //3.2 Accumulate the number of tasks currently executed
             runTasks ++;
             //3.3 Check whether the current time has passed the deadline every 64 tasks, because nanoTime() is also relatively expensive
             if ((runTasks & 0x3F) == 0) {
                 lastExecutionTime = ScheduledFutureTask.nanoTime();
                 if (lastExecutionTime >= deadline) {
                     break;
                 }
             }
             //3.4 Determine whether this round of tasks has been completed
             task = pollTask();
             if (task == null) {
                 lastExecutionTime = ScheduledFutureTask.nanoTime();
                 break;
             }
         }
         afterRunningAllTasks();
         this.lastExecutionTime = lastExecutionTime;
         return true;
     }
    
     private boolean fetchFromScheduledTaskQueue() {
         long nanoTime = AbstractScheduledEventExecutor.nanoTime();
         Runnable scheduledTask = pollScheduledTask(nanoTime);
         while (scheduledTask != null) {
             if (!taskQueue.offer(scheduledTask)) {
                 scheduledTaskQueue().add((ScheduledFutureTask<?>) scheduledTask);
                 return false;
             }
             scheduledTask = pollScheduledTask(nanoTime);
         }
         return true;
     }
    
     protected Runnable pollTask() {
         assert inEventLoop();
         return pollTaskFrom(taskQueue);
     }

     protected final Runnable pollTaskFrom(Queue<Runnable> taskQueue) {
         for (;;) {
             Runnable task = taskQueue.poll();
             if (task == WAKEUP_TASK) {
                 continue;
             }
             return task;
         }
     }
     ...
 }

 //Abstract base class for EventExecutors that want to support scheduling.
 public abstract class AbstractScheduledEventExecutor extends AbstractEventExecutor {
     Queue<ScheduledFutureTask<?>> scheduledTaskQueue;
     ...
     //Return the Runnable which is ready to be executed with the given nanoTime.
     //You should use #nanoTime() to retrieve the correct nanoTime.
     protected final Runnable pollScheduledTask(long nanoTime) {
         assert inEventLoop();
         Queue<ScheduledFutureTask<?>> scheduledTaskQueue = this.scheduledTaskQueue;
         ScheduledFutureTask<?> scheduledTask = scheduledTaskQueue == null ? null : scheduledTaskQueue.peek();
         if (scheduledTask == null) {
             return null;
         }
         if (scheduledTask.deadlineNanos() <= nanoTime) {
             scheduledTaskQueue.remove();
             return scheduledTask;
         }
         return null;
     }
     ...
 }

 //Abstract base class for EventExecutor implementations.
 public abstract class AbstractEventExecutor extends AbstractExecutorService implements EventExecutor {
     ...
     //Try to execute the given Runnable and just log if it throws a Throwable.
     protected static void safeExecute(Runnable task) {
         try {
             task.run();//Synchronously execute tasks
         } catch (Throwable t) {
             logger.warn("A task raised an exception. Task: {}", task, t);
         }
     }
     ...
 }

(3) Batch strategy for Netty performance optimization

Suppose the task queue holds a huge number of small tasks. Checking whether the deadline has been reached after every single task would be relatively inefficient, because reading the clock has a cost of its own. Netty therefore checks the deadline only once every 64 tasks, which is much cheaper. The trade-off is that a round can overrun its deadline by up to 63 tasks.
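The batched check can be sketched with an injected clock so that the number of clock reads becomes visible. The names below (BatchedDeadlineSketch, Result, runAll, demo) are hypothetical, each fake task is assumed to cost 10 ns, and unlike Netty's runAllTasks() the sketch does not read the clock a final time when the queue runs empty.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.function.LongSupplier;

// Hypothetical sketch of "check the deadline only every 64 tasks".
final class BatchedDeadlineSketch {
    static final class Result {
        final int tasksRun;
        final int clockReads;

        Result(int tasksRun, int clockReads) {
            this.tasksRun = tasksRun;
            this.clockReads = clockReads;
        }
    }

    static Result runAll(Queue<Runnable> tasks, long timeoutNanos, LongSupplier clock) {
        Runnable task = tasks.poll();
        if (task == null) {
            return new Result(0, 0);
        }
        long deadline = clock.getAsLong() + timeoutNanos;
        int clockReads = 1;
        int runTasks = 0;
        for (;;) {
            task.run();
            runTasks++;
            // 0x3F == 63, so this is true on the 64th, 128th, ... task.
            if ((runTasks & 0x3F) == 0) {
                clockReads++;
                if (clock.getAsLong() >= deadline) {
                    break;
                }
            }
            task = tasks.poll();
            if (task == null) {
                break;
            }
        }
        return new Result(runTasks, clockReads);
    }

    // Builds taskCount fake tasks that each advance a fake clock by 10 ns.
    static Result demo(int taskCount, long timeoutNanos) {
        Queue<Runnable> queue = new ArrayDeque<>();
        long[] fakeNow = {0};
        for (int i = 0; i < taskCount; i++) {
            queue.add(() -> fakeNow[0] += 10);
        }
        return runAll(queue, timeoutNanos, () -> fakeNow[0]);
    }

    public static void main(String[] args) {
        Result r = demo(200, 100);
        System.out.println("tasksRun=" + r.tasksRun + ", clockReads=" + r.clockReads);
    }
}
```

With 200 tasks and a 100 ns budget, the loop stops at the first check (after task 64) even though the budget was spent after task 10, which shows both the saving in clock reads and the overrun it allows.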

 

(4) Summary of task execution in the NioEventLoop.run() method

There are two types of tasks in Netty: ordinary tasks and timed tasks. When executing them, Netty first moves the timed tasks that are due into the ordinary task queue, and then takes tasks from the ordinary task queue and executes them one by one. Only after every 64 tasks does it check whether the current time exceeds the maximum allowed execution time. If it does, the task loop stops immediately, and the next iteration of the for loop in NioEventLoop.run() begins.

 

Summarize

(1) Summary of the execution process of NioEventLoop

(2) Summary of Reactor thread model

(3) Summary of NioEventLoop creation startup execution

 

(1) Summary of the execution process of NioEventLoop

1. During the execution process, NioEventLoop will first continuously detect whether there is an IO event, and then process the IO event if it is detected. Then, after processing the IO event, process the asynchronous tasks submitted by the external thread.

 

2. While detecting IO events, to make sure asynchronous tasks are handled promptly, NioEventLoop stops detecting immediately and processes the tasks as soon as there is a task waiting.

 

3. External threads submit two types of tasks: ordinary tasks and timed tasks. They are stored in the MPSC queue and the priority queue respectively, and tasks in the priority queue are eventually transferred to the MPSC queue for processing.

 

4. While executing tasks, Netty checks every 64 tasks whether the time budget has been exceeded; if it has, the task execution loop exits.

 

(2) Summary of Reactor thread model

1. NioEventLoopGroup is created in user code. By default, it creates twice as many NioEventLoops as there are CPU cores.

 

2. NioEventLoop starts lazily: the boss NioEventLoop starts when the server starts, and a worker NioEventLoop starts when a new connection comes in.

 

3. When the number of NioEventLoops is a power of 2, Netty optimizes the way a NioEventLoop is bound to each new connection: the modulo operation on the index is replaced with a bitwise AND.

 

4. Each connection corresponds to a Channel, each Channel is bound to a unique NioEventLoop, a NioEventLoop may be bound to multiple Channels, and each NioEventLoop corresponds to a FastThreadLocalThread thread entity and a Selector. Therefore, all operations of a single connection are executed on one thread, so it is thread-safe.

 

5. Each NioEventLoop corresponds to a Selector, which can batch process the Channel registered to it.

 

6. The execution process of each NioEventLoop includes event detection, event processing and asynchronous task execution.

 

7. Operations that a user thread pool performs on a Channel are thread-safe. This is because Netty wraps an external thread's operation in a Task and puts it into the MPSC queue of the NioEventLoop bound to the Channel; the Task is then executed serially in the third step of the NioEventLoop's execution process (event loop).

 

8. NioEventLoop is therefore responsible for more than network IO events: user-defined ordinary tasks and timed tasks are also handled by NioEventLoop, which unifies the thread model.

 

9. At the scheduling level, the NioEventLoop thread never starts other kinds of threads to execute tasks asynchronously. This avoids concurrent multi-threaded operations and lock contention, and improves the processing and scheduling performance of the IO threads.
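The power-of-2 optimization mentioned in item 3 can be sketched as follows. ChooserSketch is a hypothetical class that returns an index rather than a NioEventLoop; the only point is that `i & (length - 1)` and `i % length` select the same slot when length is a power of 2.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical round-robin chooser that switches to a bit mask when the length is a power of 2.
final class ChooserSketch {
    private final AtomicInteger idx = new AtomicInteger();
    private final int length;

    ChooserSketch(int length) {
        if (length <= 0) {
            throw new IllegalArgumentException("length must be positive");
        }
        this.length = length;
    }

    // For positive val: true exactly when a single bit is set.
    static boolean isPowerOfTwo(int val) {
        return (val & -val) == val;
    }

    // Returns the index of the next slot in round-robin order.
    int next() {
        int i = idx.getAndIncrement();
        if (isPowerOfTwo(length)) {
            return i & (length - 1);     // bitwise AND replaces the modulo
        }
        return Math.abs(i % length);     // general case; abs() handles counter overflow
    }

    public static void main(String[] args) {
        ChooserSketch chooser = new ChooserSketch(4);
        for (int n = 0; n < 6; n++) {
            System.out.print(chooser.next() + " ");
        }
        System.out.println();
    }
}
```

The mask form also stays non-negative after the counter overflows, so the power-of-2 branch needs no abs() call.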

 

(3) Summary of NioEventLoop creation startup execution

1. When the user creates the bossGroup and the workerGroup, a NioEventLoopGroup is created. When no argument is passed, it creates twice as many NioEventLoops as there are CPU cores by default.

 

2. Each NioEventLoopGroup has a thread executor and a thread selector chooser. The thread selector chooser is used for thread allocation, which will optimize for the number of NioEventLoops.

 

3. When a NioEventLoop is created, it creates a Selector and an MPSC task queue. When creating the Selector, Netty uses reflection to replace the two HashSet data structures inside the Selector with an array.

 

4. Netty's NioEventLoop starts its thread the first time the execute() method is called. This thread is a FastThreadLocalThread object. After starting the thread, Netty saves it to a member variable so that it can later determine whether the thread executing logic in the NioEventLoop is the thread it created.

 

5. The execution logic of NioEventLoop in the run() method mainly includes three parts: the first is to detect IO events, the second is to handle IO events, and the third is to execute asynchronous tasks.