Sticky and half pack
Sticking phenomenon
The problem of sticking packets occurs because you don’t know where the boundary of a user message is. If you know where the boundary is, the receiver can divide the valid user message through the boundary.
Server code
public class HelloWorldServer {
static final Logger log = ();
void start() {
NioEventLoopGroup boss = new NioEventLoopGroup(1);
NioEventLoopGroup worker = new NioEventLoopGroup();
try {
ServerBootstrap serverBootstrap = new ServerBootstrap();
();
(boss, worker);
(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
().addLast(new LoggingHandler());
().addLast(new ChannelInboundHandlerAdapter() {
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
("connected {}", ());
(ctx);
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
("disconnect {}", ());
(ctx);
}
});
}
});
ChannelFuture channelFuture = (8080);
("{} binding...", ());
();
("{} bound...", ());
().closeFuture().sync();
} catch (InterruptedException e) {
("server error", e);
} finally {
();
();
("stoped");
}
}
public static void main(String[] args) {
new HelloWorldServer().start();
}
}
The client code wants to send 10 messages, each of which is 16 bytes.
public class HelloWorldClient {
static final Logger log = ();
public static void main(String[] args) {
NioEventLoopGroup worker = new NioEventLoopGroup();
try {
Bootstrap bootstrap = new Bootstrap();
();
(worker);
(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
("connetted...");
().addLast(new ChannelInboundHandlerAdapter() {
@Override
//A Active event will be triggered after the connection channel is successfully established
public void channelActive(ChannelHandlerContext ctx) throws Exception {
("sending...");
Random r = new Random();
char c = 'a';
for (int i = 0; i < 10; i++) {
ByteBuf buffer = ().buffer();
(new byte[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15});
(buffer);
}
}
});
}
});
ChannelFuture channelFuture = ("127.0.0.1", 8080).sync();
().closeFuture().sync();
} catch (InterruptedException e) {
("client error", e);
} finally {
();
}
}
}
A certain output on the server side can be seen to receive 160 bytes at one time, while the expected one is 16 bytes at one time, which is received in 10 times. This shows that the phenomenon of sticking
08:24:46 [DEBUG] [main] - [id: 0x81e0fda5] binding...
08:24:46 [DEBUG] [main] - [id: 0x81e0fda5, L:/0:0:0:0:0:0:0:0:8080] bound...
08:24:55 [DEBUG] [nioEventLoopGroup-3-1] - [id: 0x94132411, L:/127.0.0.1:8080 - R:/127.0.0.1:58177] REGISTERED
08:24:55 [DEBUG] [nioEventLoopGroup-3-1] - [id: 0x94132411, L:/127.0.0.1:8080 - R:/127.0.0.1:58177] ACTIVE
08:24:55 [DEBUG] [nioEventLoopGroup-3-1] - connected [id: 0x94132411, L:/127.0.0.1:8080 - R:/127.0.0.1:58177]
08:24:55 [DEBUG] [nioEventLoopGroup-3-1] - [id: 0x94132411, L:/127.0.0.1:8080 - R:/127.0.0.1:58177] READ: 160B
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 00 01 02 03 04 05 06 07 08 09 0a 0b 0c 0d 0e 0f |................|
|00000010| 00 01 02 03 04 05 06 07 08 09 0a 0b 0c 0d 0e 0f |................|
|00000020| 00 01 02 03 04 05 06 07 08 09 0a 0b 0c 0d 0e 0f |................|
|00000030| 00 01 02 03 04 05 06 07 08 09 0a 0b 0c 0d 0e 0f |................|
|00000040| 00 01 02 03 04 05 06 07 08 09 0a 0b 0c 0d 0e 0f |................|
|00000050| 00 01 02 03 04 05 06 07 08 09 0a 0b 0c 0d 0e 0f |................|
|00000060| 00 01 02 03 04 05 06 07 08 09 0a 0b 0c 0d 0e 0f |................|
|00000070| 00 01 02 03 04 05 06 07 08 09 0a 0b 0c 0d 0e 0f |................|
|00000080| 00 01 02 03 04 05 06 07 08 09 0a 0b 0c 0d 0e 0f |................|
|00000090| 00 01 02 03 04 05 06 07 08 09 0a 0b 0c 0d 0e 0f |................|
+--------+-------------------------------------------------+----------------+
08:24:55 [DEBUG] [nioEventLoopGroup-3-1] - [id: 0x94132411, L:/127.0.0.1:8080 - R:/127.0.0.1:58177] READ COMPLETE
Half pack phenomenon
Half-package refers to the situation where the receiving end receives only part of the data, not the complete data.
The client code hopes to send 1 message, which is 160 bytes, and the code is changed to
ByteBuf buffer = ().buffer();
for (int i = 0; i < 10; i++) {
(new byte[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15});
}
(buffer);
Because the phenomenon is obvious, the server side will modify the receiving buffer, and the other codes will not change.
(ChannelOption.SO_RCVBUF, 10);
A certain output from the server side, you can see that the received message is divided into two sections, such as 20 bytes for the first time and 140 bytes for the second time.
08:43:49 [DEBUG] [main] - [id: 0x4d6c6a84] binding...
08:43:49 [DEBUG] [main] - [id: 0x4d6c6a84, L:/0:0:0:0:0:0:0:0:8080] bound...
08:44:23 [DEBUG] [nioEventLoopGroup-3-1] - [id: 0x1719abf7, L:/127.0.0.1:8080 - R:/127.0.0.1:59221] REGISTERED
08:44:23 [DEBUG] [nioEventLoopGroup-3-1] - [id: 0x1719abf7, L:/127.0.0.1:8080 - R:/127.0.0.1:59221] ACTIVE
08:44:23 [DEBUG] [nioEventLoopGroup-3-1] - connected [id: 0x1719abf7, L:/127.0.0.1:8080 - R:/127.0.0.1:59221]
08:44:24 [DEBUG] [nioEventLoopGroup-3-1] - [id: 0x1719abf7, L:/127.0.0.1:8080 - R:/127.0.0.1:59221] READ: 20B
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 00 01 02 03 04 05 06 07 08 09 0a 0b 0c 0d 0e 0f |................|
|00000010| 00 01 02 03 |.... |
+--------+-------------------------------------------------+----------------+
08:44:24 [DEBUG] [nioEventLoopGroup-3-1] - [id: 0x1719abf7, L:/127.0.0.1:8080 - R:/127.0.0.1:59221] READ COMPLETE
08:44:24 [DEBUG] [nioEventLoopGroup-3-1] - [id: 0x1719abf7, L:/127.0.0.1:8080 - R:/127.0.0.1:59221] READ: 140B
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 04 05 06 07 08 09 0a 0b 0c 0d 0e 0f 00 01 02 03 |................|
|00000010| 04 05 06 07 08 09 0a 0b 0c 0d 0e 0f 00 01 02 03 |................|
|00000020| 04 05 06 07 08 09 0a 0b 0c 0d 0e 0f 00 01 02 03 |................|
|00000030| 04 05 06 07 08 09 0a 0b 0c 0d 0e 0f 00 01 02 03 |................|
|00000040| 04 05 06 07 08 09 0a 0b 0c 0d 0e 0f 00 01 02 03 |................|
|00000050| 04 05 06 07 08 09 0a 0b 0c 0d 0e 0f 00 01 02 03 |................|
|00000060| 04 05 06 07 08 09 0a 0b 0c 0d 0e 0f 00 01 02 03 |................|
|00000070| 04 05 06 07 08 09 0a 0b 0c 0d 0e 0f 00 01 02 03 |................|
|00000080| 04 05 06 07 08 09 0a 0b 0c 0d 0e 0f |............ |
+--------+-------------------------------------------------+----------------+
08:44:24 [DEBUG] [nioEventLoopGroup-3-1] - [id: 0x1719abf7, L:/127.0.0.1:8080 - R:/127.0.0.1:59221] READ COMPLETE
Notice:(ChannelOption.SO_RCVBUF, 10) The size of the underlying receiving buffer (i.e., sliding window) affected only determines the smallest unit of netty read. Netty actually reads each time it is an integer multiple of it
Phenomenon analysis
The sticky and semi-package problem that appears here is not a problem with JavaNIO or Netty. It is essentially TCP is a churn protocol and messages have no boundaries.
Sticky bag:
- Phenomenon, send abc def, receive abcdef
- reason
- Application layer: Receiver ByteBuf is set too large (Netty default 1024)
- Sliding window: Assuming that the sender 256 bytes represents a complete message, but because the receiver does not process it in time and the window size is large enough, these 256 bytes will be buffered in the receiver's sliding window. When multiple messages are buffered in the sliding window The text will stick to the wrap
- Nagle algorithm: It causes sticky
Half pack
- Phenomenon, send abcdef, receive abcdef
- reason
- Application layer: Receiver ByteBuf is less than the actual data sent
- Sliding window: Suppose that the receiver has only 128 bytes left in the window and the sender's message size is 256 bytes. At this time, it can't be put down. You can only send the first 128 bytes first, and wait for ack to send the remaining part, which results in half a packet.
- MSS limit: When the sent data exceeds the MSS limit, the data will be sent in segments, resulting in half a packet.
Solution
Let’s take a look at how Netty solves the above problems:
- Short link, send a packet to establish a connection, so that the connection is established between the connection and the connection is disconnected, the boundary of the message is too low.
- Each message is a fixed length, and the disadvantages are wasteful space
- Each message uses a delimiter, for example \n, and the disadvantages need to be escaped.
- Each message is divided into head and body, and the length of the body is included in the head
Method 1: Short link (extremely not recommended)
Taking the solution to sticky bags as an example
public class HelloWorldClient {
static final Logger log = ();
public static void main(String[] args) {
// Send 10 times in 10 times
for (int i = 0; i < 10; i++) {
send();
}
}
private static void send() {
NioEventLoopGroup worker = new NioEventLoopGroup();
try {
Bootstrap bootstrap = new Bootstrap();
();
(worker);
(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
("conneted...");
().addLast(new LoggingHandler());
().addLast(new ChannelInboundHandlerAdapter() {
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
("sending...");
ByteBuf buffer = ().buffer();
(new byte[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15});
(buffer);
// Close it after sending
();
}
});
}
});
ChannelFuture channelFuture = ("localhost", 8080).sync();
().closeFuture().sync();
} catch (InterruptedException e) {
("client error", e);
} finally {
();
}
}
}
Output, omitted
It is still difficult to solve the problem of using this method in half-packages, because the buffer size of the receiver is limited.
Method 2: Fixed length
Let all packets be fixed in length (assuming the length is 8 bytes), and add them on the server side
().addLast(new FixedLengthFrameDecoder(8));
Client test code, note that after using this method, the client can flush it at any time.
public class HelloWorldClient {
static final Logger log = ();
public static void main(String[] args) {
NioEventLoopGroup worker = new NioEventLoopGroup();
try {
Bootstrap bootstrap = new Bootstrap();
();
(worker);
(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
("connetted...");
().addLast(new LoggingHandler());
().addLast(new ChannelInboundHandlerAdapter() {
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
("sending...");
// Send random packets of content
Random r = new Random();
char c = 'a';
ByteBuf buffer = ().buffer();
for (int i = 0; i < 10; i++) {
byte[] bytes = new byte[8];
for (int j = 0; j < (8); j++) {
bytes[j] = (byte) c;
}
c++;
(bytes);
}
(buffer);
}
});
}
});
ChannelFuture channelFuture = ("192.168.0.103", 9090).sync();
().closeFuture().sync();
} catch (InterruptedException e) {
("client error", e);
} finally {
();
}
}
}
Client output
12:07:00 [DEBUG] [nioEventLoopGroup-2-1] - connetted...
12:07:00 [DEBUG] [nioEventLoopGroup-2-1] - [id: 0x3c2ef3c2] REGISTERED
12:07:00 [DEBUG] [nioEventLoopGroup-2-1] - [id: 0x3c2ef3c2] CONNECT: /192.168.0.103:9090
12:07:00 [DEBUG] [nioEventLoopGroup-2-1] - [id: 0x3c2ef3c2, L:/192.168.0.103:53155 - R:/192.168.0.103:9090] ACTIVE
12:07:00 [DEBUG] [nioEventLoopGroup-2-1] - sending...
12:07:00 [DEBUG] [nioEventLoopGroup-2-1] - [id: 0x3c2ef3c2, L:/192.168.0.103:53155 - R:/192.168.0.103:9090] WRITE: 80B
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 61 61 61 61 00 00 00 00 62 00 00 00 00 00 00 00 |aaaa....b.......|
|00000010| 63 63 00 00 00 00 00 00 64 00 00 00 00 00 00 00 |cc......d.......|
|00000020| 00 00 00 00 00 00 00 00 66 66 66 66 00 00 00 00 |........ffff....|
|00000030| 67 67 67 00 00 00 00 00 68 00 00 00 00 00 00 00 |ggg.....h.......|
|00000040| 69 69 69 69 69 00 00 00 6a 6a 6a 6a 00 00 00 00 |iiiii...jjjj....|
+--------+-------------------------------------------------+----------------+
12:07:00 [DEBUG] [nioEventLoopGroup-2-1] - [id: 0x3c2ef3c2, L:/192.168.0.103:53155 - R:/192.168.0.103:9090] FLUSH
Server output
12:06:51 [DEBUG] [main] - [id: 0xe3d9713f] binding...
12:06:51 [DEBUG] [main] - [id: 0xe3d9713f, L:/192.168.0.103:9090] bound...
12:07:00 [DEBUG] [nioEventLoopGroup-3-1] - [id: 0xd739f137, L:/192.168.0.103:9090 - R:/192.168.0.103:53155] REGISTERED
12:07:00 [DEBUG] [nioEventLoopGroup-3-1] - [id: 0xd739f137, L:/192.168.0.103:9090 - R:/192.168.0.103:53155] ACTIVE
12:07:00 [DEBUG] [nioEventLoopGroup-3-1] - connected [id: 0xd739f137, L:/192.168.0.103:9090 - R:/192.168.0.103:53155]
12:07:00 [DEBUG] [nioEventLoopGroup-3-1] - [id: 0xd739f137, L:/192.168.0.103:9090 - R:/192.168.0.103:53155] READ: 8B
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 61 61 61 61 00 00 00 00 |aaaa.... |
+--------+-------------------------------------------------+----------------+
12:07:00 [DEBUG] [nioEventLoopGroup-3-1] - [id: 0xd739f137, L:/192.168.0.103:9090 - R:/192.168.0.103:53155] READ: 8B
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 62 00 00 00 00 00 00 00 |b....... |
+--------+-------------------------------------------------+----------------+
12:07:00 [DEBUG] [nioEventLoopGroup-3-1] - [id: 0xd739f137, L:/192.168.0.103:9090 - R:/192.168.0.103:53155] READ: 8B
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 63 63 00 00 00 00 00 00 |cc...... |
+--------+-------------------------------------------------+----------------+
12:07:00 [DEBUG] [nioEventLoopGroup-3-1] - [id: 0xd739f137, L:/192.168.0.103:9090 - R:/192.168.0.103:53155] READ: 8B
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 64 00 00 00 00 00 00 00 |d....... |
+--------+-------------------------------------------------+----------------+
12:07:00 [DEBUG] [nioEventLoopGroup-3-1] - [id: 0xd739f137, L:/192.168.0.103:9090 - R:/192.168.0.103:53155] READ: 8B
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 00 00 00 00 00 00 00 00 |........ |
+--------+-------------------------------------------------+----------------+
12:07:00 [DEBUG] [nioEventLoopGroup-3-1] - [id: 0xd739f137, L:/192.168.0.103:9090 - R:/192.168.0.103:53155] READ: 8B
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 66 66 66 66 00 00 00 00 |ffff.... |
+--------+-------------------------------------------------+----------------+
12:07:00 [DEBUG] [nioEventLoopGroup-3-1] - [id: 0xd739f137, L:/192.168.0.103:9090 - R:/192.168.0.103:53155] READ: 8B
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 67 67 67 00 00 00 00 00 |ggg..... |
+--------+-------------------------------------------------+----------------+
12:07:00 [DEBUG] [nioEventLoopGroup-3-1] - [id: 0xd739f137, L:/192.168.0.103:9090 - R:/192.168.0.103:53155] READ: 8B
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 68 00 00 00 00 00 00 00 |h....... |
+--------+-------------------------------------------------+----------------+
12:07:00 [DEBUG] [nioEventLoopGroup-3-1] - [id: 0xd739f137, L:/192.168.0.103:9090 - R:/192.168.0.103:53155] READ: 8B
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 69 69 69 69 69 00 00 00 |iiiii... |
+--------+-------------------------------------------------+----------------+
12:07:00 [DEBUG] [nioEventLoopGroup-3-1] - [id: 0xd739f137, L:/192.168.0.103:9090 - R:/192.168.0.103:53155] READ: 8B
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 6a 6a 6a 6a 00 00 00 00 |jjjj.... |
+--------+-------------------------------------------------+----------------+
12:07:00 [DEBUG] [nioEventLoopGroup-3-1] - [id: 0xd739f137, L:/192.168.0.103:9090 - R:/192.168.0.103:53155] READ COMPLETE
The disadvantage is that the size of the data packet is difficult to grasp.
- Too big, wasteful
- The length is too small, which seems not enough for some data packets
Method 3: Fixed delimiter
The server joins, and the default is \n or \r\n as the delimiter. If the delimiter does not appear after the specified length, an exception will be thrown.
().addLast(new LineBasedFrameDecoder(1024));
After each message, the client adds the \n separator
public class HelloWorldClient {
static final Logger log = ();
public static void main(String[] args) {
NioEventLoopGroup worker = new NioEventLoopGroup();
try {
Bootstrap bootstrap = new Bootstrap();
();
(worker);
(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
("connetted...");
().addLast(new LoggingHandler());
().addLast(new ChannelInboundHandlerAdapter() {
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
("sending...");
Random r = new Random();
char c = 'a';
ByteBuf buffer = ().buffer();
for (int i = 0; i < 10; i++) {
for (int j = 1; j <= (16)+1; j++) {
((byte) c);
}
(10);
c++;
}
(buffer);
}
});
}
});
ChannelFuture channelFuture = ("192.168.0.103", 9090).sync();
().closeFuture().sync();
} catch (InterruptedException e) {
("client error", e);
} finally {
();
}
}
}
Client output
14:08:18 [DEBUG] [nioEventLoopGroup-2-1] - connetted...
14:08:18 [DEBUG] [nioEventLoopGroup-2-1] - [id: 0x1282d755] REGISTERED
14:08:18 [DEBUG] [nioEventLoopGroup-2-1] - [id: 0x1282d755] CONNECT: /192.168.0.103:9090
14:08:18 [DEBUG] [nioEventLoopGroup-2-1] - [id: 0x1282d755, L:/192.168.0.103:63641 - R:/192.168.0.103:9090] ACTIVE
14:08:18 [DEBUG] [nioEventLoopGroup-2-1] - sending...
14:08:18 [DEBUG] [nioEventLoopGroup-2-1] - [id: 0x1282d755, L:/192.168.0.103:63641 - R:/192.168.0.103:9090] WRITE: 60B
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 61 0a 62 62 62 0a 63 63 63 0a 64 64 0a 65 65 65 ||
|00000010| 65 65 65 65 65 65 65 0a 66 66 0a 67 67 67 67 67 ||
|00000020| 67 67 0a 68 68 68 68 0a 69 69 69 69 69 69 69 0a |.|
|00000030| 6a 6a 6a 6a 6a 6a 6a 6a 6a 6a 6a 0a |jjjjjjjjjjj. |
+--------+-------------------------------------------------+----------------+
14:08:18 [DEBUG] [nioEventLoopGroup-2-1] - [id: 0x1282d755, L:/192.168.0.103:63641 - R:/192.168.0.103:9090] FLUSH
Server output
14:08:18 [DEBUG] [nioEventLoopGroup-3-5] - connected [id: 0xa4b3be43, L:/192.168.0.103:9090 - R:/192.168.0.103:63641]
14:08:18 [DEBUG] [nioEventLoopGroup-3-5] - [id: 0xa4b3be43, L:/192.168.0.103:9090 - R:/192.168.0.103:63641] READ: 1B
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 61 |a |
+--------+-------------------------------------------------+----------------+
14:08:18 [DEBUG] [nioEventLoopGroup-3-5] - [id: 0xa4b3be43, L:/192.168.0.103:9090 - R:/192.168.0.103:63641] READ: 3B
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 62 62 62 |bbb |
+--------+-------------------------------------------------+----------------+
14:08:18 [DEBUG] [nioEventLoopGroup-3-5] - [id: 0xa4b3be43, L:/192.168.0.103:9090 - R:/192.168.0.103:63641] READ: 3B
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 63 63 63 |ccc |
+--------+-------------------------------------------------+----------------+
14:08:18 [DEBUG] [nioEventLoopGroup-3-5] - [id: 0xa4b3be43, L:/192.168.0.103:9090 - R:/192.168.0.103:63641] READ: 2B
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 64 64 |dd |
+--------+-------------------------------------------------+----------------+
14:08:18 [DEBUG] [nioEventLoopGroup-3-5] - [id: 0xa4b3be43, L:/192.168.0.103:9090 - R:/192.168.0.103:63641] READ: 10B
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 65 65 65 65 65 65 65 65 65 65 |eeeeeeeeee |
+--------+-------------------------------------------------+----------------+
14:08:18 [DEBUG] [nioEventLoopGroup-3-5] - [id: 0xa4b3be43, L:/192.168.0.103:9090 - R:/192.168.0.103:63641] READ: 2B
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 66 66 |ff |
+--------+-------------------------------------------------+----------------+
14:08:18 [DEBUG] [nioEventLoopGroup-3-5] - [id: 0xa4b3be43, L:/192.168.0.103:9090 - R:/192.168.0.103:63641] READ: 7B
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 67 67 67 67 67 67 67 |ggggggg |
+--------+-------------------------------------------------+----------------+
14:08:18 [DEBUG] [nioEventLoopGroup-3-5] - [id: 0xa4b3be43, L:/192.168.0.103:9090 - R:/192.168.0.103:63641] READ: 4B
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 68 68 68 68 |hhhh |
+--------+-------------------------------------------------+----------------+
14:08:18 [DEBUG] [nioEventLoopGroup-3-5] - [id: 0xa4b3be43, L:/192.168.0.103:9090 - R:/192.168.0.103:63641] READ: 7B
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 69 69 69 69 69 69 69 |iiiiiii |
+--------+-------------------------------------------------+----------------+
14:08:18 [DEBUG] [nioEventLoopGroup-3-5] - [id: 0xa4b3be43, L:/192.168.0.103:9090 - R:/192.168.0.103:63641] READ: 11B
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 6a 6a 6a 6a 6a 6a 6a 6a 6a 6a 6a |jjjjjjjjjjj |
+--------+-------------------------------------------------+----------------+
14:08:18 [DEBUG] [nioEventLoopGroup-3-5] - [id: 0xa4b3be43, L:/192.168.0.103:9090 - R:/192.168.0.103:63641] READ COMPLETE
Disadvantages, it is more appropriate to process character data, but ifThe content itself contains the separator(This often happens in byte data), then the error will be parsed
Method 4: Preset length
Before sending a message, it is agreed to use fixed-length bytes to represent the length of the next data
// Maximum length, length offset, length occupancy, length adjustment, number of stripped bytes
().addLast(new LengthFieldBasedFrameDecoder(1024, 0, 1, 0, 1));
Client Code
public class HelloWorldClient {
static final Logger log = ();
public static void main(String[] args) {
NioEventLoopGroup worker = new NioEventLoopGroup();
try {
Bootstrap bootstrap = new Bootstrap();
();
(worker);
(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
("connetted...");
().addLast(new LoggingHandler());
().addLast(new ChannelInboundHandlerAdapter() {
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
("sending...");
Random r = new Random();
char c = 'a';
ByteBuf buffer = ().buffer();
for (int i = 0; i < 10; i++) {
byte length = (byte) ((16) + 1);
// Write the length first
(length);
// Again
for (int j = 1; j <= length; j++) {
((byte) c);
}
c++;
}
(buffer);
}
});
}
});
ChannelFuture channelFuture = ("192.168.0.103", 9090).sync();
().closeFuture().sync();
} catch (InterruptedException e) {
("client error", e);
} finally {
();
}
}
}
Client output
14:37:10 [DEBUG] [nioEventLoopGroup-2-1] - connetted...
14:37:10 [DEBUG] [nioEventLoopGroup-2-1] - [id: 0xf0f347b8] REGISTERED
14:37:10 [DEBUG] [nioEventLoopGroup-2-1] - [id: 0xf0f347b8] CONNECT: /192.168.0.103:9090
14:37:10 [DEBUG] [nioEventLoopGroup-2-1] - [id: 0xf0f347b8, L:/192.168.0.103:49979 - R:/192.168.0.103:9090] ACTIVE
14:37:10 [DEBUG] [nioEventLoopGroup-2-1] - sending...
14:37:10 [DEBUG] [nioEventLoopGroup-2-1] - [id: 0xf0f347b8, L:/192.168.0.103:49979 - R:/192.168.0.103:9090] WRITE: 97B
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 09 61 61 61 61 61 61 61 61 61 09 62 62 62 62 62 |.|
|00000010| 62 62 62 62 06 63 63 63 63 63 63 08 64 64 64 64 ||
|00000020| 64 64 64 64 0f 65 65 65 65 65 65 65 65 65 65 65 ||
|00000030| 65 65 65 65 0d 66 66 66 66 66 66 66 66 66 66 66 ||
|00000040| 66 66 02 67 67 02 68 68 0e 69 69 69 69 69 69 69 ||
|00000050| 69 69 69 69 69 69 69 09 6a 6a 6a 6a 6a 6a 6a 6a ||
|00000060| 6a |j |
+--------+-------------------------------------------------+----------------+
14:37:10 [DEBUG] [nioEventLoopGroup-2-1] - [id: 0xf0f347b8, L:/192.168.0.103:49979 - R:/192.168.0.103:9090] FLUSH
Server output
14:36:50 [DEBUG] [main] - [id: 0xdff439d3] binding...
14:36:51 [DEBUG] [main] - [id: 0xdff439d3, L:/192.168.0.103:9090] bound...
14:37:10 [DEBUG] [nioEventLoopGroup-3-1] - [id: 0x744f2b47, L:/192.168.0.103:9090 - R:/192.168.0.103:49979] REGISTERED
14:37:10 [DEBUG] [nioEventLoopGroup-3-1] - [id: 0x744f2b47, L:/192.168.0.103:9090 - R:/192.168.0.103:49979] ACTIVE
14:37:10 [DEBUG] [nioEventLoopGroup-3-1] - connected [id: 0x744f2b47, L:/192.168.0.103:9090 - R:/192.168.0.103:49979]
14:37:10 [DEBUG] [nioEventLoopGroup-3-1] - [id: 0x744f2b47, L:/192.168.0.103:9090 - R:/192.168.0.103:49979] READ: 9B
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 61 61 61 61 61 61 61 61 61 |aaaaaaaaa |
+--------+-------------------------------------------------+----------------+
14:37:10 [DEBUG] [nioEventLoopGroup-3-1] - [id: 0x744f2b47, L:/192.168.0.103:9090 - R:/192.168.0.103:49979] READ: 9B
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 62 62 62 62 62 62 62 62 62 |bbbbbbbbb |
+--------+-------------------------------------------------+----------------+
14:37:10 [DEBUG] [nioEventLoopGroup-3-1] - [id: 0x744f2b47, L:/192.168.0.103:9090 - R:/192.168.0.103:49979] READ: 6B
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 63 63 63 63 63 63 |cccccc |
+--------+-------------------------------------------------+----------------+
14:37:10 [DEBUG] [nioEventLoopGroup-3-1] - [id: 0x744f2b47, L:/192.168.0.103:9090 - R:/192.168.0.103:49979] READ: 8B
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 64 64 64 64 64 64 64 64 |dddddddd |
+--------+-------------------------------------------------+----------------+
14:37:10 [DEBUG] [nioEventLoopGroup-3-1] - [id: 0x744f2b47, L:/192.168.0.103:9090 - R:/192.168.0.103:49979] READ: 15B
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 65 65 65 65 65 65 65 65 65 65 65 65 65 65 65 |eeeeeeeeeeeeeee |
+--------+-------------------------------------------------+----------------+
14:37:10 [DEBUG] [nioEventLoopGroup-3-1] - [id: 0x744f2b47, L:/192.168.0.103:9090 - R:/192.168.0.103:49979] READ: 13B
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 66 66 66 66 66 66 66 66 66 66 66 66 66 |fffffffffffff |
+--------+-------------------------------------------------+----------------+
14:37:10 [DEBUG] [nioEventLoopGroup-3-1] - [id: 0x744f2b47, L:/192.168.0.103:9090 - R:/192.168.0.103:49979] READ: 2B
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 67 67 |gg |
+--------+-------------------------------------------------+----------------+
14:37:10 [DEBUG] [nioEventLoopGroup-3-1] - [id: 0x744f2b47, L:/192.168.0.103:9090 - R:/192.168.0.103:49979] READ: 2B
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 68 68 |hh |
+--------+-------------------------------------------------+----------------+
14:37:10 [DEBUG] [nioEventLoopGroup-3-1] - [id: 0x744f2b47, L:/192.168.0.103:9090 - R:/192.168.0.103:49979] READ: 14B
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 69 69 69 69 69 69 69 69 69 69 69 69 69 69 |iiiiiiiiiiiiii |
+--------+-------------------------------------------------+----------------+
14:37:10 [DEBUG] [nioEventLoopGroup-3-1] - [id: 0x744f2b47, L:/192.168.0.103:9090 - R:/192.168.0.103:49979] READ: 9B
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 6a 6a 6a 6a 6a 6a 6a 6a 6a |jjjjjjjjj |
+--------+-------------------------------------------------+----------------+
14:37:10 [DEBUG] [nioEventLoopGroup-3-1] - [id: 0x744f2b47, L:/192.168.0.103:9090 - R:/192.168.0.103:49979] READ COMPLETE
Protocol design and analysis
Why are agreements needed?
Message transmission in TCP/IP is based on streams and has no boundaries.
The purpose of the agreement is to define the boundaries of messages and formulate communication rules that both parties to communicate must abide by.
For example: Transmission on the network
Keep guests on rainy days and stay me or not
It is a famous Chinese sentence without punctuation. In the absence of punctuation, this sentence has several ways to disassemble it, but its meaning is completely different, so it is often used to describe the importance of punctuation.
An interpretation
It's rainy, but I won't
Another interpretation
It's raining, stay with guests, will you keep me? Keep
How to design a protocol? In fact, it is to add "punctuation marks" to the information transmitted on the network. But it is not good to break a sentence by a separator, because the separator itself must be distinguished if it is used for transmission. Therefore, the following protocol is more commonly used
Fixed length bytes represent content length + actual content
For example, if a Chinese character length is 3, according to the above protocol rules, the sending information is as follows, and the receiver will not misinterpret it.
0f It takes guests on rainy days 06 days 09 I won't stay
Redis protocol example
Simulates the redis client sending commands.
NioEventLoopGroup worker = new NioEventLoopGroup();
byte[] LINE = {13, 10};
try {
Bootstrap bootstrap = new Bootstrap();
();
(worker);
(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) {
().addLast(new LoggingHandler());
().addLast(new ChannelInboundHandlerAdapter() {
// An active event will be triggered after the connection channel is successfully established
@Override
public void channelActive(ChannelHandlerContext ctx) {
set(ctx);
get(ctx);
}
private void get(ChannelHandlerContext ctx) {
ByteBuf buf = ().buffer();
("*2".getBytes());//*2 means that the number of elements in the array is 2, that is, get aaa is 2 strings of contents
(LINE);
("$3".getBytes());//Suppose $3 means that there are 3 bytes in the following order
(LINE);
("get".getBytes());//Enter the gset command
(LINE);
("$3".getBytes());
(LINE);
("aaa".getBytes());//Input key is aaa
(LINE);
(buf);
}
private void set(ChannelHandlerContext ctx) {
//The following redis command is set aaa bbb
ByteBuf buf = ().buffer();
("*3".getBytes());//*3 means that the number of array elements is 3, that is, set aaa bbb is 3 strings of contents
(LINE);
("$3".getBytes());//Suppose $3 means that there are 3 bytes in the following order
(LINE);
("set".getBytes());//Enter the set command
(LINE);
("$3".getBytes());
(LINE);
("aaa".getBytes());//Input key is aaa
(LINE);
("$3".getBytes());
(LINE);
("bbb".getBytes());//The input value is bbb
(LINE);
(buf);
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf buf = (ByteBuf) msg;
((()));
}
});
}
});
ChannelFuture channelFuture = ("localhost", 6379).sync();
().closeFuture().sync();
} catch (InterruptedException e) {
("client error", e);
} finally {
();
}
Of course netty provides these ready-made protocols, which do not require us to develop them ourselves, so we are here to know why.
http protocol example
Simulate http server
NioEventLoopGroup boss = new NioEventLoopGroup();
NioEventLoopGroup worker = new NioEventLoopGroup();
try {
ServerBootstrap serverBootstrap = new ServerBootstrap();
();
(boss, worker);
(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
().addLast(new LoggingHandler());
().addLast(new HttpServerCodec());
().addLast(new SimpleChannelInboundHandler<HttpRequest>() {
@Override
protected void channelRead0(ChannelHandlerContext ctx, HttpRequest msg) throws Exception {
// Get request
(());
// Return response
DefaultFullHttpResponse response =
new DefaultFullHttpResponse((), );
byte[] bytes = "<h1>Hello, world!</h1>".getBytes();
().setInt(CONTENT_LENGTH, );
().writeBytes(bytes);
// Write a response
(response);
}
});
/*().addLast(new ChannelInboundHandlerAdapter() {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
("{}", ());
if (msg instanceof HttpRequest) { // Request line, request header
} else if (msg instanceof HttpContent) { //Request body
}
}
});*/
}
});
ChannelFuture channelFuture = (8080).sync();
().closeFuture().sync();
} catch (InterruptedException e) {
("server error", e);
} finally {
();
();
}
Custom protocol elements
- Magic Number: It is agreed upon to determine whether it is an invalid data packet at the first time.
- Version number: Can support protocol upgrades
- Serialization algorithm: Which serialization deserialization method is used for the message body? It can be extended by this, such as: json, protobuf, hessian, jdk
- Instruction type: login, registration, single chat, group chat... related to business
- Request serial number: Provides asynchronous capability for duplex communication
- Text length
- Message text
Codec
Based on the above elements, design a login request message and a login response message, and use Netty to complete the sending and receiving
@Slf4j
public class MessageCodec extends ByteToMessageCodec<Message> {
@Override
protected void encode(ChannelHandlerContext ctx, Message msg, ByteBuf out) throws Exception {
// 1. 4 byte magic number
(new byte[]{1, 2, 3, 4});
// 2. 1 byte version,
(1);
// 3. 1 byte serialization method jdk 0 , json 1
(0);
// 4. 1 byte instruction type
(());
// 5. 4 bytes
(());
// No sense, aligning fill
(0xff);
// 6. Get the byte array of content
ByteArrayOutputStream bos = new ByteArrayOutputStream();
ObjectOutputStream oos = new ObjectOutputStream(bos);
(msg);
byte[] bytes = ();
// 7. Length
();
// 8. Write content
(bytes);
}
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
int magicNum = ();
byte version = ();
byte serializerType = ();
byte messageType = ();
int sequenceId = ();
();
int length = ();
byte[] bytes = new byte[length];
(bytes, 0, length);
ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes));
Message message = (Message) ();
("{}, {}, {}, {}, {}, {}", magicNum, version, serializerType, messageType, sequenceId, length);
("{}", message);
(message);
}
}
test
EmbeddedChannel channel = new EmbeddedChannel(
new LoggingHandler(),
new LengthFieldBasedFrameDecoder(
1024, 12, 4, 0, 0),
new MessageCodec()
);
// encode
LoginRequestMessage message = new LoginRequestMessage("zhangsan", "123", "Zhang San");
// (message);
// decode
ByteBuf buf = ();
new MessageCodec().encode(null, message, buf);
ByteBuf s1 = (0, 100);
ByteBuf s2 = (100, () - 100);
(); // Reference count 2
(s1); // release 1
(s2);
@Sharable
- When the handler does not save the state, it can be safely shared under multithreading
- But be aware that for codec classes, the ByteToMessageCodec or CombinedChannelDuplexHandler parent class cannot be inherited. Their constructor methods have restrictions on @Sharable
- If you can ensure that the codec does not save state, you can inherit the MessageToMessageCodec parent class
@Slf4j
@
/**
* Must be used with LengthFieldBasedFrameDecoder to ensure that the received ByteBuf message is complete
*/
public class MessageCodecSharable extends MessageToMessageCodec<ByteBuf, Message> {
@Override
protected void encode(ChannelHandlerContext ctx, Message msg, List<Object> outList) throws Exception {
ByteBuf out = ().buffer();
// 1. 4 byte magic number
(new byte[]{1, 2, 3, 4});
// 2. 1 byte version,
(1);
// 3. 1 byte serialization method jdk 0 , json 1
(0);
// 4. 1 byte instruction type
(());
// 5. 4 bytes
(());
// No sense, aligning fill
(0xff);
// 6. Get the byte array of content
ByteArrayOutputStream bos = new ByteArrayOutputStream();
ObjectOutputStream oos = new ObjectOutputStream(bos);
(msg);
byte[] bytes = ();
// 7. Length
();
// 8. Write content
(bytes);
(out);
}
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
int magicNum = ();
byte version = ();
byte serializerType = ();
byte messageType = ();
int sequenceId = ();
();
int length = ();
byte[] bytes = new byte[length];
(bytes, 0, length);
ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes));
Message message = (Message) ();
("{}, {}, {}, {}, {}, {}", magicNum, version, serializerType, messageType, sequenceId, length);
("{}", message);
(message);
}
}
Extended serialization algorithm
Serialization, deserialization is mainly used in the conversion of message body
- When serializing, Java objects need to be changed into data to be transmitted (can be byte[], or json, etc., and eventually they need to be byte[])
- When deserializing, the incoming body data needs to be restored to Java objects for easy processing.
The current code only supports Java's own serialization and deserialization mechanism. The core code is as follows
// Deserialization
byte[] body = new byte[bodyLength];
(body);
ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(body));
Message message = (Message) ();
(sequenceId);
// Serialization
ByteArrayOutputStream out = new ByteArrayOutputStream();
new ObjectOutputStream(out).writeObject(message);
byte[] bytes = ();
To support more serialization algorithms, abstract a Serializer interface
public interface Serializer {
// Deserialization method
<T> T deserialize(Class<T> clazz, byte[] bytes);
// Serialization method
<T> byte[] serialize(T object);
}
Provide two implementations, here we directly add the specific implementation to the enumeration class
enum SerializerAlgorithm implements Serializer {
// Java implementation
Java {
@Override
public <T> T deserialize(Class<T> clazz, byte[] bytes) {
try {
ObjectInputStream in =
new ObjectInputStream(new ByteArrayInputStream(bytes));
Object object = ();
return (T) object;
} catch (IOException | ClassNotFoundException e) {
throw new RuntimeException("Deserialization error", e);
}
}
@Override
public <T> byte[] serialize(T object) {
try {
ByteArrayOutputStream out = new ByteArrayOutputStream();
new ObjectOutputStream(out).writeObject(object);
return ();
} catch (IOException e) {
throw new RuntimeException("Serialization Error", e);
}
}
},
// Json implementation (introduced Gson dependencies)
Json {
@Override
public <T> T deserialize(Class<T> clazz, byte[] bytes) {
return new Gson().fromJson(new String(bytes, StandardCharsets.UTF_8), clazz);
}
@Override
public <T> byte[] serialize(T object) {
return new Gson().toJson(object).getBytes(StandardCharsets.UTF_8);
}
};
// What kind of serialization algorithm is needed to get from the bytes of the protocol
public static SerializerAlgorithm getByInt(int type) {
SerializerAlgorithm[] array = ();
if (type < 0 || type > - 1) {
throw new IllegalArgumentException("Exceed the SerializerAlgorithm range");
}
return array[type];
}
}
Add configuration classes and configuration files
public abstract class Config {
static Properties properties;
static {
try (InputStream in = ("/")) {
properties = new Properties();
(in);
} catch (IOException e) {
throw new ExceptionInInitializerError(e);
}
}
public static int getServerPort() {
String value = ("");
if(value == null) {
return 8080;
} else {
return (value);
}
}
public static getSerializerAlgorithm() {
String value = ("");
if(value == null) {
return ;
} else {
return (value);
}
}
}
Configuration File
=Json
Modify the codec
/**
* Must be used with LengthFieldBasedFrameDecoder to ensure that the received ByteBuf message is complete
*/
public class MessageCodecSharable extends MessageToMessageCodec<ByteBuf, Message> {
@Override
public void encode(ChannelHandlerContext ctx, Message msg, List<Object> outList) throws Exception {
ByteBuf out = ().buffer();
// 1. 4 byte magic number
(new byte[]{1, 2, 3, 4});
// 2. 1 byte version,
(1);
// 3. 1 byte serialization method jdk 0 , json 1
(().ordinal());
// 4. 1 byte instruction type
(());
// 5. 4 bytes
(());
// No sense, aligning fill
(0xff);
// 6. Get the byte array of content
byte[] bytes = ().serialize(msg);
// 7. Length
();
// 8. Write content
(bytes);
(out);
}
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
int magicNum = ();
byte version = ();
byte serializerAlgorithm = (); // 0 or 1
byte messageType = (); // 0,1,2...
int sequenceId = ();
();
int length = ();
byte[] bytes = new byte[length];
(bytes, 0, length);
// Find the deserialization algorithm
algorithm = ()[serializerAlgorithm];
// Determine the specific message type
Class<? extends Message> messageClass = (messageType);
Message message = (messageClass, bytes);
// ("{}, {}, {}, {}, {}, {}", magicNum, version, serializerType, messageType, sequenceId, length);
// ("{}", message);
(message);
}
}
The specific message type can be determined according toMessage type bytes
Get the correspondingMessage class
@Data
public abstract class Message implements Serializable {
/**
* Obtain the corresponding message class according to the message type bytes
* @param messageType Message type byte
* @return message class
*/
public static Class<? extends Message> getMessageClass(int messageType) {
return (messageType);
}
private int sequenceId;
private int messageType;
public abstract int getMessageType();
public static final int LoginRequestMessage = 0;
public static final int LoginResponseMessage = 1;
public static final int ChatRequestMessage = 2;
public static final int ChatResponseMessage = 3;
public static final int GroupCreateRequestMessage = 4;
public static final int GroupCreateResponseMessage = 5;
public static final int GroupJoinRequestMessage = 6;
public static final int GroupJoinResponseMessage = 7;
public static final int GroupQuitRequestMessage = 8;
public static final int GroupQuitResponseMessage = 9;
public static final int GroupChatRequestMessage = 10;
public static final int GroupChatResponseMessage = 11;
public static final int GroupMembersRequestMessage = 12;
public static final int GroupMembersResponseMessage = 13;
public static final int PingMessage = 14;
public static final int PongMessage = 15;
private static final Map<Integer, Class<? extends Message>> messageClasses = new HashMap<>();
static {
(LoginRequestMessage, );
(LoginResponseMessage, );
(ChatRequestMessage, );
(ChatResponseMessage, );
(GroupCreateRequestMessage, );
(GroupCreateResponseMessage, );
(GroupJoinRequestMessage, );
(GroupJoinResponseMessage, );
(GroupQuitRequestMessage, );
(GroupQuitResponseMessage, );
(GroupChatRequestMessage, );
(GroupChatResponseMessage, );
(GroupMembersRequestMessage, );
(GroupMembersResponseMessage, );
}
}
Parameter tuning
Related source codes to be updated
Client parameters CONNECT_TIMEOUT_MILLIS
-
Belongs to SocketChannal parameters
-
When using when establishing a connection between the client, if the connection cannot be connected within a specified millisecond, a timeout exception will be thrown.
-
SO_TIMEOUT is mainly used to block IO, blocking IO, accept, read, etc. are waiting infinitely. If you don't want to block forever, use it to adjust the timeout time.
@Slf4j
public class TestConnectionTimeout {
public static void main(String[] args) {
NioEventLoopGroup group = new NioEventLoopGroup();
try {
Bootstrap bootstrap = new Bootstrap()
.group(group)
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 300)
.channel()
.handler(new LoggingHandler());
ChannelFuture future = ("127.0.0.1", 8080);
().channel().closeFuture().sync(); // Breakpoint 1
} catch (Exception e) {
();
("timeout");
} finally {
();
}
}
}
Also source code part#connect
@Override
public final void connect(
final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {
// ...
// Get the timeout time
int connectTimeoutMillis = config().getConnectTimeoutMillis();
if (connectTimeoutMillis > 0) {//If the timeout is greater than 0
connectTimeoutFuture = eventLoop().schedule(new Runnable() {// Then start the timed task
@Override
public void run() {
//connectPromise is an object that exchanges data between two threads
ChannelPromise connectPromise = ;
ConnectTimeoutException cause =
new ConnectTimeoutException("connection timed out: " + remoteAddress); // Breakpoint 2
//(cause) Put the exception in the connectPromise and wake up the main thread
if (connectPromise != null && (cause)) {
close(voidPromise());
}
}
}, connectTimeoutMillis, );//The timing task is executed after the connectTimeoutMillis time
}
// ...
}
Server parameter SO_BACKLOG
This is a parameter belonging to ServerSocketChannal
There are semi-connection queues and fully-connection queues during three handshakes. For more information, please see this article:TCP - Semi-connection queue and full-connection queue
- sync queue - semi-connection queue
- The size is specified by /proc/sys/net/ipv4/tcp_max_syn_backlog, in
syncookies
When enabled, there is no logical maximum limit, and this setting is ignored.
- The size is specified by /proc/sys/net/ipv4/tcp_max_syn_backlog, in
- accept queue - Full connection queue
- Its size is specified by /proc/sys/net/core/somaxconn. When using the listen function, the kernel will take the smaller values of both based on the incoming backlog parameters and system parameters.
- If the accpet queue is full, the server will send an error message that refuses to the client
In netty, you can set the size of the backlog by option(ChannelOption.SO_BACKLOG, value)
You can view the default size through the following source code
public class DefaultServerSocketChannelConfig extends DefaultChannelConfig
implements ServerSocketChannelConfig {
private volatile int backlog = ;
// ...
}
TCP_NODELAY
- Belongs to SocketChannal parameters
Send immediately, it is recommended to set to true. false means turning on the nagle algorithm
SO_SNDBUF & SO_RCVBUF
set upSliding windowThese parameters may need to be set earlier, but now tcp will automatically adjust the window according to congestion, etc., so it is not recommended to manually set these two values.
- SO_SNDBUF belongs to the SocketChannal parameter
- SO_RCVBUF can be used for both the SocketChannal parameter and the ServerSocketChannal parameter (it is recommended to set it to ServerSocketChannal)
ALLOCATOR
- Belongs to SocketChannal parameters
- Used to allocate ByteBuf, ()
RCVBUF_ALLOCATOR
- Belongs to SocketChannal parameters
- Control the netty receive buffer size
- Responsible for the allocation of inbound data, determine the size of the inbound buffer (and can be adjusted dynamically), and adopt direct direct memory uniformly. The specific pooling or non-pooling is determined by the allocator.
Source code detailed explanation
Start the analysis
Let's take a look at how the following code is processed in netty
//1 Use NioEventLoopGroup (nio boss thread for short) in netty to encapsulate threads and selectors
Selector selector = ();
//2 Create a NioServerSocketChannel, and initialize its associated handler, and store config for native ssc
NioServerSocketChannel attachment = new NioServerSocketChannel();
//3 When creating NioServerSocketChannel, a serverSocketChannel native to java was created
ServerSocketChannel serverSocketChannel = ();
(false);
//4 Start the nio boss thread to perform the next operation
//5 Register (only associated with selector and NioServerSocketChannel), no events are followed
SelectionKey selectionKey = (selector, 0, attachment);
//6 head -> Initializer -> ServerBootstrapAcceptor -> tail, the initializer is one-time, just to add acceptor
//7 Bind port
(new InetSocketAddress(8080));
//8 Trigger channel active event, follow the op_accept event in the head
(SelectionKey.OP_ACCEPT);
Entrance#bind
Key Code#doBind
Which threads are used to process this function? You can have a concept first and then read:
- init & register regFuture processing
- init: handled by main
- Create NioServerSocketChannel: handled by main
- Add NioServerSocketChannel Initialization handler: handled by main
- Initialize handler waiting for call
- init: handled by main
- register
- Start the nio boss thread: handled by main
- Native ssc Register to selector Unattended Events: Handled by nio-thread
- Execute NioServerSocketChannel initialization handler: handled by nio-thread
- regFuture waits for callback doBind0: handled by nio-thread
- Native ServerSocketChannel binding: handled by nio-thread
- Triggering NioServerSocketChannel active event: handled by nio-thread
private ChannelFuture doBind(final SocketAddress localAddress) {
// 1. Execute initialization and registration regFuture will be set by initAndRegister to set whether it is completed, thereby calling back at code in 3.2
// init is equivalent to ServerSocketChannel ssc= ();
// Register is equivalent to SelectionKey selectionKey=(selector, 0, nettySsc);
final ChannelFuture regFuture = initAndRegister();
final Channel channel = ();
if (() != null) {
return regFuture;
}
// 2. Because it is an asynchronous execution of initAndRegister, it needs to be divided into two situations. It also needs to be distinguished by the suspend breakpoint type during debugging.
// 2.1 If completed
if (()) {
ChannelPromise promise = ();
// 3.1 Call doBind0 immediately
doBind0(regFuture, channel, localAddress, promise);
return promise;
}
// 2.2 Not completed yet
else {
final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
// 3.2 callback doBind0
(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
Throwable cause = ();
if (cause != null) {
// Handle exceptions...
(cause);
} else {
();
// 3. The registration thread executes doBind0
doBind0(regFuture, channel, localAddress, promise);
}
}
});
return promise;
}
}
Key Code#initAndRegister
final ChannelFuture initAndRegister() {
Channel channel = null;
try {
channel = ();
// 1.1 Initialization - What you do is add an initializer ChannelInitializer
init(channel);
} catch (Throwable t) {
// Handle exceptions...
return new DefaultChannelPromise(new FailedChannel(), ).setFailure(t);
}
// 1.2 Registration - What you do is register the native channel to the selector
ChannelFuture regFuture = config().group().register(channel);
if (() != null) {
// Handle exceptions...
}
return regFuture;
}
Key Code#init
// Here the channel is actually NioServerSocketChannel
void init(Channel channel) throws Exception {
final Map<ChannelOption<?>, Object> options = options0();
synchronized (options) {
setChannelOptions(channel, options, logger);
}
final Map<AttributeKey<?>, Object> attrs = attrs0();
synchronized (attrs) {
for (Entry<AttributeKey<?>, Object> e: ()) {
@SuppressWarnings("unchecked")
AttributeKey<Object> key = (AttributeKey<Object>) ();
(key).set(());
}
}
ChannelPipeline p = ();
final EventLoopGroup currentlyChildGroup = childGroup;
final ChannelHandler currentChildHandler = childHandler;
final Entry<ChannelOption<?>, Object>[] currentChildOptions;
final Entry<AttributeKey<?>, Object>[] currentChildAttrs;
synchronized (childOptions) {
currentChildOptions = ().toArray(newOptionArray(0));
}
synchronized (childAttrs) {
currentChildAttrs = ().toArray(newAttrArray(0));
}
: : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : :
// Add an initializer for NioServerSocketChannel
(new ChannelInitializer<Channel>() {
@Override
public void initChannel(final Channel ch) throws Exception {
final ChannelPipeline pipeline = ();
ChannelHandler handler = ();
if (handler != null) {
(handler);
}
// The initializer's responsibility is to add ServerBootstrapAcceptor to NioServerSocketChannel
().execute(new Runnable() {
@Override
public void run() {
(new ServerBootstrapAcceptor(
ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
}
});
}
});
}
Key Code#register
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
// Some checks, a little...
= eventLoop;
if (()) {
register0(promise);
} else {
try {
// When the execute method is executed for the first time, the nio thread will be started, and then the registration and other operations will be executed on the nio thread.
// Because there is only one NioServerSocketChannel, there will be only one boss nio thread
// The fact that this line of code is the switching of main -> nio boss thread
(new Runnable() {
@Override
public void run() {
register0(promise);
}
});
} catch (Throwable t) {
// Logging...
closeForcibly();
();
safeSetFailure(promise, t);
}
}
}
#register0
private void register0(ChannelPromise promise) {
try {
if (!() || !ensureOpen(promise)) {
return;
}
boolean firstRegistration = neverRegistered;
// 1.2.1 The native nio channel is bound to the selector. Note that the selector is not registered at this time. The attachment is NioServerSocketChannel
doRegister();
neverRegistered = false;
registered = true;
// 1.2.2 Execute the initChannel of the NioServerSocketChannel initializer
();
// Callback 3.2 #doBind0
safeSetSuccess(promise);
();
// The corresponding server socket channel has not been bound yet, isActive is false
if (isActive()) {
if (firstRegistration) {
();
} else if (config().isAutoRead()) {
beginRead();
}
}
} catch (Throwable t) {
// Close the channel directly to avoid FD leak.
closeForcibly();
();
safeSetFailure(promise, t);
}
}
Key Code#initChannel
private boolean initChannel(ChannelHandlerContext ctx) throws Exception {
if ((ctx)) { // Guard against re-entrance.
try {
// 1.2.2.1 Perform initialization
initChannel((C) ());
} catch (Throwable cause) {
exceptionCaught(ctx, cause);
} finally {
// 1.2.2.2 Remove the initializer
ChannelPipeline pipeline = ();
if ((this) != null) {
(this);
}
}
return true;
}
return false;
}
Key Code#doBind0
// 3.1 or 3.2 execute doBind0
private static void doBind0(
final ChannelFuture regFuture, final Channel channel,
final SocketAddress localAddress, final ChannelPromise promise) {
().execute(new Runnable() {
@Override
public void run() {
if (()) {
(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
} else {
(());
}
}
});
}
Key Code#bind
public final void bind(final SocketAddress localAddress, final ChannelPromise promise) {
assertEventLoop();
if (!() || !ensureOpen(promise)) {
return;
}
if ((config().getOption(ChannelOption.SO_BROADCAST)) &&
localAddress instance of InetSocketAddress &&
!((InetSocketAddress) localAddress).getAddress().isAnyLocalAddress() &&
!() && !()) {
// Logging...
}
boolean wasActive = isActive();
try {
// 3.3 Perform port binding
doBind(localAddress);
} catch (Throwable t) {
safeSetFailure(promise, t);
closeIfClosed();
return;
}
if (!wasActive && isActive()) {
invokeLater(new Runnable() {
@Override
public void run() {
// 3.4 Trigger active event
();
}
});
}
safeSetSuccess(promise);
}
Key Code#doBind
protected void doBind(SocketAddress localAddress) throws Exception {
if (() >= 7) {
javaChannel().bind(localAddress, ());
} else {
javaChannel().socket().bind(localAddress, ());
}
}
Key Code#channelActive
public void channelActive(ChannelHandlerContext ctx) {
();
// Trigger read (the read on NioServerSocketChannel is not reading data, but just to trigger event registration of channel)
readIfIsAutoRead();
}
Key Code#doBeginRead
protected void doBeginRead() throws Exception {
// () or () was called
final SelectionKey selectionKey = ;
if (!()) {
return;
}
readPending = true;
final int interestOps = ();
// The value of readInterestOp is 16, which is initialized when NioServerSocketChannel is created, which means you are concerned about the accept event
if ((interestOps & readInterestOp) == 0) {
(interestOps | readInterestOp);
}
}
NioEventLoop Analysis
The NioEventLoop thread not only needs to handle IO events, but also needs to handle Tasks (including normal tasks and timed tasks).
Submit task code#execute
public void execute(Runnable task) {
if (task == null) {
throw new NullPointerException("task");
}
boolean inEventLoop = inEventLoop();
// Add a task, where the queue uses the mpsc lock-free queue provided by jctools
addTask(task);
if (!inEventLoop) {
// InEventLoop If false means execute is called by other threads, that is, the first call is called. At this time, the first task needs to be submitted to eventLoop, start the dead loop, and execute it to the following doStartThread
startThread();
if (isShutdown()) {
// If you have shutdown, do rejection logic, the code is slightly...
}
}
if (!addTaskWakesUp && wakesUpForTask(task)) {
// If the thread is blocked due to IO select, the thread of the added task needs to be responsible for waking up the NioEventLoop thread
wakeup(inEventLoop);
}
}
Wake up select blocking thread#wakeup
@Override
protected void wakeup(boolean inEventLoop) {
if (!inEventLoop && (false, true)) {
();
}
}
Start EventLoop main loop#doStartThread
private void doStartThread() {
assert thread == null;
(new Runnable() {
@Override
public void run() {
// Save the current thread of the thread pool in a member variable for subsequent use
thread = ();
if (interrupted) {
();
}
boolean success = false;
updateLastExecutionTime();
try {
// Call the run method of the external class SingleThreadEventExecutor to enter the dead loop. See below for the run method
();
success = true;
} catch (Throwable t) {
("Unexpected exception from an event executor: ", t);
} finally {
// Clean up, the code is slightly...
}
}
});
}
#run
The main task is to execute a dead loop, constantly check whether there are new tasks and IO events
protected void run() {
for (;;) {
try {
try {
// The logic of calculateStrategy is as follows:
// If there is a task, selectNow will be executed once, clearing the last wakeup result. Regardless of whether there are or not, the switch will be skipped.
// There will be no task, it will match to see if it should be blocked
switch ((selectNowSupplier, hasTasks())) {
case:
continue;
case SelectStrategy.BUSY_WAIT:
case:
// Because both the IO thread and the commit task thread may perform wakeup, and wakeup is a relatively expensive operation, an atomic Boolean object wakenUp is used. When it is true, it means that it is woken up by the current thread.
// Perform select blocking and set the wake-up state to false
boolean oldWakenUp = (false);
// If in this position, non-EventLoop threads set wakenUp to true and wakeup
// The following select method will not block
// After the runAllTasks process is completed, will the newly added tasks be executed in time at this stage of recirculation?
// Because oldWakenUp is true, the following select method will block until the timeout
// Only then can it be executed, so that the select method is unnecessary to block
select(oldWakenUp);
if (()) {
();
}
default:
}
} catch (IOException e) {
rebuildSelector0();
handleLoopException(e);
continue;
}
cancelledKeys = 0;
needsToSelectAgain = false;
// ioRatio defaults to 50
final int ioRatio = ;
if (ioRatio == 100) {
try {
processSelectedKeys();
} finally {
// When ioRatio is 100, all non-IO tasks are always run
runAllTasks();
}
} else {
final long ioStartTime = ();
try {
processSelectedKeys();
} finally {
// Recording the time-consuming process of io event
final long ioTime = () - ioStartTime;
// Run non-IO tasks and exit runAllTasks once timed out
runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
}
}
} catch (Throwable t) {
handleLoopException(t);
}
try {
if (isShuttingDown()) {
closeAll();
if (confirmShutdown()) {
return;
}
}
} catch (Throwable t) {
handleLoopException(t);
}
}
}
Notice
One puzzled thing here is wakeup, which can be called by the thread that submits the task (it is easier to understand) or by the EventLoop thread (it is more difficult to understand). Here you need to know the effect of the wakeup method:
- Called by a non-EventLoop thread, it will wake up the EventLoop thread currently blocking when executing the select
- Called by EventLoop itself, this wakeup will cancel the next select operation
#select
private void select(boolean oldWakenUp) throws IOException {
Selector selector = ;
try {
int selectCnt = 0;
long currentTimeNanos = ();
// Calculate the waiting time
// * No scheduledTask, timeout is 1s
// * There is scheduledTask, the timeout is `Next time of execution time of the time of the execution of the task - current time`
long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos);
for (;;) {
long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L;
// If timeout, exit the loop
if (timeoutMillis <= 0) {
if (selectCnt == 0) {
();
selectCnt = 1;
}
break;
}
// If the task exits the loop during the period, if there is no such judgment, the task will not be executed until the next time the select timeout
// (false, true) It is to make non-NioEventLoop no longer need to execute wakeup
if (hasTasks() && (false, true)) {
();
selectCnt = 1;
break;
}
// select limited time blocking
// Note that there is a bug in nio. When the bug occurs, the select method will not block even if there is no time to occur, resulting in constant polling, and the CPU occupies 100%
int selectedKeys = (timeoutMillis);
// Count is added 1
selectCnt ++;
// After waking up, if there is an IO event, or it is awakened by a non-EventLoop thread, or there is a task, exit the loop
if (selectedKeys != 0 || oldWakenUp || () || hasTasks() || hasScheduledTasks()) {
break;
}
if (()) {
// The thread is interrupted and exits the loop
// Logging
selectCnt = 1;
break;
}
long time = ();
if (time - (timeoutMillis) >= currentTimeNanos) {
// If the timeout, the count is reset to 1, the next loop will break
selectCnt = 1;
}
// Count exceeds the threshold, specified by , default 512
// This is to solve the nio null polling bug
else if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 &&
selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {
// Rebuild the selector
selector = selectRebuildSelector(selectCnt);
selectCnt = 1;
break;
}
currentTimeNanos = time;
}
if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS) {
// Logging
}
} catch (CancelledKeyException e) {
// Logging
}
}
Process keys#processSelectedKeys
private void processSelectedKeys() {
if (selectedKeys != null) {
// Replace the ready event set in the Selector implementation class with SelectedSelectionKeySet via reflection
// SelectedSelectionKeySet is implemented as an array, which can improve traversal performance (originally HashSet)
processSelectedKeysOptimized();
} else {
processSelectedKeysPlain(());
}
}
#processSelectedKey
private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
final unsafe = ();
// This key is invalid when the key is cancelled or closed
if (!()) {
// Handle if invalid...
return;
}
try {
int readyOps = ();
// Connection event
if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
int ops = ();
ops &= ~SelectionKey.OP_CONNECT;
(ops);
();
}
// Writable events
if ((readyOps & SelectionKey.OP_WRITE) != 0) {
().forceFlush();
}
// Readable or accessible events
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
// If it is available, please access #read
// If readable #read
();
}
} catch (CancelledKeyException ignored) {
(());
}
}
accept analysis
The following code in nio, the process in netty
//1 Block until the event occurs
();
Iterator<SelectionKey> iter = ().iterator();
while (()) {
//2 Get an event
SelectionKey key = ();
//3 If it is an accept event
if (()) {
//4 Execute accept
SocketChannel channel = ();
(false);
//5 Follow the read event
(selector, SelectionKey.OP_READ);
}
// ...
}
Let’s take a look at the access event processing (accept)
#read
public void read() {
assert eventLoop().inEventLoop();
final ChannelConfig config = config();
final ChannelPipeline pipeline = pipeline();
final allocHandle = unsafe().recvBufAllocHandle();
(config);
boolean closed = false;
Throwable exception = null;
try {
try {
do {
// DoReadMessages execute accept and create NioSocketChannel as a message to put it into readBuf
// readBuf is an ArrayList used to cache messages
int localRead = doReadMessages(readBuf);
if (localRead == 0) {
break;
}
if (localRead < 0) {
closed = true;
break;
}
// localRead is 1, just a message, that is, a client connection is received
(localRead);
} while (());
} catch (Throwable t) {
exception = t;
}
int size = ();
for (int i = 0; i < size; i ++) {
readPending = false;
// Trigger the read event and let the handler on the pipeline handle it, which is the process at this time
// #channelRead
((i));
}
();
();
();
if (exception != null) {
closed = closeOnReadError(exception);
(exception);
}
if (closed) {
inputShutdown = true;
if (isOpen()) {
close(voidPromise());
}
}
} finally {
if (!readPending && !()) {
removeReadOp();
}
}
}
Key Code#channelRead
public void channelRead(ChannelHandlerContext ctx, Object msg) {
// The msg at this time is NioSocketChannel
final Channel child = (Channel) msg;
// NioSocketChannel add childHandler, that is, initializer
().addLast(childHandler);
// Set options
setChannelOptions(child, childOptions, logger);
for (Entry<AttributeKey<?>, Object> e: childAttrs) {
((AttributeKey<Object>) ()).set(());
}
try {
// Register NioSocketChannel to the nio worker thread, and the next processing will be transferred to the nio worker thread
(child).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (!()) {
forceClose(child, ());
}
}
});
} catch (Throwable t) {
forceClose(child, t);
}
}
Back to the familiar#register
method
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
// Some checks, a little...
= eventLoop;
if (()) {
register0(promise);
} else {
try {
// The fact that this line of code is the switching of nio boss -> nio worker thread
(new Runnable() {
@Override
public void run() {
register0(promise);
}
});
} catch (Throwable t) {
// Logging...
closeForcibly();
();
safeSetFailure(promise, t);
}
}
}
#register0
private void register0(ChannelPromise promise) {
try {
if (!() || !ensureOpen(promise)) {
return;
}
boolean firstRegistration = neverRegistered;
doRegister();
neverRegistered = false;
registered = true;
: : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : :
// Execute the initializer. Before execution, there is only head -> initializer -> tail in the pipeline before execution
();
// After execution, it is head -> logging handler -> my handler -> tail
safeSetSuccess(promise);
();
if (isActive()) {
if (firstRegistration) {
// Trigger active event on pipeline
();
} else if (config().isAutoRead()) {
beginRead();
}
}
} catch (Throwable t) {
closeForcibly();
();
safeSetFailure(promise, t);
}
}
Back to familiar code#channelActive
public void channelActive(ChannelHandlerContext ctx) {
();
// Trigger read (NioSocketChannel read here is just to trigger the channel event registration, and data reading has not been involved)
readIfIsAutoRead();
}
#doBeginRead
protected void doBeginRead() throws Exception {
// () or () was called
final SelectionKey selectionKey = ;
if (!()) {
return;
}
readPending = true;
// At this time interestOps is 0
final int interestOps = ();
if ((interestOps & readInterestOp) == 0) {
// Follow the read event
(interestOps | readInterestOp);
}
}
read analysis
Let's look at the readable events#read
, Note that the sent data may not be read in one go, so it will trigger multiple nio read events, pipeline reads will be triggered multiple times within one event, and pipeline reads will be triggered once once
public final void read() {
final ChannelConfig config = config();
if (shouldBreakReadReady(config)) {
clearReadPending();
return;
}
final ChannelPipeline pipeline = pipeline();
// Decide the implementation of allocator
final ByteBufAllocator allocator = ();
// Used to allocate byteBuf to determine the size of a single read
final allocHandle = recvBufAllocHandle();
(config);
ByteBuf byteBuf = null;
boolean close = false;
try {
do {
byteBuf = (allocator);
// Read
(doReadBytes(byteBuf));
if (() <= 0) {
();
byteBuf = null;
close = () < 0;
if (close) {
readPending = false;
}
break;
}
(1);
readPending = false;
// Trigger the read event and let the handler on the pipeline handle it. At this time, it is the handler on the NioSocketChannel
(byteBuf);
byteBuf = null;
}
// Do you want to continue the loop
while (());
();
// Trigger the read complete event
();
if (close) {
closeOnRead(pipeline);
}
} catch (Throwable t) {
handleReadException(pipeline, byteBuf, t, close, allocHandle);
} finally {
if (!readPending && !()) {
removeReadOp();
}
}
}
#continueReading()
public boolean continueReading(UncheckedBooleanSupplier maybeMoreDataSupplier) {
Return
// Generally true
() &&
// respectMaybeMoreData defaults to true
// The logic of maybeMoreDataSupplier is to return true if the expected read byte is equal to the actual read byte
(!respectMaybeMoreData || ()) &&
// Less than the maximum number of times, maxMessagePerRead defaults to 16
totalMessages < maxMessagePerRead &&
// Actually read the data
totalBytesRead > 0;
}
Interview Question Column
Java Interview Question ColumnIt is online, welcome to visit.
- If you don’t know how to write your resume, you don’t know how to package your resume project;
- If you don’t know if there is something in your resume?
- If you don’t know how to answer some comprehensive questions;
Then you can send me a private message and I will do my best to help you.