Location>code7788 >text

Netty advanced usage and source code detailed explanation

Popularity:296 ℃/2025-02-12 11:01:36

Sticky and half pack

Sticking phenomenon

The problem of sticking packets occurs because you don’t know where the boundary of a user message is. If you know where the boundary is, the receiver can divide the valid user message through the boundary.

Server code

public class HelloWorldServer {
    static final Logger log = ();
    void start() {
        NioEventLoopGroup boss = new NioEventLoopGroup(1);
        NioEventLoopGroup worker = new NioEventLoopGroup();
        try {
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            ();
            (boss, worker);
            (new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel ch) throws Exception {
                    ().addLast(new LoggingHandler());
                    ().addLast(new ChannelInboundHandlerAdapter() {
                        @Override
                        public void channelActive(ChannelHandlerContext ctx) throws Exception {
                            ("connected {}", ());
                            (ctx);
                        }

                        @Override
                        public void channelInactive(ChannelHandlerContext ctx) throws Exception {
                            ("disconnect {}", ());
                            (ctx);
                        }
                    });
                }
            });
            ChannelFuture channelFuture = (8080);
            ("{} binding...", ());
            ();
            ("{} bound...", ());
            ().closeFuture().sync();
        } catch (InterruptedException e) {
            ("server error", e);
        } finally {
            ();
            ();
            ("stoped");
        }
    }

    public static void main(String[] args) {
        new HelloWorldServer().start();
    }
}

The client code wants to send 10 messages, each of which is 16 bytes.

public class HelloWorldClient {
     static final Logger log = ();
     public static void main(String[] args) {
         NioEventLoopGroup worker = new NioEventLoopGroup();
         try {
             Bootstrap bootstrap = new Bootstrap();
             ();
             (worker);
             (new ChannelInitializer<SocketChannel>() {
                 @Override
                 protected void initChannel(SocketChannel ch) throws Exception {
                     ("connetted...");
                     ().addLast(new ChannelInboundHandlerAdapter() {
                         @Override
                         //A Active event will be triggered after the connection channel is successfully established
                         public void channelActive(ChannelHandlerContext ctx) throws Exception {
                             ("sending...");
                             Random r = new Random();
                             char c = 'a';
                             for (int i = 0; i < 10; i++) {
                                 ByteBuf buffer = ().buffer();
                                 (new byte[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15});
                                 (buffer);
                             }
                         }
                     });
                 }
             });
             ChannelFuture channelFuture = ("127.0.0.1", 8080).sync();
             ().closeFuture().sync();

         } catch (InterruptedException e) {
             ("client error", e);
         } finally {
             ();
         }
     }
 }

A certain output on the server side can be seen to receive 160 bytes at one time, while the expected one is 16 bytes at one time, which is received in 10 times. This shows that the phenomenon of sticking

08:24:46 [DEBUG] [main]  - [id: 0x81e0fda5] binding...
08:24:46 [DEBUG] [main]  - [id: 0x81e0fda5, L:/0:0:0:0:0:0:0:0:8080] bound...
08:24:55 [DEBUG] [nioEventLoopGroup-3-1]  - [id: 0x94132411, L:/127.0.0.1:8080 - R:/127.0.0.1:58177] REGISTERED
08:24:55 [DEBUG] [nioEventLoopGroup-3-1]  - [id: 0x94132411, L:/127.0.0.1:8080 - R:/127.0.0.1:58177] ACTIVE
08:24:55 [DEBUG] [nioEventLoopGroup-3-1]  - connected [id: 0x94132411, L:/127.0.0.1:8080 - R:/127.0.0.1:58177]
08:24:55 [DEBUG] [nioEventLoopGroup-3-1]  - [id: 0x94132411, L:/127.0.0.1:8080 - R:/127.0.0.1:58177] READ: 160B
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 00 01 02 03 04 05 06 07 08 09 0a 0b 0c 0d 0e 0f |................|
|00000010| 00 01 02 03 04 05 06 07 08 09 0a 0b 0c 0d 0e 0f |................|
|00000020| 00 01 02 03 04 05 06 07 08 09 0a 0b 0c 0d 0e 0f |................|
|00000030| 00 01 02 03 04 05 06 07 08 09 0a 0b 0c 0d 0e 0f |................|
|00000040| 00 01 02 03 04 05 06 07 08 09 0a 0b 0c 0d 0e 0f |................|
|00000050| 00 01 02 03 04 05 06 07 08 09 0a 0b 0c 0d 0e 0f |................|
|00000060| 00 01 02 03 04 05 06 07 08 09 0a 0b 0c 0d 0e 0f |................|
|00000070| 00 01 02 03 04 05 06 07 08 09 0a 0b 0c 0d 0e 0f |................|
|00000080| 00 01 02 03 04 05 06 07 08 09 0a 0b 0c 0d 0e 0f |................|
|00000090| 00 01 02 03 04 05 06 07 08 09 0a 0b 0c 0d 0e 0f |................|
+--------+-------------------------------------------------+----------------+
08:24:55 [DEBUG] [nioEventLoopGroup-3-1]  - [id: 0x94132411, L:/127.0.0.1:8080 - R:/127.0.0.1:58177] READ COMPLETE

Half pack phenomenon

Half-package refers to the situation where the receiving end receives only part of the data, not the complete data.

The client code hopes to send 1 message, which is 160 bytes, and the code is changed to

ByteBuf buffer = ().buffer();
for (int i = 0; i < 10; i++) {
    (new byte[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15});
}
(buffer);

Because the phenomenon is obvious, the server side will modify the receiving buffer, and the other codes will not change.

(ChannelOption.SO_RCVBUF, 10);

A certain output from the server side, you can see that the received message is divided into two sections, such as 20 bytes for the first time and 140 bytes for the second time.

08:43:49 [DEBUG] [main]  - [id: 0x4d6c6a84] binding...
08:43:49 [DEBUG] [main]  - [id: 0x4d6c6a84, L:/0:0:0:0:0:0:0:0:8080] bound...
08:44:23 [DEBUG] [nioEventLoopGroup-3-1]  - [id: 0x1719abf7, L:/127.0.0.1:8080 - R:/127.0.0.1:59221] REGISTERED
08:44:23 [DEBUG] [nioEventLoopGroup-3-1]  - [id: 0x1719abf7, L:/127.0.0.1:8080 - R:/127.0.0.1:59221] ACTIVE
08:44:23 [DEBUG] [nioEventLoopGroup-3-1]  - connected [id: 0x1719abf7, L:/127.0.0.1:8080 - R:/127.0.0.1:59221]
08:44:24 [DEBUG] [nioEventLoopGroup-3-1]  - [id: 0x1719abf7, L:/127.0.0.1:8080 - R:/127.0.0.1:59221] READ: 20B
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 00 01 02 03 04 05 06 07 08 09 0a 0b 0c 0d 0e 0f |................|
|00000010| 00 01 02 03                                     |....            |
+--------+-------------------------------------------------+----------------+
08:44:24 [DEBUG] [nioEventLoopGroup-3-1]  - [id: 0x1719abf7, L:/127.0.0.1:8080 - R:/127.0.0.1:59221] READ COMPLETE
08:44:24 [DEBUG] [nioEventLoopGroup-3-1]  - [id: 0x1719abf7, L:/127.0.0.1:8080 - R:/127.0.0.1:59221] READ: 140B
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 04 05 06 07 08 09 0a 0b 0c 0d 0e 0f 00 01 02 03 |................|
|00000010| 04 05 06 07 08 09 0a 0b 0c 0d 0e 0f 00 01 02 03 |................|
|00000020| 04 05 06 07 08 09 0a 0b 0c 0d 0e 0f 00 01 02 03 |................|
|00000030| 04 05 06 07 08 09 0a 0b 0c 0d 0e 0f 00 01 02 03 |................|
|00000040| 04 05 06 07 08 09 0a 0b 0c 0d 0e 0f 00 01 02 03 |................|
|00000050| 04 05 06 07 08 09 0a 0b 0c 0d 0e 0f 00 01 02 03 |................|
|00000060| 04 05 06 07 08 09 0a 0b 0c 0d 0e 0f 00 01 02 03 |................|
|00000070| 04 05 06 07 08 09 0a 0b 0c 0d 0e 0f 00 01 02 03 |................|
|00000080| 04 05 06 07 08 09 0a 0b 0c 0d 0e 0f             |............    |
+--------+-------------------------------------------------+----------------+
08:44:24 [DEBUG] [nioEventLoopGroup-3-1]  - [id: 0x1719abf7, L:/127.0.0.1:8080 - R:/127.0.0.1:59221] READ COMPLETE

Notice:(ChannelOption.SO_RCVBUF, 10) The size of the underlying receiving buffer (i.e., sliding window) affected only determines the smallest unit of netty read. Netty actually reads each time it is an integer multiple of it

Phenomenon analysis

The sticky and semi-package problem that appears here is not a problem with JavaNIO or Netty. It is essentially TCP is a churn protocol and messages have no boundaries.

Sticky bag:

  • Phenomenon, send abc def, receive abcdef
  • reason
    • Application layer: Receiver ByteBuf is set too large (Netty default 1024)
    • Sliding window: Assuming that the sender 256 bytes represents a complete message, but because the receiver does not process it in time and the window size is large enough, these 256 bytes will be buffered in the receiver's sliding window. When multiple messages are buffered in the sliding window The text will stick to the wrap
    • Nagle algorithm: It causes sticky

