Outline
1. Questions about NioEventLoop
2. The three parts of the Reactor thread model
3. Creation of NioEventLoop
4. Startup of NioEventLoop
1. Questions about NioEventLoop
1. By default, how many threads does a Netty server use, and when are they started?
Answer: By default, the number of threads is twice the number of CPU cores. When the execute(task) method of an EventExecutor is called, Netty checks whether the current thread is the Netty Reactor thread, that is, the thread entity that corresponds to the NioEventLoop. If it is, the Reactor thread has already been started. If it is not, an external thread is calling execute(), so startThread() is called first to check whether the NioEventLoop's thread has been started. If it has not, a new thread is created and started as the Reactor thread of that NioEventLoop.
2. How does Netty work around the JDK empty polling bug?
Answer: If a blocking select() call returns in less time than the timeout it was given, Netty treats this as a possible occurrence of the empty polling bug. By default, once this has happened 512 times, Netty rebuilds the Selector and re-registers all keys from the old Selector on the new one. This avoids the JDK empty polling bug.
3. How does Netty achieve asynchronous, serialized, lock-free execution?
Answer: Lock-free serialization shows up in two scenarios.
Scenario 1: After obtaining a client Channel, you do not need to synchronize on it; multiple threads can read and write to it concurrently.
Scenario 2: All operations inside a ChannelHandler are thread-safe and need no synchronization.
Whenever an external thread calls an EventLoop or Channel method, Netty uses the inEventLoop() method to detect that the caller is an external thread (not the thread entity of the NioEventLoop). In that case, the operation is wrapped in a task and put into the MPSC queue. The execution logic of the NioEventLoop, that is, its run() method, then executes these tasks one by one.
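The task-queue idea in the answer above can be illustrated with a minimal sketch that uses only the JDK. It is not Netty code: SerialLoopSketch, increment() and readCounter() are hypothetical names, and a LinkedBlockingQueue stands in for Netty's MPSC queue. The point is that the counter is a plain int with no lock, yet the result is exact, because only the loop thread ever touches it.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical stand-in for a NioEventLoop: one thread draining one task queue.
final class SerialLoopSketch {
    // Assumption: LinkedBlockingQueue replaces Netty's MPSC queue for illustration.
    private final BlockingQueue<Runnable> taskQueue = new LinkedBlockingQueue<>();
    private final Thread loopThread;
    // Deliberately neither atomic nor synchronized: only the loop thread uses it.
    private int counter;

    SerialLoopSketch() {
        loopThread = new Thread(this::runLoop, "sketch-loop");
        loopThread.setDaemon(true);
        loopThread.start();
    }

    // Single consumer: executes queued tasks one by one.
    private void runLoop() {
        try {
            while (true) {
                taskQueue.take().run();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    boolean inEventLoop() {
        return Thread.currentThread() == loopThread;
    }

    void execute(Runnable task) {
        taskQueue.add(task);
    }

    // Models a Channel operation: run directly on the loop thread,
    // otherwise wrap the operation in a task and enqueue it.
    void increment() {
        if (inEventLoop()) {
            counter++;
        } else {
            execute(() -> counter++);
        }
    }

    // Reads the counter on the loop thread and hands the value back safely.
    int readCounter() {
        CompletableFuture<Integer> result = new CompletableFuture<>();
        execute(() -> result.complete(counter));
        return result.join();
    }

    // Starts several external threads that all call increment() concurrently.
    static int demo(int callers, int callsPerCaller) {
        SerialLoopSketch loop = new SerialLoopSketch();
        Thread[] threads = new Thread[callers];
        for (int i = 0; i < callers; i++) {
            threads[i] = new Thread(() -> {
                for (int j = 0; j < callsPerCaller; j++) {
                    loop.increment();
                }
            });
            threads[i].start();
        }
        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                throw new IllegalStateException(e);
            }
        }
        return loop.readCounter();
    }

    public static void main(String[] args) {
        System.out.println(demo(4, 1000)); // prints 4000
    }
}
```

Because every external call is funneled through the queue, the callers never need to agree on a lock; ordering is decided by the order in which tasks enter the queue.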
2. The three parts of the Reactor thread model
1. NioEventLoop creation
2. NioEventLoop startup
3. NioEventLoop execution
3. Creation of NioEventLoop
(1) Entry point for creation
(2) Determine the number of NioEventLoops
(3) The creation process of NioEventLoopGroup
(4) Create a thread executor ThreadPerTaskExecutor
(5) Create NioEventLoop
(6) Create thread selector EventExecutorChooser
(7) Summary of the creation of NioEventLoopGroup
(1) Entry point for creation
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
(2) Determine the number of NioEventLoops
The NioEventLoopGroup constructor determines the number of NioEventLoops. If no argument is passed to the constructor, the number of NioEventLoop threads is twice the number of CPU cores. If the argument n is passed, the number of NioEventLoop threads is n.
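The rule above can be sketched with plain JDK calls. This is an illustration rather than Netty's own code: the class and method names are hypothetical, while the io.netty.eventLoopThreads system property and the "0 means default" convention follow the constructor chain shown in this article.

```java
// Hypothetical helper that mirrors how the thread count is resolved.
final class EventLoopThreadCountSketch {

    // Default: the io.netty.eventLoopThreads property if set,
    // otherwise twice the number of CPU cores, and never less than 1.
    static int defaultThreads() {
        int fallback = Runtime.getRuntime().availableProcessors() * 2;
        return Math.max(1, Integer.getInteger("io.netty.eventLoopThreads", fallback));
    }

    // nThreads == 0 means "use the default"; negative values are rejected,
    // matching the nThreads <= 0 check in MultithreadEventExecutorGroup.
    static int resolve(int nThreads) {
        int resolved = nThreads == 0 ? defaultThreads() : nThreads;
        if (resolved <= 0) {
            throw new IllegalArgumentException(
                    String.format("nThreads: %d (expected: > 0)", resolved));
        }
        return resolved;
    }

    public static void main(String[] args) {
        System.out.println("default: " + resolve(0));
        System.out.println("explicit: " + resolve(8));
    }
}
```

Passing 0 internally rather than computing the default in every constructor keeps the default in one place, the static initializer of MultithreadEventLoopGroup.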
(3) The creation process of NioEventLoopGroup
The NioEventLoopGroup constructor triggers the creation process.
1. Create a thread executor ThreadPerTaskExecutor
A thread is created each time the ThreadPerTaskExecutor.execute() method is called.
2. Create NioEventLoop
Each NioEventLoop corresponds to one thread in the NioEventLoopGroup thread pool. The NioEventLoopGroup constructor uses a for loop that calls the newChild() method to create each NioEventLoop.
3. Create a thread selector EventExecutorChooser
The function of the thread selector is to assign a NioEventLoop thread to each new connection, that is, select a NioEventLoop thread from the NioEventLoopGroup thread pool to handle the new connection.
//MultithreadEventLoopGroup implementations which is used for NIO Selector based Channels.
public class NioEventLoopGroup extends MultithreadEventLoopGroup {
//Create a new instance using the default number of threads,
//the default ThreadFactory and the SelectorProvider which is returned by SelectorProvider#provider().
public NioEventLoopGroup() {
this(0);
}
//Create a new instance using the specified number of threads,
//ThreadFactory and the SelectorProvider which is returned by SelectorProvider#provider().
public NioEventLoopGroup(int nThreads) {
this(nThreads, (Executor) null);
}
public NioEventLoopGroup(int nThreads, Executor executor) {
this(nThreads, executor, SelectorProvider.provider());
}
public NioEventLoopGroup(int nThreads, Executor executor, final SelectorProvider selectorProvider) {
this(nThreads, executor, selectorProvider, DefaultSelectStrategyFactory.INSTANCE);
}
public NioEventLoopGroup(int nThreads, Executor executor, final SelectorProvider selectorProvider, final SelectStrategyFactory selectStrategyFactory) {
super(nThreads, executor, selectorProvider, selectStrategyFactory, RejectedExecutionHandlers.reject());
}
...
}
//Abstract base class for EventLoopGroup implementations that handles their tasks with multiple threads at the same time.
public abstract class MultithreadEventLoopGroup extends MultithreadEventExecutorGroup implements EventLoopGroup {
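private static final InternalLogger logger = InternalLoggerFactory.getInstance(MultithreadEventLoopGroup.class);
private static final int DEFAULT_EVENT_LOOP_THREADS;
static {
DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt("io.netty.eventLoopThreads", Runtime.getRuntime().availableProcessors() * 2));
if (logger.isDebugEnabled()) logger.debug("-Dio.netty.eventLoopThreads: {}", DEFAULT_EVENT_LOOP_THREADS);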
}
protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {
super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);
}
...
}
//Abstract base class for EventExecutorGroup implementations that handles their tasks with multiple threads at the same time.
public abstract class MultithreadEventExecutorGroup extends AbstractEventExecutorGroup {
private final EventExecutor[] children;
private final EventExecutorChooserFactory.EventExecutorChooser chooser;
...
//Create a new instance.
protected MultithreadEventExecutorGroup(int nThreads, Executor executor, Object... args) {
this(nThreads, executor, DefaultEventExecutorChooserFactory.INSTANCE, args);
}
//Create a new instance.
//@param nThreads, the number of threads that will be used by this instance.
//@param executor, the Executor to use, or null if the default should be used.
//@param chooserFactory, the EventExecutorChooserFactory to use.
//@param args, arguments which will passed to each #newChild(Executor, Object...) call
protected MultithreadEventExecutorGroup(int nThreads, Executor executor, EventExecutorChooserFactory chooserFactory, Object... args) {
if (nThreads <= 0) throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));
//1. Create ThreadPerTaskExecutor thread executor
if (executor == null) executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
//2. Create NioEventLoop
children = new EventExecutor[nThreads];
for (int i = 0; i < nThreads; i ++) {
...
//When creating each NioEventLoop, the newChild() method will be called to configure some core parameters for each NioEventLoop.
//Passes in the thread executor to create NioEventLoop
children[i] = newChild(executor, args);
}
//3. Create a thread selector
chooser = chooserFactory.newChooser(children);
...
}
...
}
The call chain for creating a NioEventLoopGroup is as follows:
new NioEventLoopGroup() //Thread group; the default number of threads is 2 * the number of CPU cores
new ThreadPerTaskExecutor() //Creates the thread executor, which is responsible for creating the thread of each NioEventLoop
for(...) { newChild() } //Constructs each NioEventLoop to build the NioEventLoop thread group
chooserFactory.newChooser() //Thread selector, used to assign a NioEventLoop thread to each new connection
(4) Create a thread executor ThreadPerTaskExecutor
The function of ThreadPerTaskExecutor is this: each time its execute() method is called with a Runnable task, it creates a thread through threadFactory.newThread() and hands the Runnable task to that thread for execution.
The member variable threadFactory is the DefaultThreadFactory object that newDefaultThreadFactory() builds and passes to the ThreadPerTaskExecutor constructor.
Therefore, when ThreadPerTaskExecutor creates a thread through threadFactory.newThread(), it is actually calling the newThread() method of DefaultThreadFactory.
The thread entity created by DefaultThreadFactory.newThread() is a FastThreadLocalThread, a thread class optimized by Netty. Its thread-local operations are faster than those of a plain JDK thread.
ThreadPerTaskExecutor thread executor summary:
1. Every time the execute() method of ThreadPerTaskExecutor is called, a FastThreadLocalThread thread entity is created, so Netty's thread entities are created by ThreadPerTaskExecutor.
2. The naming rule of the FastThreadLocalThread thread entity is: nioEventLoopGroup-<auto-incremented thread pool number>-<auto-incremented thread number>.
//Abstract base class for EventExecutorGroup implementations that handles their tasks with multiple threads at the same time.
public abstract class MultithreadEventExecutorGroup extends AbstractEventExecutorGroup {
private final EventExecutor[] children;
private final EventExecutorChooserFactory.EventExecutorChooser chooser;
...
//Create a new instance.
protected MultithreadEventExecutorGroup(int nThreads, Executor executor, EventExecutorChooserFactory chooserFactory, Object... args) {
if (nThreads <= 0) throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));
//1. Create ThreadPerTaskExecutor thread executor
if (executor == null) executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
//2. Create NioEventLoop
children = new EventExecutor[nThreads];
for (int i = 0; i < nThreads; i ++) {
...
//When creating each NioEventLoop, the newChild() method will be called to configure some core parameters for each NioEventLoop.
//Passes in the thread executor to create NioEventLoop
children[i] = newChild(executor, args);
}
//3. Create a thread selector
chooser = chooserFactory.newChooser(children);
...
}
protected ThreadFactory newDefaultThreadFactory() {
//getClass() is to get the object type to which the method belongs, that is, the NioEventLoopGroup type
//Because it is called here layer by layer through the NioEventLoopGroup constructor
return new DefaultThreadFactory(getClass());
}
...
}
public final class ThreadPerTaskExecutor implements Executor {
private final ThreadFactory threadFactory;
public ThreadPerTaskExecutor(ThreadFactory threadFactory) {
if (threadFactory == null) throw new NullPointerException("threadFactory");
this.threadFactory = threadFactory;
}
@Override
public void execute(Runnable command) {
//Calling the newThread() method of DefaultThreadFactory to execute the Runnable task
threadFactory.newThread(command).start();
}
}
//A ThreadFactory implementation with a simple naming rule.
public class DefaultThreadFactory implements ThreadFactory {
private static final AtomicInteger poolId = new AtomicInteger();
private final AtomicInteger nextId = new AtomicInteger();
private final boolean daemon;
private final int priority;
protected final ThreadGroup threadGroup;
...
public DefaultThreadFactory(Class<?> poolType) {
this(poolType, false, Thread.NORM_PRIORITY);
}
public DefaultThreadFactory(Class<?> poolType, boolean daemon, int priority) {
//toPoolName() method will turn the first letter of NioEventLoopGroup into lowercase
this(toPoolName(poolType), daemon, priority);
}
public DefaultThreadFactory(String poolName, boolean daemon, int priority) {
this(poolName, daemon, priority,
System.getSecurityManager() == null ? Thread.currentThread().getThreadGroup() : System.getSecurityManager().getThreadGroup());
}
public DefaultThreadFactory(String poolName, boolean daemon, int priority, ThreadGroup threadGroup) {
...
//prefix is the thread name prefix
prefix = poolName + '-' + poolId.incrementAndGet() + '-';
this.daemon = daemon;
this.priority = priority;
this.threadGroup = threadGroup;
}
@Override
public Thread newThread(Runnable r) {
Thread t = newThread(new DefaultRunnableDecorator(r), prefix + nextId.incrementAndGet());
if (t.isDaemon()) {
if (!daemon) t.setDaemon(false);
} else {
if (daemon) t.setDaemon(true);
}
if (t.getPriority() != priority) t.setPriority(priority);
return t;
}
protected Thread newThread(Runnable r, String name) {
return new FastThreadLocalThread(threadGroup, r, name);
}
...
}
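The thread-per-task behavior and the naming rule described in this section can be reproduced with a small JDK-only sketch. It is a simplified illustration, not the Netty classes: NamingThreadFactory and PerTaskExecutor are hypothetical stand-ins for DefaultThreadFactory and ThreadPerTaskExecutor, and plain Thread objects replace FastThreadLocalThread.

```java
import java.util.List;
import java.util.Objects;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

final class ThreadPerTaskSketch {

    // Stand-in for DefaultThreadFactory: names threads poolName-poolId-threadId.
    static final class NamingThreadFactory implements ThreadFactory {
        private static final AtomicInteger POOL_ID = new AtomicInteger();
        private final AtomicInteger nextId = new AtomicInteger();
        private final String prefix;

        NamingThreadFactory(String poolName) {
            prefix = poolName + '-' + POOL_ID.incrementAndGet() + '-';
        }

        @Override
        public Thread newThread(Runnable r) {
            return new Thread(r, prefix + nextId.incrementAndGet());
        }
    }

    // Stand-in for ThreadPerTaskExecutor: every execute() call starts a new thread.
    static final class PerTaskExecutor implements Executor {
        private final ThreadFactory threadFactory;

        PerTaskExecutor(ThreadFactory threadFactory) {
            this.threadFactory = Objects.requireNonNull(threadFactory, "threadFactory");
        }

        @Override
        public void execute(Runnable command) {
            threadFactory.newThread(command).start();
        }
    }

    // Runs the given number of tasks and collects the name of the thread each ran on.
    static List<String> threadNamesFor(int tasks) {
        Executor executor = new PerTaskExecutor(new NamingThreadFactory("nioEventLoopGroup"));
        List<String> names = new CopyOnWriteArrayList<>();
        CountDownLatch done = new CountDownLatch(tasks);
        for (int i = 0; i < tasks; i++) {
            executor.execute(() -> {
                names.add(Thread.currentThread().getName());
                done.countDown();
            });
        }
        try {
            done.await();
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
        return names;
    }

    public static void main(String[] args) {
        // Names look like nioEventLoopGroup-1-1, nioEventLoopGroup-1-2, ...
        System.out.println(threadNamesFor(3));
    }
}
```

The pool number is shared by all factories in the JVM, which is why two NioEventLoopGroup instances (boss and worker) produce thread names with different middle numbers.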
(5) Create NioEventLoop
Note 1:
From the MultithreadEventExecutorGroup constructor, we can see that Netty uses a for loop that calls newChild() to create nThreads NioEventLoops, and each NioEventLoop corresponds to one FastThreadLocalThread thread entity.
//Abstract base class for EventExecutorGroup implementations that handles their tasks with multiple threads at the same time.
public abstract class MultithreadEventExecutorGroup extends AbstractEventExecutorGroup {
private final EventExecutor[] children;
private final EventExecutorChooserFactory.EventExecutorChooser chooser;
...
//Create a new instance.
protected MultithreadEventExecutorGroup(int nThreads, Executor executor, EventExecutorChooserFactory chooserFactory, Object... args) {
if (nThreads <= 0) throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));
//1. Create ThreadPerTaskExecutor thread executor
if (executor == null) executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
//2. Create NioEventLoop
children = new EventExecutor[nThreads];
for (int i = 0; i < nThreads; i ++) {
...
//When creating each NioEventLoop, the newChild() method will be called to configure some core parameters for each NioEventLoop.
//Passes in the thread executor to create NioEventLoop
children[i] = newChild(executor, args);
}
//3. Create a thread selector
chooser = chooserFactory.newChooser(children);
...
}
//Create a new EventExecutor which will later then accessible via the #next() method.
//This method will be called for each thread that will serve this MultithreadEventExecutorGroup.
protected abstract EventExecutor newChild(Executor executor, Object... args) throws Exception;
...
}
Note 2:
The newChild() abstract method of MultithreadEventExecutorGroup is implemented by NioEventLoopGroup, so its newChild() method will also be executed when executing the default constructor of NioEventLoopGroup.
The newChild() method of NioEventLoopGroup needs to pass an executor parameter, which is the thread executor created at the beginning of the NioEventLoopGroup construction method. After that, the newChild() method will return a newly created NioEventLoop object.
//MultithreadEventLoopGroup implementations which is used for NIO Selector based Channels.
public class NioEventLoopGroup extends MultithreadEventLoopGroup {
...
@Override
protected EventLoop newChild(Executor executor, Object... args) throws Exception {
//executor is the ThreadPerTaskExecutor thread executor created at the start of the NioEventLoopGroup constructor
//this refers to the NioEventLoopGroup, indicating which NioEventLoopGroup the newly created NioEventLoop belongs to
return new NioEventLoop(this, executor, (SelectorProvider) args[0],
((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2]);
}
...
}
Note 3:
When a NioEventLoop object is created, its constructor creates a Selector by calling openSelector(), so each Selector is bound to one NioEventLoop. A single Selector can have multiple connections registered on it and is responsible for listening to the read and write events of those connections.
In the openSelector() method of NioEventLoop, Netty uses reflection to optimize the data structure of the Selector (HashSet => array).
//SingleThreadEventLoop implementation which register the Channel's to a Selector and so does the multi-plexing of these in the event loop.
public final class NioEventLoop extends SingleThreadEventLoop {
//The NIO Selector.
Selector selector;
private final SelectorProvider provider;
private final SelectStrategy selectStrategy;
...
NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler) {
//Calling the constructor of its parent class SingleThreadEventLoop
super(parent, executor, false, DEFAULT_MAX_PENDING_TASKS, rejectedExecutionHandler);
if (selectorProvider == null) throw new NullPointerException("selectorProvider");
if (strategy == null) throw new NullPointerException("selectStrategy");
this.provider = selectorProvider;
this.selector = openSelector();//Create a Selector
this.selectStrategy = strategy;
}
private Selector openSelector() {
final Selector selector;
try {
selector = provider.openSelector();
...
} catch(IOException e) {
...
}
...
return selector;
}
...
}
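The relationship described in Note 3, one Selector with several channels registered on it, can be observed with the JDK alone. The sketch below is an illustration under stated assumptions: SelectorPerLoopSketch is a hypothetical name, unbound ServerSocketChannel instances stand in for connections, and the channels are registered with an interest set of 0, so nothing is actually selected.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.spi.SelectorProvider;
import java.util.ArrayList;
import java.util.List;

final class SelectorPerLoopSketch {

    // Opens one Selector, registers the given number of channels on it,
    // and returns how many keys the Selector now tracks.
    static int registeredKeyCount(int channelCount) {
        List<ServerSocketChannel> channels = new ArrayList<>();
        try (Selector selector = SelectorProvider.provider().openSelector()) {
            for (int i = 0; i < channelCount; i++) {
                ServerSocketChannel channel = ServerSocketChannel.open();
                channels.add(channel);
                // A channel must be non-blocking before it can be registered.
                channel.configureBlocking(false);
                // Interest set 0: registered, but not yet listening for any event.
                channel.register(selector, 0);
            }
            return selector.keys().size();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } finally {
            for (ServerSocketChannel channel : channels) {
                try {
                    channel.close();
                } catch (IOException ignored) {
                    // Best-effort cleanup in a sketch.
                }
            }
        }
    }

    public static void main(String[] args) {
        System.out.println(registeredKeyCount(3));
    }
}
```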
Note 4:
The NioEventLoop constructor also calls, through its parent class SingleThreadEventLoop, the constructor of SingleThreadEventExecutor. That constructor performs two key operations. The first is to save the thread executor, because it is needed later to create the thread that corresponds to the NioEventLoop. The second is to create an MPSC task queue, because all asynchronous execution in Netty is ultimately coordinated through this task queue.
//Abstract base class for EventLoops that execute all its submitted tasks in a single thread.
public abstract class SingleThreadEventLoop extends SingleThreadEventExecutor implements EventLoop {
private final Queue<Runnable> tailTasks;
...
protected SingleThreadEventLoop(EventLoopGroup parent, Executor executor,
boolean addTaskWakesUp, int maxPendingTasks, RejectedExecutionHandler rejectedExecutionHandler) {
//Calling the constructor of its parent class SingleThreadEventExecutor
super(parent, executor, addTaskWakesUp, maxPendingTasks, rejectedExecutionHandler);
//Calling the newTaskQueue() method of the parent class SingleThreadEventExecutor
tailTasks = newTaskQueue(maxPendingTasks);
}
...
}
//Abstract base class for OrderedEventExecutor's that execute all its submitted tasks in a single thread.
public abstract class SingleThreadEventExecutor extends AbstractScheduledEventExecutor implements OrderedEventExecutor {
private final boolean addTaskWakesUp;
private final Executor executor;
private final int maxPendingTasks;
private final Queue<Runnable> taskQueue;
private final RejectedExecutionHandler rejectedExecutionHandler;
...
//Create a new instance
protected SingleThreadEventExecutor(EventExecutorGroup parent, ThreadFactory threadFactory,
boolean addTaskWakesUp, int maxPendingTasks, RejectedExecutionHandler rejectedHandler) {
this(parent, new ThreadPerTaskExecutor(threadFactory), addTaskWakesUp, maxPendingTasks, rejectedHandler);
}
//Create a new instance
//@param parent, the EventExecutorGroup which is the parent of this instance and belongs to it
//@param executor, the Executor which will be used for executing
//@param addTaskWakesUp, true if and only if invocation of #addTask(Runnable) will wake up the executor thread
//@param maxPendingTasks, the maximum number of pending tasks before new tasks will be rejected.
//@param rejectedHandler, the RejectedExecutionHandler to use.
protected SingleThreadEventExecutor(EventExecutorGroup parent, Executor executor,
boolean addTaskWakesUp, int maxPendingTasks, RejectedExecutionHandler rejectedHandler) {
super(parent);
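this.addTaskWakesUp = addTaskWakesUp;
this.maxPendingTasks = Math.max(16, maxPendingTasks);
//Key operation 1: Save the thread executor
this.executor = ObjectUtil.checkNotNull(executor, "executor");
//Key operation 2: Create an MPSC task queue
this.taskQueue = newTaskQueue(this.maxPendingTasks);
this.rejectedExecutionHandler = ObjectUtil.checkNotNull(rejectedHandler, "rejectedHandler");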
}
//Create a new Queue which will holds the tasks to execute.
//NioEventLoop overrides this newTaskQueue() method
protected Queue<Runnable> newTaskQueue(int maxPendingTasks) {
return new LinkedBlockingQueue<Runnable>(maxPendingTasks);
}
...
}
//SingleThreadEventLoop implementation which register the Channel's to a Selector and so does the multi-plexing of these in the event loop.
public final class NioEventLoop extends SingleThreadEventLoop {
...
//Create an MPSC task queue
@Override
protected Queue<Runnable> newTaskQueue(int maxPendingTasks) {
//This event loop never calls takeTask()
return PlatformDependent.newMpscQueue(maxPendingTasks);
}
...
}
The MPSC queue is a multi-producer single-consumer queue. The single consumer is the thread that corresponds to a NioEventLoop (the thread that executes its run() method). The multiple producers are all threads other than that one, usually our business threads. For example, business threads can call writeAndFlush() freely without worrying about thread safety, so these threads are the producers.
MPSC queues are implemented by the JCTools toolkit, and much of Netty's high performance is attributed to it. The full name of MPSC is Multi Producer Single Consumer. Multi Producer corresponds to the external threads, and Single Consumer corresponds to Netty's NioEventLoop thread. When an external thread performs a Netty operation and Netty determines that the caller is not the thread corresponding to the NioEventLoop, the operation is placed directly in the task queue and later executed by the NioEventLoop's thread.
Summary of creating a NioEventLoop:
The newChild() method of NioEventLoopGroup does three things when creating a NioEventLoop: 1. it creates a Selector to poll the connections registered to the NioEventLoop, 2. it creates an MPSC task queue, and 3. it saves the thread executor in the NioEventLoop.
(6) Create thread selector EventExecutorChooser
Note 1:
In traditional NIO programming, after a new connection is created, it is usually necessary to bind a Selector to the connection, and the entire life cycle of the connection is then managed by this Selector.
Note 2:
A Selector is created when a NioEventLoop is created, so each NioEventLoop has exactly one Selector. The function of the thread selector is to select a NioEventLoop from the NioEventLoopGroup for a connection, thereby binding the connection to the Selector of that NioEventLoop.
Note 3:
According to the MultithreadEventExecutorGroup constructor, the newChooser() method of DefaultEventExecutorChooserFactory is used to create the thread selector. After the thread selector EventExecutorChooser has been created, a NioEventLoop can be obtained through its next() method.
Netty creates different thread selectors depending on whether the number of NioEventLoops in the NioEventLoopGroup is a power of 2. Whichever selector is used, the effect is the same: it walks from the first NioEventLoop to the last and then starts again from the first, in a round-robin loop.
//Abstract base class for EventExecutorGroup implementations that handles their tasks with multiple threads at the same time.
public abstract class MultithreadEventExecutorGroup extends AbstractEventExecutorGroup {
private final EventExecutor[] children;
private final EventExecutorChooserFactory.EventExecutorChooser chooser;
...
//Create a new instance.
protected MultithreadEventExecutorGroup(int nThreads, Executor executor, Object... args) {
this(nThreads, executor, DefaultEventExecutorChooserFactory.INSTANCE, args);
}
//Create a new instance.
//@param nThreads, the number of threads that will be used by this instance.
//@param executor, the Executor to use, or null if the default should be used.
//@param chooserFactory, the EventExecutorChooserFactory to use.
//@param args, arguments which will passed to each #newChild(Executor, Object...) call
protected MultithreadEventExecutorGroup(int nThreads, Executor executor, EventExecutorChooserFactory chooserFactory, Object... args) {
if (nThreads <= 0) throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));
//1. Create ThreadPerTaskExecutor thread executor
if (executor == null) executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
//2. Create NioEventLoop
children = new EventExecutor[nThreads];
for (int i = 0; i < nThreads; i ++) {
...
//When creating each NioEventLoop, the newChild() method will be called to configure some core parameters for each NioEventLoop.
//Passes in the thread executor to create NioEventLoop
children[i] = newChild(executor, args);
}
//3. Create a thread selector, and the chooserFactory is the passed DefaultEventExecutorChooserFactory instance
chooser = chooserFactory.newChooser(children);
...
}
...
}
//Default implementation which uses simple round-robin to choose next EventExecutor.
@UnstableApi
public final class DefaultEventExecutorChooserFactory implements EventExecutorChooserFactory {
public static final DefaultEventExecutorChooserFactory INSTANCE = new DefaultEventExecutorChooserFactory();
private DefaultEventExecutorChooserFactory() { }
@SuppressWarnings("unchecked")
@Override
public EventExecutorChooser newChooser(EventExecutor[] executors) {
if (isPowerOfTwo(executors.length)) {
//If the number of NioEventLoops is a power of 2, use a bitwise AND
return new PowerOfTowEventExecutorChooser(executors);
} else {
//If the number of NioEventLoops is not a power of 2, use the ordinary modulo operation
return new GenericEventExecutorChooser(executors);
}
}
private static boolean isPowerOfTwo(int val) {
return (val & -val) == val;
}
private static final class PowerOfTowEventExecutorChooser implements EventExecutorChooser {
private final AtomicInteger idx = new AtomicInteger();
private final EventExecutor[] executors;
PowerOfTowEventExecutorChooser(EventExecutor[] executors) {
this.executors = executors;
}
@Override
public EventExecutor next() {
return executors[idx.getAndIncrement() & executors.length - 1];
}
}
private static final class GenericEventExecutorChooser implements EventExecutorChooser {
private final AtomicInteger idx = new AtomicInteger();
private final EventExecutor[] executors;
GenericEventExecutorChooser(EventExecutor[] executors) {
this.executors = executors;
}
@Override
public EventExecutor next() {
return executors[Math.abs(idx.getAndIncrement() % executors.length)];
}
}
}
Note 4:
The last step in creating a NioEventLoopGroup is to create the thread selector chooser. The process of creating the thread selector is as follows:
chooserFactory.newChooser() //Entry point for creating the thread selector. The chooser's job is to bind a NioEventLoop to each new connection.
isPowerOfTwo() //Checks whether the number of NioEventLoops is a power of 2
PowerOfTowEventExecutorChooser //Optimized
index++ & (length - 1) //Bitwise AND
GenericEventExecutorChooser //Normal
abs(index++ % length) //Modulo operation
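The two selection strategies listed above can be compared in a small standalone sketch. It works on array indexes instead of EventExecutor objects, and the ChooserSketch class and its Chooser interface are hypothetical names introduced only for this illustration; the index arithmetic is the same as in the listing.

```java
import java.util.Arrays;
import java.util.concurrent.atomic.AtomicInteger;

final class ChooserSketch {

    // Returns the index of the next NioEventLoop to use.
    interface Chooser {
        int nextIndex();
    }

    // A positive value is a power of two when its lowest set bit is the value itself.
    static boolean isPowerOfTwo(int val) {
        return (val & -val) == val;
    }

    static Chooser newChooser(int length) {
        AtomicInteger idx = new AtomicInteger();
        if (isPowerOfTwo(length)) {
            // Optimized path: masking with (length - 1) equals modulo for powers of two.
            return () -> idx.getAndIncrement() & (length - 1);
        }
        // Generic path: modulo, with abs() guarding against counter overflow.
        return () -> Math.abs(idx.getAndIncrement() % length);
    }

    // Collects the first n indexes a chooser hands out.
    static int[] take(Chooser chooser, int n) {
        int[] result = new int[n];
        for (int i = 0; i < n; i++) {
            result[i] = chooser.nextIndex();
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(take(newChooser(4), 6))); // [0, 1, 2, 3, 0, 1]
        System.out.println(Arrays.toString(take(newChooser(3), 5))); // [0, 1, 2, 0, 1]
    }
}
```

Both paths produce the same round-robin sequence; the bitmask variant only avoids the division that the modulo operation needs.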
(7) Summary of the creation of NioEventLoopGroup
By default, a NioEventLoopGroup creates twice as many NioEventLoops as there are CPU cores. Each NioEventLoop has exactly one Selector and one MPSC task queue.
The naming rule of NioEventLoop threads is nioEventLoopGroup-xx-yy, where xx is the global sequence number of the NioEventLoopGroup thread pool and yy is the sequence number of the NioEventLoop within that NioEventLoopGroup.
The thread selector chooses a NioEventLoop for each connection; its next() method returns a NioEventLoop. If the number of NioEventLoops is a power of 2, the next() method is optimized with a bitwise AND.
A NioEventLoopGroup holds a thread executor, a thread selector chooser, and a children array that, by default, stores twice as many NioEventLoops as there are CPU cores.
4. Startup of NioEventLoop
(1) The two entry points for starting a NioEventLoop
(2) Determine whether the current thread is a NioEventLoop thread
(3) Create a thread and start it
(4) NioEventLoop startup summary
(1) The two entry points for starting a NioEventLoop
Entry 1: when the server starts and registers the server Channel with the Selector.
Entry 2: when a new connection is accepted and a NioEventLoop is bound to it through the chooser.
Let's look at entry 1:
Calling the bind() method of ServerBootstrap actually calls the doBind() method of AbstractBootstrap, which calls initAndRegister(), which in turn executes config().group().register(channel) to register the server Channel. Finally, register() is called layer by layer until a NioEventLoop is started and the server Channel is registered with its Selector.
//Bootstrap sub-class which allows easy bootstrap of ServerChannel
public class ServerBootstrap extends AbstractBootstrap<ServerBootstrap, ServerChannel> {
...
...
}
//AbstractBootstrap is a helper class that makes it easy to bootstrap a Channel.
//It supports method-chaining to provide an easy way to configure the AbstractBootstrap.
//When not used in a ServerBootstrap context, the #bind() methods are useful for connectionless transports such as datagram (UDP).
public abstract class AbstractBootstrap<B extends AbstractBootstrap<B, C>, C extends Channel> implements Cloneable {
...
//Create a new Channel and bind it.
public ChannelFuture bind(int inetPort) {
//First create an InetSocketAddress object based on the port number, and then call the overloaded method bind()
return bind(new InetSocketAddress(inetPort));
}
//Create a new Channel and bind it.
public ChannelFuture bind(SocketAddress localAddress) {
//Verify necessary parameters required for service startup
validate();
if (localAddress == null) throw new NullPointerException("localAddress");
return doBind(ObjectUtil.checkNotNull(localAddress, "localAddress"));
}
private ChannelFuture doBind(final SocketAddress localAddress) {
final ChannelFuture regFuture = initAndRegister();//1. Initialize and register Channel
final Channel channel = regFuture.channel();
...
doBind0(regFuture, channel, localAddress, promise);//2. Bind the server port
...
return promise;
}
final ChannelFuture initAndRegister() {
Channel channel = null;
...
//1. Create a server channel
channel = channelFactory.newChannel();
//2. Initialize the server channel
init(channel);
...
//3. Register the server channel, for example, register through the NioEventLoopGroup register() method
ChannelFuture regFuture = config().group().register(channel);
...
return regFuture;
}
...
}
//Bootstrap sub-class which allows easy bootstrap of ServerChannel
public class ServerBootstrap extends AbstractBootstrap<ServerBootstrap, ServerChannel> {
private final ServerBootstrapConfig config = new ServerBootstrapConfig(this);
...
@Override
public final ServerBootstrapConfig config() {
return config;
}
...
}
public abstract class AbstractBootstrapConfig<B extends AbstractBootstrap<B, C>, C extends Channel> {
protected final B bootstrap;
...
protected AbstractBootstrapConfig(B bootstrap) {
this.bootstrap = ObjectUtil.checkNotNull(bootstrap, "bootstrap");
}
//Returns the configured EventLoopGroup or null if non is configured yet.
public final EventLoopGroup group() {
//For example, return a NioEventLoopGroup object
return bootstrap.group();
}
...
}
//MultithreadEventLoopGroup implementations which is used for NIO Selector based Channels.
public class NioEventLoopGroup extends MultithreadEventLoopGroup {
...
...
}
//Abstract base class for EventLoopGroup implementations that handles their tasks with multiple threads at the same time.
public abstract class MultithreadEventLoopGroup extends MultithreadEventExecutorGroup implements EventLoopGroup {
...
@Override
public ChannelFuture register(Channel channel) {
//First get a NioEventLoop through the next() method, then register the server Channel through its register() method
return next().register(channel);
}
@Override
public EventLoop next() {
return (EventLoop) super.next();
}
...
}
//Abstract base class for EventExecutorGroup implementations that handles their tasks with multiple threads at the same time.
public abstract class MultithreadEventExecutorGroup extends AbstractEventExecutorGroup {
private final EventExecutorChooserFactory.EventExecutorChooser chooser;
...
@Override
public EventExecutor next() {
//Select a NioEventLoop through the thread selector chooser
return chooser.next();
}
...
}
//SingleThreadEventLoop implementation which register the Channel's to a Selector and so does the multi-plexing of these in the event loop.
public final class NioEventLoop extends SingleThreadEventLoop {
...
...
}
//Abstract base class for EventLoops that execute all its submitted tasks in a single thread.
public abstract class SingleThreadEventLoop extends SingleThreadEventExecutor implements EventLoop {
...
@Override
public ChannelFuture register(Channel channel) {
return register(new DefaultChannelPromise(channel, this));
}
@Override
public ChannelFuture register(final ChannelPromise promise) {
ObjectUtil.checkNotNull(promise, "promise");
//Calling the register() method of AbstractUnsafe
promise.channel().unsafe().register(this, promise);
return promise;
}
...
}
//A skeletal Channel implementation.
public abstract class AbstractChannel extends DefaultAttributeMap implements Channel {
private volatile EventLoop eventLoop;
...
//Unsafe implementation which sub-classes must extend and use.
protected abstract class AbstractUnsafe implements Unsafe {
...
@Override
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
...
//Bind an event loop, that is, bind a NioEventLoop to the Channel
AbstractChannel.this.eventLoop = eventLoop;
//Register with the Selector and start a NioEventLoop
if (eventLoop.inEventLoop()) {
register0(promise);
} else {
...
//Start this NioEventLoop's thread and call register0() to register the server channel with the Selector
//In effect, the execute() method of SingleThreadEventExecutor is executed
eventLoop.execute(new Runnable() {
@Override
public void run() {
register0(promise);
}
});
...
}
}
}
...
}
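The next() call in the listing above delegates to a chooser that hands out one event loop per registration. A minimal sketch of round-robin selection follows, with plain strings standing in for NioEventLoop instances. The class name RoundRobinChooser is hypothetical, and this is an illustration of the idea rather than Netty's own chooser code.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for a chooser: hands out its elements in round-robin order.
final class RoundRobinChooser<T> {
    private final T[] children;
    private final AtomicInteger idx = new AtomicInteger();

    RoundRobinChooser(T[] children) {
        if (children.length == 0) {
            throw new IllegalArgumentException("children must not be empty");
        }
        this.children = children.clone();
    }

    // Safe to call from many threads: each call atomically claims the next index.
    T next() {
        // floorMod keeps the index non-negative even after the counter overflows.
        return children[Math.floorMod(idx.getAndIncrement(), children.length)];
    }
}
```

With three children, successive calls to next() return the first, second, and third element and then wrap around to the first again.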
(2) Determine whether the current thread is a NioEventLoop thread
Calling the inEventLoop() method of NioEventLoop determines whether the current thread is Netty's Reactor thread, that is, the thread entity corresponding to the NioEventLoop. Once the thread entity of a NioEventLoop has been created, it is saved in the member variable thread of NioEventLoop's parent class SingleThreadEventExecutor.
When the server starts and registers the server Channel with the Selector, the eventLoop.inEventLoop() check in AbstractUnsafe.register() passes in the main thread (the thread running the main method) and compares it with this.thread. Since this.thread has not been assigned yet, it is null, so inEventLoop() returns false, and the eventLoop.execute() branch is taken to create a thread and start it.
//SingleThreadEventLoop implementation which register the Channel's
//to a Selector and so does the multi-plexing of these in the event loop.
public final class NioEventLoop extends SingleThreadEventLoop {
...
...
}
//Abstract base class for EventLoops that execute all its submitted tasks in a single thread.
public abstract class SingleThreadEventLoop extends SingleThreadEventExecutor implements EventLoop {
...
...
}
//Abstract base class for OrderedEventExecutor's that execute all its submitted tasks in a single thread.
public abstract class SingleThreadEventExecutor extends AbstractScheduledEventExecutor implements OrderedEventExecutor {
...
...
}
//Abstract base class for EventExecutors that want to support scheduling.
public abstract class AbstractScheduledEventExecutor extends AbstractEventExecutor {
...
...
}
//Abstract base class for {@link EventExecutor} implementations.
public abstract class AbstractEventExecutor extends AbstractExecutorService implements EventExecutor {
...
@Override
public boolean inEventLoop() {
//The server Channel is registered from the main thread, so Thread.currentThread() here is the main thread
//This calls the inEventLoop(Thread) method of SingleThreadEventExecutor
return inEventLoop(Thread.currentThread());
}
...
}
//Abstract base class for OrderedEventExecutor's that execute all its submitted tasks in a single thread.
public abstract class SingleThreadEventExecutor extends AbstractScheduledEventExecutor implements OrderedEventExecutor {
private volatile Thread thread;
...
@Override
public boolean inEventLoop(Thread thread) {
return thread == this.thread;//The thread has not been created yet, so this.thread is null
}
...
}
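The identity check above can be reproduced without Netty. The sketch below is a hypothetical miniature (the class LoopThreadCheck and its method startAndCheckFromLoopThread() are invented for illustration): it keeps a volatile thread field, compares it with the calling thread, and shows that the answer is false for an external caller and true on the loop thread itself.

```java
// Hypothetical miniature of the check: remember the loop thread, compare by identity.
final class LoopThreadCheck {
    private volatile Thread thread; // stays null until the loop thread has started

    boolean inEventLoop() {
        return inEventLoop(Thread.currentThread());
    }

    boolean inEventLoop(Thread t) {
        return t == thread;
    }

    // Starts the loop thread, lets it record itself, and returns what
    // inEventLoop() reports when called on that thread.
    boolean startAndCheckFromLoopThread() {
        final boolean[] result = new boolean[1];
        Thread t = new Thread(() -> {
            thread = Thread.currentThread();
            result[0] = inEventLoop();
        }, "loop-thread");
        t.start();
        try {
            t.join(); // join() makes the write to result[0] visible here
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return result[0];
    }
}
```

Before the loop thread exists, inEventLoop() called from the main thread returns false because the field is still null, which mirrors the startup case described above.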
(3) Create a thread and start
When AbstractUnsafe.register() is about to register the server Channel with the Selector, the eventLoop.inEventLoop() check in its condition returns false, so the eventLoop.execute() branch runs: it creates a thread and starts it to execute the registration task. The eventLoop.execute() call is actually a call to the execute() method of SingleThreadEventExecutor.
//Abstract base class for EventLoops that execute all its submitted tasks in a single thread.
public abstract class SingleThreadEventLoop extends SingleThreadEventExecutor implements EventLoop {
...
@Override
public ChannelFuture register(Channel channel) {
return register(new DefaultChannelPromise(channel, this));
}
@Override
public ChannelFuture register(final ChannelPromise promise) {
ObjectUtil.checkNotNull(promise, "promise");
//Call the register() method of AbstractUnsafe and pass this NioEventLoop in as a parameter
promise.channel().unsafe().register(this, promise);
return promise;
}
...
}
//A skeletal Channel implementation.
public abstract class AbstractChannel extends DefaultAttributeMap implements Channel {
private volatile EventLoop eventLoop;
...
//Unsafe implementation which sub-classes must extend and use.
protected abstract class AbstractUnsafe implements Unsafe {
...
@Override
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
...
//Bind an event loop, that is, bind a NioEventLoop to the Channel
AbstractChannel.this.eventLoop = eventLoop;
//Register with the Selector and start the NioEventLoop
if (eventLoop.inEventLoop()) {
register0(promise);
} else {
...
//Switch to this NioEventLoop's thread and call register0() there to register the server Channel with the Selector
//This actually invokes the execute() method of SingleThreadEventExecutor
eventLoop.execute(new Runnable() {
@Override
public void run() {
register0(promise);
}
});
...
}
}
}
...
}
//Abstract base class for OrderedEventExecutor's that execute all its submitted tasks in a single thread.
public abstract class SingleThreadEventExecutor extends AbstractScheduledEventExecutor implements OrderedEventExecutor {
private volatile Thread thread;
//When a NioEventLoop is created, the thread executor of NioEventLoopGroup is passed in through the constructor
private final Executor executor;
...
@Override
public void execute(Runnable task) {
if (task == null) throw new NullPointerException("task");
boolean inEventLoop = inEventLoop();
//Judge whether the current thread is Netty's Reactor thread
if (inEventLoop) {
addTask(task);
} else {
startThread();
addTask(task);
if (isShutdown() && removeTask(task)) reject();
}
if (!addTaskWakesUp && wakesUpForTask(task)) wakeup(inEventLoop);
}
private void startThread() {
//Check whether the Reactor thread has been started; if not, set the state via CAS and, on success, call doStartThread() to start it
if (STATE_UPDATER.get(this) == ST_NOT_STARTED) {
if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {
doStartThread();
}
}
}
private void doStartThread() {
assert thread == null;
//executor.execute() creates a FastThreadLocalThread to run the Runnable task
//So inside the Runnable's run() method, Thread.currentThread() refers to that FastThreadLocalThread
executor.execute(new Runnable() {
@Override
public void run() {
//Thread.currentThread() refers to the FastThreadLocalThread
thread = Thread.currentThread();
...
SingleThreadEventExecutor.this.run();//Run the event loop on this thread
...
}
});
}
//The specific run() method is implemented by subclasses such as NioEventLoop
protected abstract void run();
...
}
The execute() method of SingleThreadEventExecutor works as follows:
1. This method may also be called by user code, for example by calling execute(task) on a Channel's EventLoop. Therefore execute() calls inEventLoop() to check whether the caller is an external thread, which ensures there are no threading problems when the task is executed.
2. If the current thread is not Netty's Reactor thread, startThread() is called to start one. startThread() first checks whether the Reactor thread entity corresponding to the current NioEventLoop has already been started. If not, it updates the state via CAS and, if the CAS succeeds, calls doStartThread() to start the thread.
3. doStartThread() calls the execute() method of the NioEventLoop's internal member variable executor. That executor is the thread executor ThreadPerTaskExecutor, whose job is to create a new thread for every Runnable task it executes. In other words, executor.execute() creates a FastThreadLocalThread through the newThread() method of DefaultThreadFactory and runs the Runnable task on it.
4. The Runnable task passed in doStartThread() is executed by a FastThreadLocalThread. In its run() method, the FastThreadLocalThread created by ThreadPerTaskExecutor is saved in the member variable thread of SingleThreadEventExecutor, and then the run() method of SingleThreadEventExecutor (implemented by NioEventLoop) is called.
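The four steps above can be sketched in a few lines of plain Java. This is a simplified, hypothetical miniature (the class LazyLoop and its threadsStarted() counter are invented for illustration; a LinkedBlockingQueue and a plain Thread stand in for Netty's task queue and FastThreadLocalThread). It shows the lazy start guarded by a CAS and the binding of the loop to its thread.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical miniature of a single-thread executor whose thread starts lazily.
final class LazyLoop {
    private static final int ST_NOT_STARTED = 1;
    private static final int ST_STARTED = 2;

    private final AtomicInteger state = new AtomicInteger(ST_NOT_STARTED);
    private final AtomicInteger threadsStarted = new AtomicInteger();
    private final BlockingQueue<Runnable> taskQueue = new LinkedBlockingQueue<>();
    private volatile Thread thread;

    boolean inEventLoop() {
        return Thread.currentThread() == thread;
    }

    void execute(Runnable task) {
        if (task == null) throw new NullPointerException("task");
        if (!inEventLoop()) {
            startThread(); // external caller: make sure the loop thread exists
        }
        taskQueue.add(task); // the task is always queued, never run inline
    }

    private void startThread() {
        // Only the caller that wins the CAS creates the thread.
        if (state.get() == ST_NOT_STARTED && state.compareAndSet(ST_NOT_STARTED, ST_STARTED)) {
            threadsStarted.incrementAndGet();
            Thread t = new Thread(() -> {
                thread = Thread.currentThread(); // bind this loop to its thread
                run();
            }, "lazy-loop");
            t.setDaemon(true); // the sketch has no shutdown, so do not block JVM exit
            t.start();
        }
    }

    // Stand-in for the event loop body: take tasks one by one and run them.
    private void run() {
        try {
            for (;;) {
                taskQueue.take().run();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    int threadsStarted() {
        return threadsStarted.get();
    }
}
```

However many times execute() is called, threadsStarted() stays at 1, because only the first external caller wins the compareAndSet.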
//Abstract base class for EventExecutorGroup implementations that handles their tasks with multiple threads at the same time.
public abstract class MultithreadEventExecutorGroup extends AbstractEventExecutorGroup {
private final EventExecutor[] children;
private final EventExecutorChooserFactory.EventExecutorChooser chooser;
...
//Create a new instance.
protected MultithreadEventExecutorGroup(int nThreads, Executor executor, EventExecutorChooserFactory chooserFactory, Object... args) {
if (nThreads <= 0) throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));
//1. Create ThreadPerTaskExecutor thread executor
if (executor == null) executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
//2. Create NioEventLoop
children = new EventExecutor[nThreads];
for (int i = 0; i < nThreads; i ++) {
...
//When creating each NioEventLoop, the newChild() method will be called to configure some core parameters for each NioEventLoop.
//Passes in the thread executor to create NioEventLoop
children[i] = newChild(executor, args);
}
//3. Create a thread selector
chooser = chooserFactory.newChooser(children);
...
}
protected ThreadFactory newDefaultThreadFactory() {
//getClass() returns the runtime type of the object this method is invoked on, that is, NioEventLoopGroup
//because this constructor is reached layer by layer from the NioEventLoopGroup constructor
return new DefaultThreadFactory(getClass());
}
...
}
public final class ThreadPerTaskExecutor implements Executor {
private final ThreadFactory threadFactory;
public ThreadPerTaskExecutor(ThreadFactory threadFactory) {
if (threadFactory == null) throw new NullPointerException("threadFactory");
this.threadFactory = threadFactory;
}
@Override
public void execute(Runnable command) {
//Call the newThread() method of DefaultThreadFactory to create a thread, then start it to run the Runnable task
threadFactory.newThread(command).start();
}
}
//A ThreadFactory implementation with a simple naming rule.
public class DefaultThreadFactory implements ThreadFactory {
private static final AtomicInteger poolId = new AtomicInteger();
private final AtomicInteger nextId = new AtomicInteger();
private final boolean daemon;
private final int priority;
protected final ThreadGroup threadGroup;
...
public DefaultThreadFactory(Class<?> poolType) {
this(poolType, false, Thread.NORM_PRIORITY);
}
public DefaultThreadFactory(Class<?> poolType, boolean daemon, int priority) {
//toPoolName() converts the first letter of the class name to lowercase, e.g. NioEventLoopGroup becomes nioEventLoopGroup
this(toPoolName(poolType), daemon, priority);
}
public DefaultThreadFactory(String poolName, boolean daemon, int priority) {
this(poolName, daemon, priority,
System.getSecurityManager() == null ? Thread.currentThread().getThreadGroup() : System.getSecurityManager().getThreadGroup());
}
public DefaultThreadFactory(String poolName, boolean daemon, int priority, ThreadGroup threadGroup) {
...
//prefix is the common prefix of the thread names created by this factory
prefix = poolName + '-' + poolId.incrementAndGet() + '-';
this.daemon = daemon;
this.priority = priority;
this.threadGroup = threadGroup;
}
@Override
public Thread newThread(Runnable r) {
Thread t = newThread(new DefaultRunnableDecorator(r), prefix + nextId.incrementAndGet());
if (t.isDaemon()) {
if (!daemon) t.setDaemon(false);
} else {
if (daemon) t.setDaemon(true);
}
if (t.getPriority() != priority) t.setPriority(priority);
return t;
}
protected Thread newThread(Runnable r, String name) {
return new FastThreadLocalThread(threadGroup, r, name);
}
...
}
How is NioEventLoop bound to a thread entity? NioEventLoop creates a FastThreadLocalThread through the thread executor ThreadPerTaskExecutor and then saves that thread in its member variable thread, thereby binding itself to a thread entity.
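The thread-per-task executor and the poolName-poolId-threadId naming rule from the listing above can be illustrated with standard JDK types only. In this sketch the class names NamingThreadFactory and PerTaskExecutor are hypothetical, and a plain Thread is used where Netty creates a FastThreadLocalThread.

```java
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical factory that names threads "<poolName>-<poolId>-<threadId>".
final class NamingThreadFactory implements ThreadFactory {
    private static final AtomicInteger POOL_ID = new AtomicInteger();
    private final AtomicInteger nextId = new AtomicInteger();
    private final String prefix;

    NamingThreadFactory(String poolName) {
        // Each factory instance gets its own pool id, shared by all of its threads.
        this.prefix = poolName + '-' + POOL_ID.incrementAndGet() + '-';
    }

    @Override
    public Thread newThread(Runnable r) {
        return new Thread(r, prefix + nextId.incrementAndGet());
    }
}

// Hypothetical executor that starts one new thread for every submitted task.
final class PerTaskExecutor implements Executor {
    private final ThreadFactory threadFactory;

    PerTaskExecutor(ThreadFactory threadFactory) {
        if (threadFactory == null) throw new NullPointerException("threadFactory");
        this.threadFactory = threadFactory;
    }

    @Override
    public void execute(Runnable command) {
        threadFactory.newThread(command).start();
    }
}
```

A factory created with the pool name nioEventLoopGroup produces thread names of the form nioEventLoopGroup-N-1, nioEventLoopGroup-N-2, and so on, where N depends on how many factories were created earlier in the same JVM.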
(4) NioEventLoop startup summary
1. While registering the server Channel, the main thread eventually calls the register() method of AbstractUnsafe. This method first binds a NioEventLoop to the server Channel, then wraps the actual Selector registration logic into a Runnable task, and finally calls the execute() method of NioEventLoop to run that task.
2. The execute() method of NioEventLoop is actually the execute() method of its parent class SingleThreadEventExecutor. It first checks whether the thread calling execute() is Netty's Reactor thread. If not, it calls startThread() to create a Reactor thread.
3. startThread() creates a thread through the execute() method of the thread executor ThreadPerTaskExecutor. The thread is a FastThreadLocalThread, and it runs the following logic: save itself in the member variable thread of NioEventLoop, then call the run() method of NioEventLoop to start the NioEventLoop.
The startup process of NioEventLoop is as follows:
bind() -> initAndRegister() -> config().group().register() -> eventLoop.execute() //Entry point
startThread() -> doStartThread() //Create thread
ThreadPerTaskExecutor.execute() //Thread executor creates a FastThreadLocalThread
thread = Thread.currentThread() //Save the FastThreadLocalThread in the member variable of NioEventLoop
NioEventLoop.run() //Start NioEventLoop
The NioEventLoop startup process description is as follows:
First, the bind() method wraps the actual work (registering with the Selector, and later binding the port) into Runnable tasks and submits them through the execute() method of NioEventLoop. Netty then checks whether the thread calling execute() is the NioEventLoop's own thread. If it is not, startThread() is called to create that thread.
The thread is created by the thread executor ThreadPerTaskExecutor, whose job is to create one thread for each task it executes. The thread it creates is a FastThreadLocalThread, which is the underlying thread of the NioEventLoop.
After the FastThreadLocalThread is created, it executes a Runnable task. That task first saves the thread in the NioEventLoop object, so that later calls can check whether the calling thread is the NioEventLoop's own thread. If it is not, the operation is wrapped into a task and placed in the task queue for serial execution, which is how thread safety is achieved. The Runnable task then calls the run() method of NioEventLoop, thereby starting the NioEventLoop. The run() method of NioEventLoop is the core method that drives Netty.
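The thread-safety argument above (external threads enqueue, one thread executes) can be checked with a small experiment. The sketch below is a hypothetical miniature, not Netty code: the class SerialCounter and its demo() helper are invented for illustration, and a LinkedBlockingQueue stands in for Netty's task queue. The counter field is a plain long with no lock, yet the total is exact because only the loop thread ever touches it.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical miniature: many threads submit, exactly one thread mutates state.
final class SerialCounter {
    private final BlockingQueue<Runnable> taskQueue = new LinkedBlockingQueue<>();
    private final Thread loopThread = new Thread(this::run, "serial-loop");
    private long count; // touched only by loopThread, so no lock or atomic is needed

    static SerialCounter start() {
        SerialCounter c = new SerialCounter();
        c.loopThread.setDaemon(true); // the sketch has no shutdown
        c.loopThread.start();
        return c;
    }

    // Callable from any thread.
    void increment() {
        if (Thread.currentThread() == loopThread) {
            count++;                      // already on the loop thread: run directly
        } else {
            taskQueue.add(() -> count++); // external thread: wrap the work as a task
        }
    }

    // Reads the counter on the loop thread, after all previously queued tasks.
    long get() {
        CompletableFuture<Long> f = new CompletableFuture<>();
        taskQueue.add(() -> f.complete(count));
        return f.join();
    }

    private void run() {
        try {
            for (;;) {
                taskQueue.take().run();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    // Usage helper: several threads increment concurrently; returns the final count.
    static long demo(int threads, int perThread) {
        SerialCounter counter = SerialCounter.start();
        Thread[] producers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            producers[i] = new Thread(() -> {
                for (int j = 0; j < perThread; j++) counter.increment();
            });
            producers[i].start();
        }
        for (Thread p : producers) {
            try {
                p.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return counter.get();
    }
}
```

Calling SerialCounter.demo(4, 1000) returns 4000: no increment is lost even though the field is unsynchronized, because every mutation is funneled through the single loop thread.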