Java Network Programming
Early Java APIs only supported so-called blocking functions provided by the local system socket libraries. The following code shows a common example of server code using the traditional Java APIs
// Create a ServerSocket to listen for connection requests on the specified port.
ServerSocket serverSocket = new ServerSocket(5000); // Calls to the accept method will block until a connection is established.
// Calls to the accept method will block until a connection is established.
Socket clientSocket = (); // These stream objects are derived from the stream object.
// These stream objects are derived from the socket's stream object.
BufferedReader in = new BufferedReader(new InputStreamReader(())); // These stream objects are derived from the stream object of the socket.
PrintWriter out = new PrintWriter(((), true); String request, response; // Stream objects are derived from the socket's stream object.
String request, response.
// If the client sends "Done", exit the loop.
while ((request = ()) ! = null) {
if ("Done".equals(request)) {
break; }
}
// The request is passed to the server's processor
response = processRequest(request); // The server's response is sent to the client.
// The server's response is sent to the client
(response); // The server's response is sent to the client.
}
This code can only handle one connection at a time, and to manage multiple clients, you have to create a new Thread for each new client Socket, so let's consider the implications of this scenario:
- A large number of threads are dormant at any given time, resulting in a waste of resources
- Memory needs to be allocated for each thread's call stack.
- Context switching of threads incurs an overhead
This concurrency scheme is still ideal for small to medium sized numbers of clients, but does not support larger concurrencies very well
Java NIO
NIO (Non-blocking I/O, also known as New I/O), is a synchronous non-blocking I/O model and the basis for I/O multiplexing. Traditional IO flows are blocking, which means that when a thread invokes a read or write operation, the thread is blocked until the data has been completely read or written. the non-blocking model of NIO allows a thread to perform a read or write operation without doing so if there is no data available at the moment, rather than keeping the thread blocked, so that the thread can continue to do other things until the data is ready
class is the key to Java's non-blocking IO implementation. It uses an event notification API to determine which of a set of non-blocking sockets are ready and able to perform IO-related operations. Because the completion of a read or write operation can be checked at any point in time, a single thread can handle multiple concurrent connections!
This model provides better resource management than the blocking IO model:
- Use fewer threads to handle many connections, reducing the overhead of memory management and context switching
- Threads can also be used for other tasks when there are no IO operations to be processed
Reactor threading model
Reactor is an event-driven model for concurrently processing client requests and responses. The server uses a multiplexing strategy after receiving a client request to receive all client requests asynchronously through a non-blocking thread, and forwards these requests to the relevant group of worker threads for processing.
Reactor models are often implemented based on asynchronous threads, and there are three common Reactor threading models: Reactor single-threaded model, Reactor multi-threaded model, and Reactor master-standby multi-threaded model.
1. Reactor single-threaded model
The Reactor single-threaded model means that all client IO requests are done on the same thread.The components and responsibilities of the Reactor single-threaded model are shown in the following diagram
- Client: NIO client that initiates a TCP connection to the server and sends data.
- Acceptor: NIO server that receives TCP connections from clients through the Acceptor.
- Dispatcher: Receives data from the client and sends it to the corresponding codec in the form of a ByteBuffer.
- DecoderHandler: decoder, reads data from the client and performs data decoding and processing and message answering.
- EncoderHandler: an encoder that uniformly encodes the data sent to the client (message request or message response) and writes it to the channel
Since the Reactor model uses asynchronous non-blocking IO, a single thread can handle multiple IO-related operations independently. the Reactor single-threaded model centralizes all IO operations in a single thread, with the following processing flow:
- The Acceptor receives TCP connection request messages from clients.
- After the link is successfully established, the Dispatcher writes the received message into the ByteBuffer and dispatches it to the corresponding DecoderHandler to decode and process the message.
- Call the corresponding EncoderHandler after the message is processed to encode and distribute the response message corresponding to the request.
2. Reactor multi-threaded model
The biggest difference between the Reactor multithreaded model and the single-threaded model is that it uses a ThreadPoll to handle the client's IO requests.The Reactor multithreaded model is shown in the following figure
3. Reactor master-backup multithreaded model
In the Reactor master-standby multithreaded model, the server side is no longer a NIO thread but a separate pool of NIO threads used to receive client connections. After the main thread Acceptor receives the client's TCP connection request and establishes the connection (it may have to go through the process of authentication, login, and so on), it registers the newly created SocketChannel to one of the I/O threads in the Sub Reactor Pool, and then it is responsible for the specific SocketChannel reading/writing, coding/decoding, and business processing work. and business processing. In this way, the establishment of the client connection and the response to the message are realized in an asynchronous thread, which greatly improves the throughput of the system.Reactor main backup multi-threaded model is shown in Figure
Netty Overview
Netty is a high-performance, asynchronous event-driven NIO framework that provides support for TCP, UDP, and file transfers based on the API implementation provided by Java NIO. All Netty IO operations are asynchronous and non-blocking, and the results of IO operations can be obtained by the user either actively or through a notification mechanism using the Future-Listener mechanism.
The key features of the Netty architecture design are as follows:
- IO multiplexing model: Netty multiplexes IO by encapsulating Selector within NioEventLoop.
- Zero-copy data: Netty's data is received and sent using direct memory for socket reads and writes, which greatly improves system performance.
- Memory Reuse Mechanism: Allocating and reclaiming direct memory is a time-consuming operation, and in order to reuse buffers as much as possible, Netty provides a buffer reuse mechanism based on memory pools
- Lock-free mechanism: Netty internally adopts a serialized lock-free design idea for IO operations. In the specific use of the process can be adjusted in the NIO thread pool thread parameters, start multiple serialized threads running in parallel, this local lock-free serial multi-threaded design than a queue combined with a number of worker thread model of performance is better!
- High-performance serialization framework: Netty is based on ProtoBuf to serialize data by default. By extending Netty's codec interface, users can implement a custom serialization framework.
Netty Core Components
- Bootstrap/ServerBootstrap: Bootstrap is used for client-side service bootstrapping, ServerBootstrap is used for server-side service bootstrapping.
- NioEventLoop: performs event operations based on thread queuing. Specific event operations to be performed include connection registration, port binding, IO data reading and writing, etc. Each NioEventLoop thread is responsible for the event processing of multiple channels. Each NioEventLoop thread is responsible for the event processing of multiple channels.
- NioEventLoopGroup: NioEventLoop Lifecycle Management
- Future/ChannelFuture: Future and ChannelFuture are used for asynchronous communication, based on the asynchronous communication method, you can register a listening event after the IO operation is triggered, and then automatically trigger the listening event after the IO operation is completed and complete the subsequent operations.
- Channel: Channel is a network communication component in Nettty, which is used to perform specific IO operations.All data communication in Nettty is based on channel reading or writing data to the corresponding channel.The main functions of channel include establishment of network connection, management of connection status (opening and closing of network connection), The main functions of a channel include network connection establishment, connection state management (opening and closing of network connections), configuration of network connection parameters (size of each received data), network data operations based on asynchronous NIO (data reading, data writing), and so on.
- Selector: Selector is used to manage channels in multiplexing. In Netty, a Selector can manage multiple Channels, register the connection to the Selector after the channel connection is established, and the Selector listens to the IO events on each Channel internally, and notifies the ChannelHandler to perform specific IO operations
- ChannelHandlerContext: the management of Channel context information. Each ChannelHandler corresponds to a ChannelHandlerContext.
- ChannelHandler: IO event interception and processing. Among them, ChannelInboundHandler is used to handle the IO operation of data receiving, and ChannelOutboundHandler is used to handle the IO operation of data sending.
- ChannelPipeline: event interception processing and forwarding based on the interceptor design pattern.Each Channel in Netty corresponds to a ChannelPipeline, which maintains a bi-directional list of ChannelHandlerContexts, each of which corresponds to a ChannelHandler to accomplish the interception and processing of specific Channel events. Each ChannelHandlerContext corresponds to a ChannelHandler to accomplish the interception and processing of specific channel events. Among them, inbound data is passed and processed sequentially from Head to Tail, and outbound data is passed and processed sequentially from Tail to Head.
Netty Principles
1. Netty Server initialization process
- Initialize BossGroups and WorkerGroups
- Configure EventLoopGroup based on ServerBootstrap, including connection parameter settings, channel type settings, codec handler settings, etc.
- Binding ports and service startup
public static void main(String[] args) {
// 1:establish BossGroup cap (a poem) WorkerGroup
NioEventLoopGroup bossGroup = new NioEventLoopGroup();
NioEventLoopGroup workerGroup = new NioEventLoopGroup();
final ServerBootstrap serverBootstrap = new ServerBootstrap();
// 2:configureNioEventLoopGroup
serverBootstrap
.group(bossGroup, workerGroup)
.channel() // set up channel The type of the NIO
.option(ChannelOption.SO_BACKLOG, 1024) // set up BACKLOG The size of the 1024
.childOption(ChannelOption.SO_KEEPALIVE,true) // Enable heartbeat detection mechanism
.childOption(ChannelOption.TCP_NODELAY,true) // set up数据包无延迟
// set up Channel The type of the NioSocketChannel
.childHandler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) {
// configure解码器为 MessageDecoder resemble
().addLast("decoder", new MessageDecoder());
// configure编码器为 MessageEncoder resemble
().addlast("encoder", new MessageEncoder());
}
});
// 3:绑定端口cap (a poem)服务启动
int port = 9000;
(port).addlistener(future -> {
if(()) {
("server start up on port:" + port);
} else {
("server start up failed");
}
});
}
2. Netty workflow
- Netty abstracts two groups of thread pools, BossGroup and WorkerGroup, with BossGroup specializing in receiving client connections and WorkerGroup specializing in reading and writing to the network.
- BossGroup and WorkerGroup types are both NioEventLoopGroups.
- NioEventLoopGroup is equivalent to an event loop thread group, which contains multiple event loop threads, each of which is the NioEventLoop
- Each NioEventLoop has a selector that listens for network communication on the socketChannel registered to it.
- Steps that are looped inside each BossNioEventLoop thread:
- Handles the accept event, establishes a connection with the client, and generates a NioSocketChannel.
- Registers the NioSocketChannel with a selector on a worker NIOEventLoop.
- Process the tasks in the task queue, i.e. runAllTasks
- Steps to loop through each worker NIOEventLoop thread
- Polls the read/write events of all NioSocketChannels registered to its selector.
- Handles I/O events, i.e. read/write events, in the corresponding NioSocketChannel.
- runAllTasks handles tasks in the TaskQueue, some time-consuming business processes can be put into the TaskQueue and processed slowly, so as not to affect the flow of data processing in the pipeline.
- Each worker NIOEventLoop handles NioSocketChannel operations using a pipeline, which maintains a number of handler processors to process the data in the channel.
Netty Practical
The use of Netty is divided into two parts: the client side and the server side. The client is used to connect to the server side to report data and receive request instructions issued by the server side and so on. The server is mainly used to receive data from the client and respond to the client's messages according to the protocol.
Define a common message format BaseMessage
public class BaseMessage {
//When the message was created
private Date createTime;
//Time of message reception
private Date receiveTime;
//Message
private String messageContent;
//messagesid
private int messageId;
//an omissionget、set、constructor method
}
Define the message processing tool class MessageUtils
public class MessageUtils {
//commander-in-chief (military) BaseMessage Message Write ByteBuf
public static ByteBuf getByteBuf(BaseMessage baseMessage) throws UnsupportedEncodingException {
byte[] req = (baseMessage).getBytes("UTF-8");
ByteBuf byteBuf = ();
(reg);
return byteBuf;
}
//through (a gap)ByteBufAccess to information in,utilizationUTF-8encoded and parsed asBaseMessagesystem message format
public static BaseMessage getBaseMessage(ByteBuf buf) {
byte[] con = new byte[()];
(con);
try {
String message = new String(con, "UTF8");
BaseMessage baseMessage = (message, );
(new Date());
return baseMessage;
} catch(UnsupportedEncodingException e) {
();
return null;
}
}
}
Defining the NettyServer
public class NettyServer {
private final static Log logger = ();
private int port;
public NettyServer(int port) {
= port;
bind ();
}
private void bind() {
//1:establishBossGroupcap (a poem)WorkerGroup
EventLoopGroup boss = new NioEventLoopGroup();
EventLoopGroup worker = new NioEventLoopGroup();
try {
//2:establishServerBootstrap
ServerBootstrap bootstrap = new ServerBootstrap();
(boss, worker);
//3:set upChannelcap (a poem) Option
();
(ChannelOption.SO_BACKLOG, 1024);
(ChannelOption.TCP_NODELAY, true);
(ChannelOption.SO_KEEPALIVE, true);
(new channelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
ChannelPipeline p = ();
//defineMessageDecoder,For decodingServerIncoming messages and processing
("decoder", new MessageDecoder());
}
});
//4:set up绑定端口号并启动
ChannelFuture channelFuture = (port).sync();
if (()) {
("NettyServer start success, port: " + );
}
//5:set up异步关闭连接
().closeFuture().sync();
} catch(Exception e) {
("NettyServer start fail, exception:" + ());
():
} finally {
//6:优雅退出函数set up
();
();
}
}
public static void main(String[] args) throws InterruptedException {
new NettyServer(9000);
}
}
Define MessageDecoder decoder
public class MessageDecoder extends ChannelHandlerAdapter {
private final static Log logger = ();
// overwritechannelReadmethod and receives the message sent by the client
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
//1:Receives and decodes the message sent by the client
ByteBuf buf = (ByteBuf) msg;
BaseMessage message = (buf);
try {
//2:Define the reply message body
BaseMessage responseMessage = new BaseMessage(() + 1, "response from server", new Date());
("send response message for client:" + (responseMessage));
//3:message encoding
ByteBuf byteBuf = (responseMessage);
//4:Messaging,Pass the message through theChannelHandlerContextwriteChannel
(byteBuf);
} catch(UnsupportedEncodingException e) {
();
}
}
@Override//Disconnect Trigger Event
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
("channel removed");
(ctx);
}
@Override//Connection Exception Trigger Event
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
("channel exception");
(ctx, cause);
}
@Override//Connection Registration Trigger Event
public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
("channel registered");
(ctx);
}
}
Define NettyClient
public class NettyClient {
private final static Log logger = ();
//Port number of the server
private int port = 9000;//server-side IPaddress
private String host = "localhost";
public NettyClient(String host, int port) throws InterruptedException {
= port;
= host;
start();
}
private void start() throws InterruptedException {
EventLoopGroup eventLoopGroup = new NioEventLoopGroup();
try {
Bootstrap bootstrap = new Bootstrap();
();
(ChannelOption.SO_KEEPALIVE, true);
(eventLoopGroup);
(host, port);
(new ChannelInitializer<SocketChannel>(){
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
().addLast(new NettyClientHandler());
}
});
}
}
}
Defining the NettyClientHandler Message Handler
public class NettyClientHandler extends ChannelHandlerAdapter {
private final static Log logger = ();
@Override//After the connection is created,Nettywill automatically invokechannelActivemethodologies
public void channelActive(ChannelHandlerContext ctx) throws Exception {
//Create a message,Send to server
BaseMessage message = new BaseMessage(0, "message from client", new Date());
ByteBuf byteBuf = (message);
(byteBuf);
("send a message for server:" + (message));
}
@Override//Reading server-side messages
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf buf = (ByteBuf) msg;
BaseMessage message = (buf);
("received message form server:" + JSON,toJSONString(message));
}
}