Half pack

  • Phenomenon, send abcdef, receive abcdef
  • reason
    • Application layer: Receiver ByteBuf is less than the actual data sent
    • Sliding window: Suppose that the receiver has only 128 bytes left in the window and the sender's message size is 256 bytes. At this time, it can't be put down. You can only send the first 128 bytes first, and wait for ack to send the remaining part, which results in half a packet.
    • MSS limit: When the sent data exceeds the MSS limit, the data will be sent in segments, resulting in half a packet.

Solution

Let’s take a look at how Netty solves the above problems:

  1. Short link, send a packet to establish a connection, so that the connection is established between the connection and the connection is disconnected, the boundary of the message is too low.
  2. Each message is a fixed length, and the disadvantages are wasteful space
  3. Each message uses a delimiter, for example \n, and the disadvantages need to be escaped.
  4. Each message is divided into head and body, and the length of the body is included in the head

Method 1: Short link (extremely not recommended)

Taking the solution to sticky bags as an example

public class HelloWorldClient {
     static final Logger log = ();

     public static void main(String[] args) {
         // Send 10 times in 10 times
         for (int i = 0; i < 10; i++) {
             send();
         }
     }

     private static void send() {
         NioEventLoopGroup worker = new NioEventLoopGroup();
         try {
             Bootstrap bootstrap = new Bootstrap();
             ();
             (worker);
             (new ChannelInitializer<SocketChannel>() {
                 @Override
                 protected void initChannel(SocketChannel ch) throws Exception {
                     ("conneted...");
                     ().addLast(new LoggingHandler());
                     ().addLast(new ChannelInboundHandlerAdapter() {
                         @Override
                         public void channelActive(ChannelHandlerContext ctx) throws Exception {
                             ("sending...");
                             ByteBuf buffer = ().buffer();
                             (new byte[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15});
                             (buffer);
                             // Close it after sending
                             ();
                         }
                     });
                 }
             });
             ChannelFuture channelFuture = ("localhost", 8080).sync();
             ().closeFuture().sync();

         } catch (InterruptedException e) {
             ("client error", e);
         } finally {
             ();
         }
     }
 }

Output, omitted

It is still difficult to solve the problem of using this method in half-packages, because the buffer size of the receiver is limited.

Method 2: Fixed length

Let all packets be fixed in length (assuming the length is 8 bytes), and add them on the server side

().addLast(new FixedLengthFrameDecoder(8));

Client test code, note that after using this method, the client can flush it at any time.

public class HelloWorldClient {
     static final Logger log = ();

     public static void main(String[] args) {
         NioEventLoopGroup worker = new NioEventLoopGroup();
         try {
             Bootstrap bootstrap = new Bootstrap();
             ();
             (worker);
             (new ChannelInitializer<SocketChannel>() {
                 @Override
                 protected void initChannel(SocketChannel ch) throws Exception {
                     ("connetted...");
                     ().addLast(new LoggingHandler());
                     ().addLast(new ChannelInboundHandlerAdapter() {
                         @Override
                         public void channelActive(ChannelHandlerContext ctx) throws Exception {
                             ("sending...");
                             // Send random packets of content
                             Random r = new Random();
                             char c = 'a';
                             ByteBuf buffer = ().buffer();
                             for (int i = 0; i < 10; i++) {
                                 byte[] bytes = new byte[8];
                                 for (int j = 0; j < (8); j++) {
                                     bytes[j] = (byte) c;
                                 }
                                 c++;
                                 (bytes);
                             }
                             (buffer);
                         }
                     });
                 }
             });
             ChannelFuture channelFuture = ("192.168.0.103", 9090).sync();
             ().closeFuture().sync();

         } catch (InterruptedException e) {
             ("client error", e);
         } finally {
             ();
         }
     }
 }

Client output

12:07:00 [DEBUG] [nioEventLoopGroup-2-1]  - connetted...
12:07:00 [DEBUG] [nioEventLoopGroup-2-1]  - [id: 0x3c2ef3c2] REGISTERED
12:07:00 [DEBUG] [nioEventLoopGroup-2-1]  - [id: 0x3c2ef3c2] CONNECT: /192.168.0.103:9090
12:07:00 [DEBUG] [nioEventLoopGroup-2-1]  - [id: 0x3c2ef3c2, L:/192.168.0.103:53155 - R:/192.168.0.103:9090] ACTIVE
12:07:00 [DEBUG] [nioEventLoopGroup-2-1]  - sending...
12:07:00 [DEBUG] [nioEventLoopGroup-2-1]  - [id: 0x3c2ef3c2, L:/192.168.0.103:53155 - R:/192.168.0.103:9090] WRITE: 80B
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 61 61 61 61 00 00 00 00 62 00 00 00 00 00 00 00 |aaaa....b.......|
|00000010| 63 63 00 00 00 00 00 00 64 00 00 00 00 00 00 00 |cc......d.......|
|00000020| 00 00 00 00 00 00 00 00 66 66 66 66 00 00 00 00 |........ffff....|
|00000030| 67 67 67 00 00 00 00 00 68 00 00 00 00 00 00 00 |ggg.....h.......|
|00000040| 69 69 69 69 69 00 00 00 6a 6a 6a 6a 00 00 00 00 |iiiii...jjjj....|
+--------+-------------------------------------------------+----------------+
12:07:00 [DEBUG] [nioEventLoopGroup-2-1]  - [id: 0x3c2ef3c2, L:/192.168.0.103:53155 - R:/192.168.0.103:9090] FLUSH

Server output

12:06:51 [DEBUG] [main]  - [id: 0xe3d9713f] binding...
12:06:51 [DEBUG] [main]  - [id: 0xe3d9713f, L:/192.168.0.103:9090] bound...
12:07:00 [DEBUG] [nioEventLoopGroup-3-1]  - [id: 0xd739f137, L:/192.168.0.103:9090 - R:/192.168.0.103:53155] REGISTERED
12:07:00 [DEBUG] [nioEventLoopGroup-3-1]  - [id: 0xd739f137, L:/192.168.0.103:9090 - R:/192.168.0.103:53155] ACTIVE
12:07:00 [DEBUG] [nioEventLoopGroup-3-1]  - connected [id: 0xd739f137, L:/192.168.0.103:9090 - R:/192.168.0.103:53155]
12:07:00 [DEBUG] [nioEventLoopGroup-3-1]  - [id: 0xd739f137, L:/192.168.0.103:9090 - R:/192.168.0.103:53155] READ: 8B
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 61 61 61 61 00 00 00 00                         |aaaa....        |
+--------+-------------------------------------------------+----------------+
12:07:00 [DEBUG] [nioEventLoopGroup-3-1]  - [id: 0xd739f137, L:/192.168.0.103:9090 - R:/192.168.0.103:53155] READ: 8B
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 62 00 00 00 00 00 00 00                         |b.......        |
+--------+-------------------------------------------------+----------------+
12:07:00 [DEBUG] [nioEventLoopGroup-3-1]  - [id: 0xd739f137, L:/192.168.0.103:9090 - R:/192.168.0.103:53155] READ: 8B
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 63 63 00 00 00 00 00 00                         |cc......        |
+--------+-------------------------------------------------+----------------+
12:07:00 [DEBUG] [nioEventLoopGroup-3-1]  - [id: 0xd739f137, L:/192.168.0.103:9090 - R:/192.168.0.103:53155] READ: 8B
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 64 00 00 00 00 00 00 00                         |d.......        |
+--------+-------------------------------------------------+----------------+
12:07:00 [DEBUG] [nioEventLoopGroup-3-1]  - [id: 0xd739f137, L:/192.168.0.103:9090 - R:/192.168.0.103:53155] READ: 8B
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 00 00 00 00 00 00 00 00                         |........        |
+--------+-------------------------------------------------+----------------+
12:07:00 [DEBUG] [nioEventLoopGroup-3-1]  - [id: 0xd739f137, L:/192.168.0.103:9090 - R:/192.168.0.103:53155] READ: 8B
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 66 66 66 66 00 00 00 00                         |ffff....        |
+--------+-------------------------------------------------+----------------+
12:07:00 [DEBUG] [nioEventLoopGroup-3-1]  - [id: 0xd739f137, L:/192.168.0.103:9090 - R:/192.168.0.103:53155] READ: 8B
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 67 67 67 00 00 00 00 00                         |ggg.....        |
+--------+-------------------------------------------------+----------------+
12:07:00 [DEBUG] [nioEventLoopGroup-3-1]  - [id: 0xd739f137, L:/192.168.0.103:9090 - R:/192.168.0.103:53155] READ: 8B
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 68 00 00 00 00 00 00 00                         |h.......        |
+--------+-------------------------------------------------+----------------+
12:07:00 [DEBUG] [nioEventLoopGroup-3-1]  - [id: 0xd739f137, L:/192.168.0.103:9090 - R:/192.168.0.103:53155] READ: 8B
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 69 69 69 69 69 00 00 00                         |iiiii...        |
+--------+-------------------------------------------------+----------------+
12:07:00 [DEBUG] [nioEventLoopGroup-3-1]  - [id: 0xd739f137, L:/192.168.0.103:9090 - R:/192.168.0.103:53155] READ: 8B
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 6a 6a 6a 6a 00 00 00 00                         |jjjj....        |
+--------+-------------------------------------------------+----------------+
12:07:00 [DEBUG] [nioEventLoopGroup-3-1]  - [id: 0xd739f137, L:/192.168.0.103:9090 - R:/192.168.0.103:53155] READ COMPLETE

The disadvantage is that the size of the data packet is difficult to grasp.

  • Too big, wasteful
  • The length is too small, which seems not enough for some data packets

Method 3: Fixed delimiter

The server joins, and the default is \n or \r\n as the delimiter. If the delimiter does not appear after the specified length, an exception will be thrown.

().addLast(new LineBasedFrameDecoder(1024));

After each message, the client adds the \n separator

public class HelloWorldClient {
    static final Logger log = ();

    public static void main(String[] args) {
        NioEventLoopGroup worker = new NioEventLoopGroup();
        try {
            Bootstrap bootstrap = new Bootstrap();
            ();
            (worker);
            (new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel ch) throws Exception {
                    ("connetted...");
                    ().addLast(new LoggingHandler());
                    ().addLast(new ChannelInboundHandlerAdapter() {
                        @Override
                        public void channelActive(ChannelHandlerContext ctx) throws Exception {
                            ("sending...");
                            Random r = new Random();
                            char c = 'a';
                            ByteBuf buffer = ().buffer();
                            for (int i = 0; i < 10; i++) {
                                for (int j = 1; j <= (16)+1; j++) {
                                    ((byte) c);
                                }
                                (10);
                                c++;
                            }
                            (buffer);
                        }
                    });
                }
            });
            ChannelFuture channelFuture = ("192.168.0.103", 9090).sync();
            ().closeFuture().sync();

        } catch (InterruptedException e) {
            ("client error", e);
        } finally {
            ();
        }
    }
}

Client output

14:08:18 [DEBUG] [nioEventLoopGroup-2-1]  - connetted...
14:08:18 [DEBUG] [nioEventLoopGroup-2-1]  - [id: 0x1282d755] REGISTERED
14:08:18 [DEBUG] [nioEventLoopGroup-2-1]  - [id: 0x1282d755] CONNECT: /192.168.0.103:9090
14:08:18 [DEBUG] [nioEventLoopGroup-2-1]  - [id: 0x1282d755, L:/192.168.0.103:63641 - R:/192.168.0.103:9090] ACTIVE
14:08:18 [DEBUG] [nioEventLoopGroup-2-1]  - sending...
14:08:18 [DEBUG] [nioEventLoopGroup-2-1]  - [id: 0x1282d755, L:/192.168.0.103:63641 - R:/192.168.0.103:9090] WRITE: 60B
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 61 0a 62 62 62 0a 63 63 63 0a 64 64 0a 65 65 65 ||
|00000010| 65 65 65 65 65 65 65 0a 66 66 0a 67 67 67 67 67 ||
|00000020| 67 67 0a 68 68 68 68 0a 69 69 69 69 69 69 69 0a |.|
|00000030| 6a 6a 6a 6a 6a 6a 6a 6a 6a 6a 6a 0a             |jjjjjjjjjjj.    |
+--------+-------------------------------------------------+----------------+
14:08:18 [DEBUG] [nioEventLoopGroup-2-1]  - [id: 0x1282d755, L:/192.168.0.103:63641 - R:/192.168.0.103:9090] FLUSH

Server output

14:08:18 [DEBUG] [nioEventLoopGroup-3-5]  - connected [id: 0xa4b3be43, L:/192.168.0.103:9090 - R:/192.168.0.103:63641]
14:08:18 [DEBUG] [nioEventLoopGroup-3-5]  - [id: 0xa4b3be43, L:/192.168.0.103:9090 - R:/192.168.0.103:63641] READ: 1B
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 61                                              |a               |
+--------+-------------------------------------------------+----------------+
14:08:18 [DEBUG] [nioEventLoopGroup-3-5]  - [id: 0xa4b3be43, L:/192.168.0.103:9090 - R:/192.168.0.103:63641] READ: 3B
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 62 62 62                                        |bbb             |
+--------+-------------------------------------------------+----------------+
14:08:18 [DEBUG] [nioEventLoopGroup-3-5]  - [id: 0xa4b3be43, L:/192.168.0.103:9090 - R:/192.168.0.103:63641] READ: 3B
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 63 63 63                                        |ccc             |
+--------+-------------------------------------------------+----------------+
14:08:18 [DEBUG] [nioEventLoopGroup-3-5]  - [id: 0xa4b3be43, L:/192.168.0.103:9090 - R:/192.168.0.103:63641] READ: 2B
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 64 64                                           |dd              |
+--------+-------------------------------------------------+----------------+
14:08:18 [DEBUG] [nioEventLoopGroup-3-5]  - [id: 0xa4b3be43, L:/192.168.0.103:9090 - R:/192.168.0.103:63641] READ: 10B
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 65 65 65 65 65 65 65 65 65 65                   |eeeeeeeeee      |
+--------+-------------------------------------------------+----------------+
14:08:18 [DEBUG] [nioEventLoopGroup-3-5]  - [id: 0xa4b3be43, L:/192.168.0.103:9090 - R:/192.168.0.103:63641] READ: 2B
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 66 66                                           |ff              |
+--------+-------------------------------------------------+----------------+
14:08:18 [DEBUG] [nioEventLoopGroup-3-5]  - [id: 0xa4b3be43, L:/192.168.0.103:9090 - R:/192.168.0.103:63641] READ: 7B
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 67 67 67 67 67 67 67                            |ggggggg         |
+--------+-------------------------------------------------+----------------+
14:08:18 [DEBUG] [nioEventLoopGroup-3-5]  - [id: 0xa4b3be43, L:/192.168.0.103:9090 - R:/192.168.0.103:63641] READ: 4B
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 68 68 68 68                                     |hhhh            |
+--------+-------------------------------------------------+----------------+
14:08:18 [DEBUG] [nioEventLoopGroup-3-5]  - [id: 0xa4b3be43, L:/192.168.0.103:9090 - R:/192.168.0.103:63641] READ: 7B
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 69 69 69 69 69 69 69                            |iiiiiii         |
+--------+-------------------------------------------------+----------------+
14:08:18 [DEBUG] [nioEventLoopGroup-3-5]  - [id: 0xa4b3be43, L:/192.168.0.103:9090 - R:/192.168.0.103:63641] READ: 11B
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 6a 6a 6a 6a 6a 6a 6a 6a 6a 6a 6a                |jjjjjjjjjjj     |
+--------+-------------------------------------------------+----------------+
14:08:18 [DEBUG] [nioEventLoopGroup-3-5]  - [id: 0xa4b3be43, L:/192.168.0.103:9090 - R:/192.168.0.103:63641] READ COMPLETE

Disadvantages, it is more appropriate to process character data, but ifThe content itself contains the separator(This often happens in byte data), then the error will be parsed

Method 4: Preset length

Before sending a message, it is agreed to use fixed-length bytes to represent the length of the next data

// Maximum length, length offset, length occupancy, length adjustment, number of stripped bytes
 ().addLast(new LengthFieldBasedFrameDecoder(1024, 0, 1, 0, 1));

Client Code

public class HelloWorldClient {
     static final Logger log = ();

     public static void main(String[] args) {
         NioEventLoopGroup worker = new NioEventLoopGroup();
         try {
             Bootstrap bootstrap = new Bootstrap();
             ();
             (worker);
             (new ChannelInitializer<SocketChannel>() {
                 @Override
                 protected void initChannel(SocketChannel ch) throws Exception {
                     ("connetted...");
                     ().addLast(new LoggingHandler());
                     ().addLast(new ChannelInboundHandlerAdapter() {
                         @Override
                         public void channelActive(ChannelHandlerContext ctx) throws Exception {
                             ("sending...");
                             Random r = new Random();
                             char c = 'a';
                             ByteBuf buffer = ().buffer();
                             for (int i = 0; i < 10; i++) {
                                 byte length = (byte) ((16) + 1);
                                 // Write the length first
                                 (length);
                                 // Again
                                 for (int j = 1; j <= length; j++) {
                                     ((byte) c);
                                 }
                                 c++;
                             }
                             (buffer);
                         }
                     });
                 }
             });
             ChannelFuture channelFuture = ("192.168.0.103", 9090).sync();
             ().closeFuture().sync();

         } catch (InterruptedException e) {
             ("client error", e);
         } finally {
             ();
         }
     }
 }

Client output

14:37:10 [DEBUG] [nioEventLoopGroup-2-1]  - connetted...
14:37:10 [DEBUG] [nioEventLoopGroup-2-1]  - [id: 0xf0f347b8] REGISTERED
14:37:10 [DEBUG] [nioEventLoopGroup-2-1]  - [id: 0xf0f347b8] CONNECT: /192.168.0.103:9090
14:37:10 [DEBUG] [nioEventLoopGroup-2-1]  - [id: 0xf0f347b8, L:/192.168.0.103:49979 - R:/192.168.0.103:9090] ACTIVE
14:37:10 [DEBUG] [nioEventLoopGroup-2-1]  - sending...
14:37:10 [DEBUG] [nioEventLoopGroup-2-1]  - [id: 0xf0f347b8, L:/192.168.0.103:49979 - R:/192.168.0.103:9090] WRITE: 97B
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 09 61 61 61 61 61 61 61 61 61 09 62 62 62 62 62 |.|
|00000010| 62 62 62 62 06 63 63 63 63 63 63 08 64 64 64 64 ||
|00000020| 64 64 64 64 0f 65 65 65 65 65 65 65 65 65 65 65 ||
|00000030| 65 65 65 65 0d 66 66 66 66 66 66 66 66 66 66 66 ||
|00000040| 66 66 02 67 67 02 68 68 0e 69 69 69 69 69 69 69 ||
|00000050| 69 69 69 69 69 69 69 09 6a 6a 6a 6a 6a 6a 6a 6a ||
|00000060| 6a                                              |j               |
+--------+-------------------------------------------------+----------------+
14:37:10 [DEBUG] [nioEventLoopGroup-2-1]  - [id: 0xf0f347b8, L:/192.168.0.103:49979 - R:/192.168.0.103:9090] FLUSH

Server output

14:36:50 [DEBUG] [main]  - [id: 0xdff439d3] binding...
14:36:51 [DEBUG] [main]  - [id: 0xdff439d3, L:/192.168.0.103:9090] bound...
14:37:10 [DEBUG] [nioEventLoopGroup-3-1]  - [id: 0x744f2b47, L:/192.168.0.103:9090 - R:/192.168.0.103:49979] REGISTERED
14:37:10 [DEBUG] [nioEventLoopGroup-3-1]  - [id: 0x744f2b47, L:/192.168.0.103:9090 - R:/192.168.0.103:49979] ACTIVE
14:37:10 [DEBUG] [nioEventLoopGroup-3-1]  - connected [id: 0x744f2b47, L:/192.168.0.103:9090 - R:/192.168.0.103:49979]
14:37:10 [DEBUG] [nioEventLoopGroup-3-1]  - [id: 0x744f2b47, L:/192.168.0.103:9090 - R:/192.168.0.103:49979] READ: 9B
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 61 61 61 61 61 61 61 61 61                      |aaaaaaaaa       |
+--------+-------------------------------------------------+----------------+
14:37:10 [DEBUG] [nioEventLoopGroup-3-1]  - [id: 0x744f2b47, L:/192.168.0.103:9090 - R:/192.168.0.103:49979] READ: 9B
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 62 62 62 62 62 62 62 62 62                      |bbbbbbbbb       |
+--------+-------------------------------------------------+----------------+
14:37:10 [DEBUG] [nioEventLoopGroup-3-1]  - [id: 0x744f2b47, L:/192.168.0.103:9090 - R:/192.168.0.103:49979] READ: 6B
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 63 63 63 63 63 63                               |cccccc          |
+--------+-------------------------------------------------+----------------+
14:37:10 [DEBUG] [nioEventLoopGroup-3-1]  - [id: 0x744f2b47, L:/192.168.0.103:9090 - R:/192.168.0.103:49979] READ: 8B
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 64 64 64 64 64 64 64 64                         |dddddddd        |
+--------+-------------------------------------------------+----------------+
14:37:10 [DEBUG] [nioEventLoopGroup-3-1]  - [id: 0x744f2b47, L:/192.168.0.103:9090 - R:/192.168.0.103:49979] READ: 15B
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 65 65 65 65 65 65 65 65 65 65 65 65 65 65 65    |eeeeeeeeeeeeeee |
+--------+-------------------------------------------------+----------------+
14:37:10 [DEBUG] [nioEventLoopGroup-3-1]  - [id: 0x744f2b47, L:/192.168.0.103:9090 - R:/192.168.0.103:49979] READ: 13B
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 66 66 66 66 66 66 66 66 66 66 66 66 66          |fffffffffffff   |
+--------+-------------------------------------------------+----------------+
14:37:10 [DEBUG] [nioEventLoopGroup-3-1]  - [id: 0x744f2b47, L:/192.168.0.103:9090 - R:/192.168.0.103:49979] READ: 2B
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 67 67                                           |gg              |
+--------+-------------------------------------------------+----------------+
14:37:10 [DEBUG] [nioEventLoopGroup-3-1]  - [id: 0x744f2b47, L:/192.168.0.103:9090 - R:/192.168.0.103:49979] READ: 2B
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 68 68                                           |hh              |
+--------+-------------------------------------------------+----------------+
14:37:10 [DEBUG] [nioEventLoopGroup-3-1]  - [id: 0x744f2b47, L:/192.168.0.103:9090 - R:/192.168.0.103:49979] READ: 14B
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 69 69 69 69 69 69 69 69 69 69 69 69 69 69       |iiiiiiiiiiiiii  |
+--------+-------------------------------------------------+----------------+
14:37:10 [DEBUG] [nioEventLoopGroup-3-1]  - [id: 0x744f2b47, L:/192.168.0.103:9090 - R:/192.168.0.103:49979] READ: 9B
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 6a 6a 6a 6a 6a 6a 6a 6a 6a                      |jjjjjjjjj       |
+--------+-------------------------------------------------+----------------+
14:37:10 [DEBUG] [nioEventLoopGroup-3-1]  - [id: 0x744f2b47, L:/192.168.0.103:9090 - R:/192.168.0.103:49979] READ COMPLETE

Protocol design and analysis

Why are agreements needed?

Message transmission in TCP/IP is based on streams and has no boundaries.

The purpose of the agreement is to define the boundaries of messages and formulate communication rules that both parties to communicate must abide by.

For example: Transmission on the network

Keep guests on rainy days and stay me or not

It is a famous Chinese sentence without punctuation. In the absence of punctuation, this sentence has several ways to disassemble it, but its meaning is completely different, so it is often used to describe the importance of punctuation.

An interpretation

It's rainy, but I won't

Another interpretation

It's raining, stay with guests, will you keep me?  Keep

How to design a protocol? In fact, it is to add "punctuation marks" to the information transmitted on the network. But it is not good to break a sentence by a separator, because the separator itself must be distinguished if it is used for transmission. Therefore, the following protocol is more commonly used

Fixed length bytes represent content length + actual content

For example, if a Chinese character length is 3, according to the above protocol rules, the sending information is as follows, and the receiver will not misinterpret it.

0f It takes guests on rainy days 06 days 09 I won't stay

Redis protocol example

Simulates the redis client sending commands.

NioEventLoopGroup worker = new NioEventLoopGroup();
 byte[] LINE = {13, 10};
 try {
     Bootstrap bootstrap = new Bootstrap();
     ();
     (worker);
     (new ChannelInitializer<SocketChannel>() {
         @Override
         protected void initChannel(SocketChannel ch) {
             ().addLast(new LoggingHandler());
             ().addLast(new ChannelInboundHandlerAdapter() {
                 // An active event will be triggered after the connection channel is successfully established
                 @Override
                 public void channelActive(ChannelHandlerContext ctx) {
                     set(ctx);
                     get(ctx);
                 }
                 private void get(ChannelHandlerContext ctx) {
                     ByteBuf buf = ().buffer();
                     ("*2".getBytes());//*2 means that the number of elements in the array is 2, that is, get aaa is 2 strings of contents
                     (LINE);
                     ("$3".getBytes());//Suppose $3 means that there are 3 bytes in the following order
                     (LINE);
                     ("get".getBytes());//Enter the gset command
                     (LINE);
                     ("$3".getBytes());
                     (LINE);
                     ("aaa".getBytes());//Input key is aaa
                     (LINE);
                     (buf);
                 }
                 private void set(ChannelHandlerContext ctx) {
                     //The following redis command is set aaa bbb
                     ByteBuf buf = ().buffer();
                     ("*3".getBytes());//*3 means that the number of array elements is 3, that is, set aaa bbb is 3 strings of contents
                     (LINE);
                     ("$3".getBytes());//Suppose $3 means that there are 3 bytes in the following order
                     (LINE);
                     ("set".getBytes());//Enter the set command
                     (LINE);
                     ("$3".getBytes());
                     (LINE);
                     ("aaa".getBytes());//Input key is aaa
                     (LINE);
                     ("$3".getBytes());
                     (LINE);
                     ("bbb".getBytes());//The input value is bbb
                     (LINE);
                     (buf);
                 }

                 @Override
                 public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                     ByteBuf buf = (ByteBuf) msg;
                     ((()));
                 }
             });
         }
     });
     ChannelFuture channelFuture = ("localhost", 6379).sync();
     ().closeFuture().sync();
 } catch (InterruptedException e) {
     ("client error", e);
 } finally {
     ();
 }

Of course netty provides these ready-made protocols, which do not require us to develop them ourselves, so we are here to know why.

http protocol example

Simulate http server

NioEventLoopGroup boss = new NioEventLoopGroup();
 NioEventLoopGroup worker = new NioEventLoopGroup();
 try {
     ServerBootstrap serverBootstrap = new ServerBootstrap();
     ();
     (boss, worker);
     (new ChannelInitializer<SocketChannel>() {
         @Override
         protected void initChannel(SocketChannel ch) throws Exception {
             ().addLast(new LoggingHandler());
             ().addLast(new HttpServerCodec());
             ().addLast(new SimpleChannelInboundHandler<HttpRequest>() {
                 @Override
                 protected void channelRead0(ChannelHandlerContext ctx, HttpRequest msg) throws Exception {
                     // Get request
                     (());

                     // Return response
                     DefaultFullHttpResponse response =
                             new DefaultFullHttpResponse((), );

                     byte[] bytes = "<h1>Hello, world!</h1>".getBytes();

                     ().setInt(CONTENT_LENGTH, );
                     ().writeBytes(bytes);

                     // Write a response
                     (response);
                 }
             });
             /*().addLast(new ChannelInboundHandlerAdapter() {
                 @Override
                 public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                     ("{}", ());

                     if (msg instanceof HttpRequest) { // Request line, request header

                     } else if (msg instanceof HttpContent) { //Request body

                     }
                 }
             });*/
         }
     });
     ChannelFuture channelFuture = (8080).sync();
     ().closeFuture().sync();
 } catch (InterruptedException e) {
     ("server error", e);
 } finally {
     ();
     ();
 }

Custom protocol elements

  • Magic Number: It is agreed upon to determine whether it is an invalid data packet at the first time.
  • Version number: Can support protocol upgrades
  • Serialization algorithm: Which serialization deserialization method is used for the message body? It can be extended by this, such as: json, protobuf, hessian, jdk
  • Instruction type: login, registration, single chat, group chat... related to business
  • Request serial number: Provides asynchronous capability for duplex communication
  • Text length
  • Message text

Codec

Based on the above elements, design a login request message and a login response message, and use Netty to complete the sending and receiving

@Slf4j
 public class MessageCodec extends ByteToMessageCodec<Message> {

     @Override
     protected void encode(ChannelHandlerContext ctx, Message msg, ByteBuf out) throws Exception {
         // 1. 4 byte magic number
         (new byte[]{1, 2, 3, 4});
         // 2. 1 byte version,
         (1);
         // 3. 1 byte serialization method jdk 0 , json 1
         (0);
         // 4. 1 byte instruction type
         (());
         // 5. 4 bytes
         (());
         // No sense, aligning fill
         (0xff);
         // 6. Get the byte array of content
         ByteArrayOutputStream bos = new ByteArrayOutputStream();
         ObjectOutputStream oos = new ObjectOutputStream(bos);
         (msg);
         byte[] bytes = ();
         // 7. Length
         ();
         // 8. Write content
         (bytes);
     }

     @Override
     protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
         int magicNum = ();
         byte version = ();
         byte serializerType = ();
         byte messageType = ();
         int sequenceId = ();
         ();
         int length = ();
         byte[] bytes = new byte[length];
         (bytes, 0, length);
         ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes));
         Message message = (Message) ();
         ("{}, {}, {}, {}, {}, {}", magicNum, version, serializerType, messageType, sequenceId, length);
         ("{}", message);
         (message);
     }
 }

test

EmbeddedChannel channel = new EmbeddedChannel(
     new LoggingHandler(),
     new LengthFieldBasedFrameDecoder(
         1024, 12, 4, 0, 0),
     new MessageCodec()
 );
 // encode
 LoginRequestMessage message = new LoginRequestMessage("zhangsan", "123", "Zhang San");
 // (message);
 // decode
 ByteBuf buf = ();
 new MessageCodec().encode(null, message, buf);

 ByteBuf s1 = (0, 100);
 ByteBuf s2 = (100, () - 100);
 (); // Reference count 2
 (s1); // release 1
 (s2);

@Sharable

  • When the handler does not save the state, it can be safely shared under multithreading
  • But be aware that for codec classes, the ByteToMessageCodec or CombinedChannelDuplexHandler parent class cannot be inherited. Their constructor methods have restrictions on @Sharable
  • If you can ensure that the codec does not save state, you can inherit the MessageToMessageCodec parent class
@Slf4j
 @
 /**
  * Must be used with LengthFieldBasedFrameDecoder to ensure that the received ByteBuf message is complete
  */
 public class MessageCodecSharable extends MessageToMessageCodec<ByteBuf, Message> {
     @Override
     protected void encode(ChannelHandlerContext ctx, Message msg, List<Object> outList) throws Exception {
         ByteBuf out = ().buffer();
         // 1. 4 byte magic number
         (new byte[]{1, 2, 3, 4});
         // 2. 1 byte version,
         (1);
         // 3. 1 byte serialization method jdk 0 , json 1
         (0);
         // 4. 1 byte instruction type
         (());
         // 5. 4 bytes
         (());
         // No sense, aligning fill
         (0xff);
         // 6. Get the byte array of content
         ByteArrayOutputStream bos = new ByteArrayOutputStream();
         ObjectOutputStream oos = new ObjectOutputStream(bos);
         (msg);
         byte[] bytes = ();
         // 7. Length
         ();
         // 8. Write content
         (bytes);
         (out);
     }

     @Override
     protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
         int magicNum = ();
         byte version = ();
         byte serializerType = ();
         byte messageType = ();
         int sequenceId = ();
         ();
         int length = ();
         byte[] bytes = new byte[length];
         (bytes, 0, length);
         ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes));
         Message message = (Message) ();
         ("{}, {}, {}, {}, {}, {}", magicNum, version, serializerType, messageType, sequenceId, length);
         ("{}", message);
         (message);
     }
 }

Extended serialization algorithm

Serialization, deserialization is mainly used in the conversion of message body

  • When serializing, Java objects need to be changed into data to be transmitted (can be byte[], or json, etc., and eventually they need to be byte[])
  • When deserializing, the incoming body data needs to be restored to Java objects for easy processing.

The current code only supports Java's own serialization and deserialization mechanism. The core code is as follows

// Deserialization
 byte[] body = new byte[bodyLength];
 (body);
 ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(body));
 Message message = (Message) ();
 (sequenceId);

 // Serialization
 ByteArrayOutputStream out = new ByteArrayOutputStream();
 new ObjectOutputStream(out).writeObject(message);
 byte[] bytes = ();

To support more serialization algorithms, abstract a Serializer interface

public interface Serializer {

     // Deserialization method
     <T> T deserialize(Class<T> clazz, byte[] bytes);

     // Serialization method
     <T> byte[] serialize(T object);

 }

Provide two implementations, here we directly add the specific implementation to the enumeration class

enum SerializerAlgorithm implements Serializer {
 // Java implementation
     Java {
         @Override
         public <T> T deserialize(Class<T> clazz, byte[] bytes) {
             try {
                 ObjectInputStream in =
                     new ObjectInputStream(new ByteArrayInputStream(bytes));
                 Object object = ();
                 return (T) object;
             } catch (IOException | ClassNotFoundException e) {
                 throw new RuntimeException("Deserialization error", e);
             }
         }

         @Override
         public <T> byte[] serialize(T object) {
             try {
                 ByteArrayOutputStream out = new ByteArrayOutputStream();
                 new ObjectOutputStream(out).writeObject(object);
                 return ();
             } catch (IOException e) {
                 throw new RuntimeException("Serialization Error", e);
             }
         }
     },
    
     // Json implementation (introduced Gson dependencies)
     Json {
         @Override
         public <T> T deserialize(Class<T> clazz, byte[] bytes) {
             return new Gson().fromJson(new String(bytes, StandardCharsets.UTF_8), clazz);
         }

         @Override
         public <T> byte[] serialize(T object) {
             return new Gson().toJson(object).getBytes(StandardCharsets.UTF_8);
         }
     };

     // What kind of serialization algorithm is needed to get from the bytes of the protocol
     public static SerializerAlgorithm getByInt(int type) {
         SerializerAlgorithm[] array = ();
         if (type < 0 || type > - 1) {
             throw new IllegalArgumentException("Exceed the SerializerAlgorithm range");
         }
         return array[type];
     }
 }

Add configuration classes and configuration files

public abstract class Config {
    static Properties properties;
    static {
        try (InputStream in = ("/")) {
            properties = new Properties();
            (in);
        } catch (IOException e) {
            throw new ExceptionInInitializerError(e);
        }
    }
    public static int getServerPort() {
        String value = ("");
        if(value == null) {
            return 8080;
        } else {
            return (value);
        }
    }
    public static  getSerializerAlgorithm() {
        String value = ("");
        if(value == null) {
            return ;
        } else {
            return (value);
        }
    }
}

Configuration File

=Json

Modify the codec

/**
  * Must be used with LengthFieldBasedFrameDecoder to ensure that the received ByteBuf message is complete
  */
 public class MessageCodecSharable extends MessageToMessageCodec<ByteBuf, Message> {
     @Override
     public void encode(ChannelHandlerContext ctx, Message msg, List<Object> outList) throws Exception {
         ByteBuf out = ().buffer();
         // 1. 4 byte magic number
         (new byte[]{1, 2, 3, 4});
         // 2. 1 byte version,
         (1);
         // 3. 1 byte serialization method jdk 0 , json 1
         (().ordinal());
         // 4. 1 byte instruction type
         (());
         // 5. 4 bytes
         (());
         // No sense, aligning fill
         (0xff);
         // 6. Get the byte array of content
         byte[] bytes = ().serialize(msg);
         // 7. Length
         ();
         // 8. Write content
         (bytes);
         (out);
     }

     @Override
     protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
         int magicNum = ();
         byte version = ();
         byte serializerAlgorithm = (); // 0 or 1
         byte messageType = (); // 0,1,2...
         int sequenceId = ();
         ();
         int length = ();
         byte[] bytes = new byte[length];
         (bytes, 0, length);

         // Find the deserialization algorithm
          algorithm = ()[serializerAlgorithm];
         // Determine the specific message type
         Class<? extends Message> messageClass = (messageType);
         Message message = (messageClass, bytes);
 // ("{}, {}, {}, {}, {}, {}", magicNum, version, serializerType, messageType, sequenceId, length);
 // ("{}", message);
         (message);
     }
 }

The specific message type can be determined according toMessage type bytesGet the correspondingMessage class

@Data
 public abstract class Message implements Serializable {

     /**
      * Obtain the corresponding message class according to the message type bytes
      * @param messageType Message type byte
      * @return message class
      */
     public static Class<? extends Message> getMessageClass(int messageType) {
         return (messageType);
     }

     private int sequenceId;

     private int messageType;

     public abstract int getMessageType();

     public static final int LoginRequestMessage = 0;
     public static final int LoginResponseMessage = 1;
     public static final int ChatRequestMessage = 2;
     public static final int ChatResponseMessage = 3;
     public static final int GroupCreateRequestMessage = 4;
     public static final int GroupCreateResponseMessage = 5;
     public static final int GroupJoinRequestMessage = 6;
     public static final int GroupJoinResponseMessage = 7;
     public static final int GroupQuitRequestMessage = 8;
     public static final int GroupQuitResponseMessage = 9;
     public static final int GroupChatRequestMessage = 10;
     public static final int GroupChatResponseMessage = 11;
     public static final int GroupMembersRequestMessage = 12;
     public static final int GroupMembersResponseMessage = 13;
     public static final int PingMessage = 14;
     public static final int PongMessage = 15;
     private static final Map<Integer, Class<? extends Message>> messageClasses = new HashMap<>();

     static {
         (LoginRequestMessage, );
         (LoginResponseMessage, );
         (ChatRequestMessage, );
         (ChatResponseMessage, );
         (GroupCreateRequestMessage, );
         (GroupCreateResponseMessage, );
         (GroupJoinRequestMessage, );
         (GroupJoinResponseMessage, );
         (GroupQuitRequestMessage, );
         (GroupQuitResponseMessage, );
         (GroupChatRequestMessage, );
         (GroupChatResponseMessage, );
         (GroupMembersRequestMessage, );
         (GroupMembersResponseMessage, );
     }
 }

Parameter tuning

Related source codes to be updated

Client parameters CONNECT_TIMEOUT_MILLIS

  • Belongs to SocketChannal parameters

  • When using when establishing a connection between the client, if the connection cannot be connected within a specified millisecond, a timeout exception will be thrown.

  • SO_TIMEOUT is mainly used to block IO, blocking IO, accept, read, etc. are waiting infinitely. If you don't want to block forever, use it to adjust the timeout time.

@Slf4j
 public class TestConnectionTimeout {
     public static void main(String[] args) {
         NioEventLoopGroup group = new NioEventLoopGroup();
         try {
             Bootstrap bootstrap = new Bootstrap()
                     .group(group)
                     .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 300)
                     .channel()
                     .handler(new LoggingHandler());
             ChannelFuture future = ("127.0.0.1", 8080);
             ().channel().closeFuture().sync(); // Breakpoint 1
         } catch (Exception e) {
             ();
             ("timeout");
         } finally {
             ();
         }
     }
 }

Also source code part#connect

@Override
 public final void connect(
         final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {
     // ...
     // Get the timeout time
     int connectTimeoutMillis = config().getConnectTimeoutMillis();
     if (connectTimeoutMillis > 0) {//If the timeout is greater than 0
         connectTimeoutFuture = eventLoop().schedule(new Runnable() {// Then start the timed task
             @Override
             public void run() {
                 //connectPromise is an object that exchanges data between two threads
                 ChannelPromise connectPromise = ;
                 ConnectTimeoutException cause =
                     new ConnectTimeoutException("connection timed out: " + remoteAddress); // Breakpoint 2
                 //(cause) Put the exception in the connectPromise and wake up the main thread
                 if (connectPromise != null && (cause)) {
                     close(voidPromise());
                 }
             }
         }, connectTimeoutMillis, );//The timing task is executed after the connectTimeoutMillis time
     }
 // ...
 }

Server parameter SO_BACKLOG

This is a parameter belonging to ServerSocketChannal

There are semi-connection queues and fully-connection queues during three handshakes. For more information, please see this article:TCP - Semi-connection queue and full-connection queue

  • sync queue - semi-connection queue
    • The size is specified by /proc/sys/net/ipv4/tcp_max_syn_backlog, insyncookiesWhen enabled, there is no logical maximum limit, and this setting is ignored.
  • accept queue - Full connection queue
    • Its size is specified by /proc/sys/net/core/somaxconn. When using the listen function, the kernel will take the smaller values ​​of both based on the incoming backlog parameters and system parameters.
    • If the accpet queue is full, the server will send an error message that refuses to the client

In netty, you can set the size of the backlog by option(ChannelOption.SO_BACKLOG, value)

You can view the default size through the following source code

public class DefaultServerSocketChannelConfig extends DefaultChannelConfig
                                              implements ServerSocketChannelConfig {

    private volatile int backlog = ;
    // ...
}

TCP_NODELAY

  • Belongs to SocketChannal parameters

Send immediately, it is recommended to set to true. false means turning on the nagle algorithm

SO_SNDBUF & SO_RCVBUF

set upSliding windowThese parameters may need to be set earlier, but now tcp will automatically adjust the window according to congestion, etc., so it is not recommended to manually set these two values.

  • SO_SNDBUF belongs to the SocketChannal parameter
  • SO_RCVBUF can be used for both the SocketChannal parameter and the ServerSocketChannal parameter (it is recommended to set it to ServerSocketChannal)

ALLOCATOR

  • Belongs to SocketChannal parameters
  • Used to allocate ByteBuf, ()

RCVBUF_ALLOCATOR

  • Belongs to SocketChannal parameters
  • Control the netty receive buffer size
  • Responsible for the allocation of inbound data, determine the size of the inbound buffer (and can be adjusted dynamically), and adopt direct direct memory uniformly. The specific pooling or non-pooling is determined by the allocator.

Source code detailed explanation

Start the analysis

Let's take a look at how the following code is processed in netty

//1 Use NioEventLoopGroup (nio boss thread for short) in netty to encapsulate threads and selectors
 Selector selector = ();

 //2 Create a NioServerSocketChannel, and initialize its associated handler, and store config for native ssc
 NioServerSocketChannel attachment = new NioServerSocketChannel();

 //3 When creating NioServerSocketChannel, a serverSocketChannel native to java was created
 ServerSocketChannel serverSocketChannel = ();
 (false);

 //4 Start the nio boss thread to perform the next operation

 //5 Register (only associated with selector and NioServerSocketChannel), no events are followed
 SelectionKey selectionKey = (selector, 0, attachment);

 //6 head -> Initializer -> ServerBootstrapAcceptor -> tail, the initializer is one-time, just to add acceptor

 //7 Bind port
 (new InetSocketAddress(8080));

 //8 Trigger channel active event, follow the op_accept event in the head
 (SelectionKey.OP_ACCEPT);

Entrance#bind

Key Code#doBind

Which threads are used to process this function? You can have a concept first and then read:

  1. init & register regFuture processing
    1. init: handled by main
      1. Create NioServerSocketChannel: handled by main
      2. Add NioServerSocketChannel Initialization handler: handled by main
        1. Initialize handler waiting for call
  2. register
    1. Start the nio boss thread: handled by main
    2. Native ssc Register to selector Unattended Events: Handled by nio-thread
    3. Execute NioServerSocketChannel initialization handler: handled by nio-thread
  3. regFuture waits for callback doBind0: handled by nio-thread
    1. Native ServerSocketChannel binding: handled by nio-thread
    2. Triggering NioServerSocketChannel active event: handled by nio-thread
private ChannelFuture doBind(final SocketAddress localAddress) {
 // 1. Execute initialization and registration regFuture will be set by initAndRegister to set whether it is completed, thereby calling back at code in 3.2
     // init is equivalent to ServerSocketChannel ssc= ();
     // Register is equivalent to SelectionKey selectionKey=(selector, 0, nettySsc);
     final ChannelFuture regFuture = initAndRegister();
     final Channel channel = ();
     if (() != null) {
         return regFuture;
     }

     // 2. Because it is an asynchronous execution of initAndRegister, it needs to be divided into two situations. It also needs to be distinguished by the suspend breakpoint type during debugging.
     // 2.1 If completed
     if (()) {
         ChannelPromise promise = ();
         // 3.1 Call doBind0 immediately
         doBind0(regFuture, channel, localAddress, promise);
         return promise;
     }
     // 2.2 Not completed yet
     else {
         final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
         // 3.2 callback doBind0
         (new ChannelFutureListener() {
             @Override
             public void operationComplete(ChannelFuture future) throws Exception {
                 Throwable cause = ();
                 if (cause != null) {
                     // Handle exceptions...
                     (cause);
                 } else {
                     ();
 // 3. The registration thread executes doBind0
                     doBind0(regFuture, channel, localAddress, promise);
                 }
             }
         });
         return promise;
     }
 }

Key Code#initAndRegister

final ChannelFuture initAndRegister() {
     Channel channel = null;
     try {
         channel = ();
         // 1.1 Initialization - What you do is add an initializer ChannelInitializer
         init(channel);
     } catch (Throwable t) {
         // Handle exceptions...
         return new DefaultChannelPromise(new FailedChannel(), ).setFailure(t);
     }

     // 1.2 Registration - What you do is register the native channel to the selector
     ChannelFuture regFuture = config().group().register(channel);
     if (() != null) {
         // Handle exceptions...
     }
     return regFuture;
 }

Key Code#init

// Here the channel is actually NioServerSocketChannel
 void init(Channel channel) throws Exception {
     final Map<ChannelOption<?>, Object> options = options0();
     synchronized (options) {
         setChannelOptions(channel, options, logger);
     }

     final Map<AttributeKey<?>, Object> attrs = attrs0();
     synchronized (attrs) {
         for (Entry<AttributeKey<?>, Object> e: ()) {
             @SuppressWarnings("unchecked")
             AttributeKey<Object> key = (AttributeKey<Object>) ();
             (key).set(());
         }
     }

     ChannelPipeline p = ();

     final EventLoopGroup currentlyChildGroup = childGroup;
     final ChannelHandler currentChildHandler = childHandler;
     final Entry<ChannelOption<?>, Object>[] currentChildOptions;
     final Entry<AttributeKey<?>, Object>[] currentChildAttrs;
     synchronized (childOptions) {
         currentChildOptions = ().toArray(newOptionArray(0));
     }
     synchronized (childAttrs) {
         currentChildAttrs = ().toArray(newAttrArray(0));
     }
 : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : :
     // Add an initializer for NioServerSocketChannel
     (new ChannelInitializer<Channel>() {
         @Override
         public void initChannel(final Channel ch) throws Exception {
             final ChannelPipeline pipeline = ();
             ChannelHandler handler = ();
             if (handler != null) {
                 (handler);
             }

             // The initializer's responsibility is to add ServerBootstrapAcceptor to NioServerSocketChannel
             ().execute(new Runnable() {
                 @Override
                 public void run() {
                     (new ServerBootstrapAcceptor(
                             ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
                 }
             });
         }
     });
 }

Key Code#register

public final void register(EventLoop eventLoop, final ChannelPromise promise) {
     // Some checks, a little...

      = eventLoop;

     if (()) {
         register0(promise);
     } else {
         try {
             // When the execute method is executed for the first time, the nio thread will be started, and then the registration and other operations will be executed on the nio thread.
             // Because there is only one NioServerSocketChannel, there will be only one boss nio thread
             // The fact that this line of code is the switching of main -> nio boss thread
             (new Runnable() {
                 @Override
                 public void run() {
                     register0(promise);
                 }
             });
         } catch (Throwable t) {
             // Logging...
             closeForcibly();
             ();
             safeSetFailure(promise, t);
         }
     }
 }

#register0

private void register0(ChannelPromise promise) {
     try {
         if (!() || !ensureOpen(promise)) {
             return;
         }
         boolean firstRegistration = neverRegistered;
         // 1.2.1 The native nio channel is bound to the selector. Note that the selector is not registered at this time. The attachment is NioServerSocketChannel
         doRegister();
         neverRegistered = false;
         registered = true;

         // 1.2.2 Execute the initChannel of the NioServerSocketChannel initializer
         ();

         // Callback 3.2 #doBind0
         safeSetSuccess(promise);
         ();
        
         // The corresponding server socket channel has not been bound yet, isActive is false
         if (isActive()) {
             if (firstRegistration) {
                 ();
             } else if (config().isAutoRead()) {
                 beginRead();
             }
         }
     } catch (Throwable t) {
         // Close the channel directly to avoid FD leak.
         closeForcibly();
         ();
         safeSetFailure(promise, t);
     }
 }

Key Code#initChannel

private boolean initChannel(ChannelHandlerContext ctx) throws Exception {
     if ((ctx)) { // Guard against re-entrance.
         try {
             // 1.2.2.1 Perform initialization
             initChannel((C) ());
         } catch (Throwable cause) {
             exceptionCaught(ctx, cause);
         } finally {
             // 1.2.2.2 Remove the initializer
             ChannelPipeline pipeline = ();
             if ((this) != null) {
                 (this);
             }
         }
         return true;
     }
     return false;
 }

Key Code#doBind0

// 3.1 or 3.2 execute doBind0
 private static void doBind0(
         final ChannelFuture regFuture, final Channel channel,
         final SocketAddress localAddress, final ChannelPromise promise) {

     ().execute(new Runnable() {
         @Override
         public void run() {
             if (()) {
                 (localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
             } else {
                 (());
             }
         }
     });
 }

Key Code#bind

public final void bind(final SocketAddress localAddress, final ChannelPromise promise) {
     assertEventLoop();

     if (!() || !ensureOpen(promise)) {
         return;
     }

     if ((config().getOption(ChannelOption.SO_BROADCAST)) &&
         localAddress instance of InetSocketAddress &&
         !((InetSocketAddress) localAddress).getAddress().isAnyLocalAddress() &&
         !() && !()) {
         // Logging...
     }

     boolean wasActive = isActive();
     try {
         // 3.3 Perform port binding
         doBind(localAddress);
     } catch (Throwable t) {
         safeSetFailure(promise, t);
         closeIfClosed();
         return;
     }

     if (!wasActive && isActive()) {
         invokeLater(new Runnable() {
             @Override
             public void run() {
                 // 3.4 Trigger active event
                 ();
             }
         });
     }

     safeSetSuccess(promise);
 }

Key Code#doBind

protected void doBind(SocketAddress localAddress) throws Exception {
    if (() >= 7) {
        javaChannel().bind(localAddress, ());
    } else {
        javaChannel().socket().bind(localAddress, ());
    }
}

Key Code#channelActive

public void channelActive(ChannelHandlerContext ctx) {
     ();
 // Trigger read (the read on NioServerSocketChannel is not reading data, but just to trigger event registration of channel)
     readIfIsAutoRead();
 }

Key Code#doBeginRead

protected void doBeginRead() throws Exception {
     // () or () was called
     final SelectionKey selectionKey = ;
     if (!()) {
         return;
     }

     readPending = true;

     final int interestOps = ();
     // The value of readInterestOp is 16, which is initialized when NioServerSocketChannel is created, which means you are concerned about the accept event
     if ((interestOps & readInterestOp) == 0) {
         (interestOps | readInterestOp);
     }
 }

NioEventLoop Analysis

The NioEventLoop thread not only needs to handle IO events, but also needs to handle Tasks (including normal tasks and timed tasks).

Submit task code#execute

public void execute(Runnable task) {
     if (task == null) {
         throw new NullPointerException("task");
     }

     boolean inEventLoop = inEventLoop();
     // Add a task, where the queue uses the mpsc lock-free queue provided by jctools
     addTask(task);
     if (!inEventLoop) {
         // InEventLoop If false means execute is called by other threads, that is, the first call is called. At this time, the first task needs to be submitted to eventLoop, start the dead loop, and execute it to the following doStartThread
         startThread();
         if (isShutdown()) {
             // If you have shutdown, do rejection logic, the code is slightly...
         }
     }

     if (!addTaskWakesUp && wakesUpForTask(task)) {
         // If the thread is blocked due to IO select, the thread of the added task needs to be responsible for waking up the NioEventLoop thread
         wakeup(inEventLoop);
     }
 }

Wake up select blocking thread#wakeup

@Override
protected void wakeup(boolean inEventLoop) {
    if (!inEventLoop && (false, true)) {
        ();
    }
}

Start EventLoop main loop#doStartThread

private void doStartThread() {
     assert thread == null;
     (new Runnable() {
         @Override
         public void run() {
             // Save the current thread of the thread pool in a member variable for subsequent use
             thread = ();
             if (interrupted) {
                 ();
             }

             boolean success = false;
             updateLastExecutionTime();
             try {
                 // Call the run method of the external class SingleThreadEventExecutor to enter the dead loop. See below for the run method
                 ();
                 success = true;
             } catch (Throwable t) {
                 ("Unexpected exception from an event executor: ", t);
             } finally {
 // Clean up, the code is slightly...
             }
         }
     });
 }

#runThe main task is to execute a dead loop, constantly check whether there are new tasks and IO events

protected void run() {
     for (;;) {
         try {
             try {
                 // The logic of calculateStrategy is as follows:
                 // If there is a task, selectNow will be executed once, clearing the last wakeup result. Regardless of whether there are or not, the switch will be skipped.
                 // There will be no task, it will match to see if it should be blocked
                 switch ((selectNowSupplier, hasTasks())) {
                     case:
                         continue;

                     case SelectStrategy.BUSY_WAIT:

                     case:
                         // Because both the IO thread and the commit task thread may perform wakeup, and wakeup is a relatively expensive operation, an atomic Boolean object wakenUp is used. When it is true, it means that it is woken up by the current thread.
                         // Perform select blocking and set the wake-up state to false
                         boolean oldWakenUp = (false);
                        
                         // If in this position, non-EventLoop threads set wakenUp to true and wakeup
                         // The following select method will not block
                         // After the runAllTasks process is completed, will the newly added tasks be executed in time at this stage of recirculation?
                         // Because oldWakenUp is true, the following select method will block until the timeout
                         // Only then can it be executed, so that the select method is unnecessary to block
                         select(oldWakenUp);

                         if (()) {
                             ();
                         }
                     default:
                 }
             } catch (IOException e) {
                 rebuildSelector0();
                 handleLoopException(e);
                 continue;
             }

             cancelledKeys = 0;
             needsToSelectAgain = false;
             // ioRatio defaults to 50
             final int ioRatio = ;
             if (ioRatio == 100) {
                 try {
                     processSelectedKeys();
                 } finally {
                     // When ioRatio is 100, all non-IO tasks are always run
                     runAllTasks();
                 }
             } else {
                 final long ioStartTime = ();
                 try {
                     processSelectedKeys();
                 } finally {
                     // Recording the time-consuming process of io event
                     final long ioTime = () - ioStartTime;
                     // Run non-IO tasks and exit runAllTasks once timed out
                     runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
                 }
             }
         } catch (Throwable t) {
             handleLoopException(t);
         }
         try {
             if (isShuttingDown()) {
                 closeAll();
                 if (confirmShutdown()) {
                     return;
                 }
             }
         } catch (Throwable t) {
             handleLoopException(t);
         }
     }
 }

Notice

One puzzled thing here is wakeup, which can be called by the thread that submits the task (it is easier to understand) or by the EventLoop thread (it is more difficult to understand). Here you need to know the effect of the wakeup method:

  • Called by a non-EventLoop thread, it will wake up the EventLoop thread currently blocking when executing the select
  • Called by EventLoop itself, this wakeup will cancel the next select operation

#select

private void select(boolean oldWakenUp) throws IOException {
     Selector selector = ;
     try {
         int selectCnt = 0;
         long currentTimeNanos = ();
         // Calculate the waiting time
         // * No scheduledTask, timeout is 1s
         // * There is scheduledTask, the timeout is `Next time of execution time of the time of the execution of the task - current time`
         long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos);

         for (;;) {
             long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L;
             // If timeout, exit the loop
             if (timeoutMillis <= 0) {
                 if (selectCnt == 0) {
                     ();
                     selectCnt = 1;
                 }
                 break;
             }

             // If the task exits the loop during the period, if there is no such judgment, the task will not be executed until the next time the select timeout
             // (false, true) It is to make non-NioEventLoop no longer need to execute wakeup
             if (hasTasks() && (false, true)) {
                 ();
                 selectCnt = 1;
                 break;
             }

             // select limited time blocking
             // Note that there is a bug in nio. When the bug occurs, the select method will not block even if there is no time to occur, resulting in constant polling, and the CPU occupies 100%
             int selectedKeys = (timeoutMillis);
             // Count is added 1
             selectCnt ++;

             // After waking up, if there is an IO event, or it is awakened by a non-EventLoop thread, or there is a task, exit the loop
             if (selectedKeys != 0 || oldWakenUp || () || hasTasks() || hasScheduledTasks()) {
                 break;
             }
             if (()) {
                // The thread is interrupted and exits the loop
                 // Logging
                 selectCnt = 1;
                 break;
             }

             long time = ();
             if (time - (timeoutMillis) >= currentTimeNanos) {
                 // If the timeout, the count is reset to 1, the next loop will break
                 selectCnt = 1;
             }
             // Count exceeds the threshold, specified by , default 512
             // This is to solve the nio null polling bug
             else if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 &&
                     selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {
                 // Rebuild the selector
                 selector = selectRebuildSelector(selectCnt);
                 selectCnt = 1;
                 break;
             }

             currentTimeNanos = time;
         }

         if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS) {
             // Logging
         }
     } catch (CancelledKeyException e) {
         // Logging
     }
 }

Process keys#processSelectedKeys

private void processSelectedKeys() {
     if (selectedKeys != null) {
         // Replace the ready event set in the Selector implementation class with SelectedSelectionKeySet via reflection
         // SelectedSelectionKeySet is implemented as an array, which can improve traversal performance (originally HashSet)
         processSelectedKeysOptimized();
     } else {
         processSelectedKeysPlain(());
     }
 }

#processSelectedKey

private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
     final unsafe = ();
     // This key is invalid when the key is cancelled or closed
     if (!()) {
         // Handle if invalid...
         return;
     }

     try {
         int readyOps = ();
         // Connection event
         if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
             int ops = ();
             ops &= ~SelectionKey.OP_CONNECT;
             (ops);

             ();
         }

         // Writable events
         if ((readyOps & SelectionKey.OP_WRITE) != 0) {
             ().forceFlush();
         }

         // Readable or accessible events
         if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
             // If it is available, please access #read
             // If readable #read
             ();
         }
     } catch (CancelledKeyException ignored) {
         (());
     }
 }

accept analysis

The following code in nio, the process in netty

//1 Block until the event occurs
 ();

 Iterator<SelectionKey> iter = ().iterator();
 while (()) {
     //2 Get an event
     SelectionKey key = ();
    
     //3 If it is an accept event
     if (()) {
        
         //4 Execute accept
         SocketChannel channel = ();
         (false);
        
         //5 Follow the read event
         (selector, SelectionKey.OP_READ);
     }
     // ...
 }

Let’s take a look at the access event processing (accept)

#read

public void read() {
     assert eventLoop().inEventLoop();
     final ChannelConfig config = config();
     final ChannelPipeline pipeline = pipeline();
     final allocHandle = unsafe().recvBufAllocHandle();
     (config);

     boolean closed = false;
     Throwable exception = null;
     try {
         try {
             do {
 // DoReadMessages execute accept and create NioSocketChannel as a message to put it into readBuf
                 // readBuf is an ArrayList used to cache messages
                 int localRead = doReadMessages(readBuf);
                 if (localRead == 0) {
                     break;
                 }
                 if (localRead < 0) {
                     closed = true;
                     break;
                 }
 // localRead is 1, just a message, that is, a client connection is received
                 (localRead);
             } while (());
         } catch (Throwable t) {
             exception = t;
         }

         int size = ();
         for (int i = 0; i < size; i ++) {
             readPending = false;
             // Trigger the read event and let the handler on the pipeline handle it, which is the process at this time
             // #channelRead
             ((i));
         }
         ();
         ();
         ();

         if (exception != null) {
             closed = closeOnReadError(exception);

             (exception);
         }

         if (closed) {
             inputShutdown = true;
             if (isOpen()) {
                 close(voidPromise());
             }
         }
     } finally {
         if (!readPending && !()) {
             removeReadOp();
         }
     }
 }

Key Code#channelRead

public void channelRead(ChannelHandlerContext ctx, Object msg) {
     // The msg at this time is NioSocketChannel
     final Channel child = (Channel) msg;

     // NioSocketChannel add childHandler, that is, initializer
     ().addLast(childHandler);

     // Set options
     setChannelOptions(child, childOptions, logger);

     for (Entry<AttributeKey<?>, Object> e: childAttrs) {
         ((AttributeKey<Object>) ()).set(());
     }

     try {
         // Register NioSocketChannel to the nio worker thread, and the next processing will be transferred to the nio worker thread
         (child).addListener(new ChannelFutureListener() {
             @Override
             public void operationComplete(ChannelFuture future) throws Exception {
                 if (!()) {
                     forceClose(child, ());
                 }
             }
         });
     } catch (Throwable t) {
         forceClose(child, t);
     }
 }

Back to the familiar#registermethod

public final void register(EventLoop eventLoop, final ChannelPromise promise) {
     // Some checks, a little...

      = eventLoop;

     if (()) {
         register0(promise);
     } else {
         try {
             // The fact that this line of code is the switching of nio boss -> nio worker thread
             (new Runnable() {
                 @Override
                 public void run() {
                     register0(promise);
                 }
             });
         } catch (Throwable t) {
             // Logging...
             closeForcibly();
             ();
             safeSetFailure(promise, t);
         }
     }
 }

#register0

private void register0(ChannelPromise promise) {
     try {
         if (!() || !ensureOpen(promise)) {
             return;
         }
         boolean firstRegistration = neverRegistered;
         doRegister();
         neverRegistered = false;
         registered = true;
 : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : :
         // Execute the initializer. Before execution, there is only head -> initializer -> tail in the pipeline before execution
         ();
         // After execution, it is head -> logging handler -> my handler -> tail

         safeSetSuccess(promise);
         ();
        
         if (isActive()) {
             if (firstRegistration) {
                 // Trigger active event on pipeline
                 ();
             } else if (config().isAutoRead()) {
                 beginRead();
             }
         }
     } catch (Throwable t) {
         closeForcibly();
         ();
         safeSetFailure(promise, t);
     }
 }

Back to familiar code#channelActive

public void channelActive(ChannelHandlerContext ctx) {
     ();
 // Trigger read (NioSocketChannel read here is just to trigger the channel event registration, and data reading has not been involved)
     readIfIsAutoRead();
 }

#doBeginRead

protected void doBeginRead() throws Exception {
     // () or () was called
     final SelectionKey selectionKey = ;
     if (!()) {
         return;
     }

     readPending = true;
 // At this time interestOps is 0
     final int interestOps = ();
     if ((interestOps & readInterestOp) == 0) {
         // Follow the read event
         (interestOps | readInterestOp);
     }
 }

read analysis

Let's look at the readable events#read, Note that the sent data may not be read in one go, so it will trigger multiple nio read events, pipeline reads will be triggered multiple times within one event, and pipeline reads will be triggered once once

public final void read() {
     final ChannelConfig config = config();
     if (shouldBreakReadReady(config)) {
         clearReadPending();
         return;
     }
     final ChannelPipeline pipeline = pipeline();
     // Decide the implementation of allocator
     final ByteBufAllocator allocator = ();
     // Used to allocate byteBuf to determine the size of a single read
     final allocHandle = recvBufAllocHandle();
     (config);

     ByteBuf byteBuf = null;
     boolean close = false;
     try {
         do {
             byteBuf = (allocator);
             // Read
             (doReadBytes(byteBuf));
             if (() <= 0) {
                 ();
                 byteBuf = null;
                 close = () < 0;
                 if (close) {
                     readPending = false;
                 }
                 break;
             }

             (1);
             readPending = false;
             // Trigger the read event and let the handler on the pipeline handle it. At this time, it is the handler on the NioSocketChannel
             (byteBuf);
             byteBuf = null;
         }
         // Do you want to continue the loop
         while (());

         ();
         // Trigger the read complete event
         ();

         if (close) {
             closeOnRead(pipeline);
         }
     } catch (Throwable t) {
         handleReadException(pipeline, byteBuf, t, close, allocHandle);
     } finally {
         if (!readPending && !()) {
             removeReadOp();
         }
     }
 }

#continueReading()

public boolean continueReading(UncheckedBooleanSupplier maybeMoreDataSupplier) {
     Return
            // Generally true
            () &&
            // respectMaybeMoreData defaults to true
            // The logic of maybeMoreDataSupplier is to return true if the expected read byte is equal to the actual read byte
            (!respectMaybeMoreData || ()) &&
            // Less than the maximum number of times, maxMessagePerRead defaults to 16
            totalMessages < maxMessagePerRead &&
            // Actually read the data
            totalBytesRead > 0;
 }

Interview Question Column

Java Interview Question ColumnIt is online, welcome to visit.

  • If you don’t know how to write your resume, you don’t know how to package your resume project;
  • If you don’t know if there is something in your resume?
  • If you don’t know how to answer some comprehensive questions;

Then you can send me a private message and I will do my best to help you.