
Netty source code—8. Codec principle

2025-03-27 21:54:37

Outline

1. Read data entry

2. Unpacking principle

3. Decoding steps

4. Summary of the decoder abstraction process

5. Common out-of-the-box decoders

6. The general steps of the () method

7. Coding steps

8. () Write queue

9. () Refresh the write queue

10. How to turn an object into a byte stream and write it to the underlying layer of unsafe

 

1. Read data entry

When NioEventLoop, the Reactor thread of the client Channel, detects a read event, it executes the read() method of NioByteUnsafe. This method calls doReadBytes() to read the data in the TCP buffer into a ByteBuf object allocated by ByteBufAllocator, and then calls the pipeline's fireChannelRead() method to propagate the ChannelRead event downward, carrying this ByteBuf object.

 

During propagation, the event first reaches the channelRead() method of the pipeline's head node. That method keeps propagating the ChannelRead event downward with the ByteBuf object, so it eventually reaches, for example, the channelRead() method of a ByteToMessageDecoder node.

 

Note: The unsafe variable of the server Channel is a NioMessageUnsafe object, and the unsafe variable of the client Channel is a NioByteUnsafe object.

//SingleThreadEventLoop implementation which register the Channel's to a Selector and so does the multi-plexing of these in the event loop.
public final class NioEventLoop extends SingleThreadEventLoop {
    Selector selector;
    private SelectedSelectionKeySet selectedKeys;
    private boolean needsToSelectAgain;
    private int cancelledKeys;
    ...
    @Override
    protected void run() {
        for (;;) {
            ...
            //1. Call the select() method to poll for events
            select(wakenUp.getAndSet(false));
            if (wakenUp.get()) {
                selector.wakeup();
            }
            ...
            //2. Process the Channels that have pending IO events
            needsToSelectAgain = false;
            processSelectedKeys();
            ...
            //3. Execute the tasks that external threads put into the TaskQueue
            runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
        }
    }

    private void processSelectedKeys() {
        if (selectedKeys != null) {
            //selectedKeys.flip() returns an array
            processSelectedKeysOptimized(selectedKeys.flip());
        } else {
            processSelectedKeysPlain(selector.selectedKeys());
        }
    }

    private void processSelectedKeysOptimized(SelectionKey[] selectedKeys) {
        for (int i = 0;; i ++) {
            //1. First take out the IO event
            final SelectionKey k = selectedKeys[i];
            if (k == null) {
                break;
            }
            selectedKeys[i] = null;//Help GC
            //2. Then get the corresponding Channel and process it
            //By default, this a is a NioChannel, that is, the Channel wrapped by Netty when the server starts
            final Object a = k.attachment();
            if (a instanceof AbstractNioChannel) {
                //Handle network events
                processSelectedKey(k, (AbstractNioChannel) a);
            } else {
                //NioTask is mainly used for tasks executed when a SelectableChannel is registered with the Selector
                NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
                processSelectedKey(k, task);
            }
            //3. Finally determine whether another poll is needed
            if (needsToSelectAgain) {
                for (;;) {
                    i++;
                    if (selectedKeys[i] == null) {
                        break;
                    }
                    selectedKeys[i] = null;
                }
                selectAgain();
                //this.selectedKeys.flip() returns an array
                selectedKeys = this.selectedKeys.flip();
                i = -1;
            }
        }
    }

    private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
        final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
        ...
        try {
            int readyOps = k.readyOps();
            ...
            //A new connection is ready to be accepted, or an existing connection has data to read
            if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
                //If a new connection is ready to be accepted, the read() method of NioMessageUnsafe is called
                //If an existing connection has data to read, the read() method of NioByteUnsafe is called
                unsafe.read();
                if (!ch.isOpen()) {
                    return;
                }
            }
        } catch (CancelledKeyException ignored) {
            unsafe.close(unsafe.voidPromise());
        }
    }
    ...
}

public abstract class AbstractNioByteChannel extends AbstractNioChannel {
    ...
    protected class NioByteUnsafe extends AbstractNioUnsafe {
        ...
        @Override
        public final void read() {
            final ChannelConfig config = config();
            final ChannelPipeline pipeline = pipeline();
            //Get the ByteBuf allocator
            final ByteBufAllocator allocator = config.getAllocator();
            final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();
            allocHandle.reset(config);

            ByteBuf byteBuf = null;
            boolean close = false;
            do {
                //1. Allocate a ByteBuf
                byteBuf = allocHandle.allocate(allocator);
                //2. Read the data into the allocated ByteBuf
                allocHandle.lastBytesRead(doReadBytes(byteBuf));
                if (allocHandle.lastBytesRead() <= 0) {
                    byteBuf.release();
                    byteBuf = null;
                    close = allocHandle.lastBytesRead() < 0;
                    break;
                }

                //3. Call the fireChannelRead() method of DefaultChannelPipeline to propagate the event from the head node
                pipeline.fireChannelRead(byteBuf);
                byteBuf = null;
            } while (allocHandle.continueReading());

            allocHandle.readComplete();
            //4. Call the fireChannelReadComplete() method of DefaultChannelPipeline to propagate the event from the head node
            pipeline.fireChannelReadComplete();
            ...
        }
    }
}

NioByteUnsafe mainly does the following:

1. Get the memory allocator ByteBufAllocator through the ChannelConfig of the client Channel, and then use the memory allocator to allocate a ByteBuf object.

2. Read the data in the TCP buffer of the client Channel into the ByteBuf object.

3. After reading the data, call the fireChannelRead() method of DefaultChannelPipeline to propagate the ChannelRead event through the entire ChannelPipeline, starting from the HeadContext node.

 

2. Unpacking principle

1. The principle of unpacking without Netty

Keep reading data from the TCP buffer, and after every read check whether a complete data packet is available. If the data read so far is not enough to form a complete packet, retain it and continue reading from the TCP buffer. If the newly read data plus the previously retained data is enough to form a complete packet, pass the spliced packet to the business logic and retain any excess data for the next round.
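
The loop described above can be sketched in plain Java without any Netty classes. This is only an illustration: the ManualUnpacker class and its one-byte length prefix are assumptions made for the example, not a protocol used anywhere in this article.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper (not part of Netty): splits a byte stream into packets
// framed as [1-byte length][payload].
class ManualUnpacker {
    private byte[] pending = new byte[0]; // bytes retained from earlier reads

    // Feed one chunk read from the socket; returns every packet it completes.
    List<String> feed(byte[] chunk) {
        // Splice the retained bytes and the newly read chunk together.
        byte[] data = Arrays.copyOf(pending, pending.length + chunk.length);
        System.arraycopy(chunk, 0, data, pending.length, chunk.length);

        List<String> packets = new ArrayList<>();
        int pos = 0;
        while (pos < data.length) {
            int len = data[pos] & 0xFF;
            if (data.length - pos - 1 < len) {
                break; // incomplete packet: keep it and wait for more data
            }
            packets.add(new String(data, pos + 1, len, StandardCharsets.UTF_8));
            pos += 1 + len;
        }
        // Retain whatever is left over for the next read.
        pending = Arrays.copyOfRange(data, pos, data.length);
        return packets;
    }
}
```

Feeding the fragments {2,'h'}, {'i',3,'a'} and {'b','c'} one after another yields no packet, then "hi", then "abc".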

 

2.Netty's unpacking principle

Netty's unpacking base class holds a byte container. Every time data is read, it is appended to this container, and the decoder then tries to unpack the accumulated bytes into complete business data packets. This unpacking base class is called ByteToMessageDecoder.

 

3. Decoding steps

(1) Overall steps of decoding

(2) First accumulate byte stream

(3) Then call the decode() method of the subclass for parsing

(4) Then clean the accumulated byte container

(5) Finally, the parsed ByteBuf is propagated downward

 

(1) Overall introduction to decoding

1. Accumulated byte stream

Netty accumulates every byte stream it reads into a ByteBuf byte container named cumulation.

 

2. Call the decode() method of the subclass for parsing

The byte stream in the accumulated byte container is parsed through the decode() method of the subclass.

 

3. Propagate the parsed ByteBuf downwards

If the decode() method of the subclass manages to parse out a ByteBuf object, that ByteBuf object is propagated downward.

public abstract class ByteToMessageDecoder extends ChannelInboundHandlerAdapter {
    ...
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        //Only ByteBuf objects are decoded
        if (msg instanceof ByteBuf) {
            //1. Accumulate the byte stream
            //2. Call the decode() method of the subclass for parsing
            //3. Clean up the accumulated byte container
            //4. Propagate the parsed ByteBuf downward
        } else {
            ctx.fireChannelRead(msg);
        }
    }
    ...
}

(2) First accumulate byte stream

If there is no data in the current byte container, then point the pointer of the byte container to the newly read data. If there is data in the current byte container, then the cumulate() method of the accumulator is called to accumulate the data into the byte container.

public abstract class ByteToMessageDecoder extends ChannelInboundHandlerAdapter {
    ByteBuf cumulation;//Byte container
    private Cumulator cumulator = MERGE_CUMULATOR;//Default accumulator
    private boolean decodeWasNull;
    private boolean first;
    private int discardAfterReads = 16;
    private int numReads;
    ...
    //Cumulate ByteBufs by merge them into one ByteBuf's, using memory copies.
    public static final Cumulator MERGE_CUMULATOR = new Cumulator() {
        //The accumulator method receives the byte container cumulation
        @Override
        public ByteBuf cumulate(ByteBufAllocator alloc, ByteBuf cumulation, ByteBuf in) {
            ByteBuf buffer;//The byte container that will hold the accumulated data
            if (cumulation.writerIndex() > cumulation.maxCapacity() - in.readableBytes() || cumulation.refCnt() > 1) {
                //Not enough room (or the container is shared): copy into a larger byte container
                buffer = expandCumulation(alloc, cumulation, in.readableBytes());
            } else {
                buffer = cumulation;
            }
            buffer.writeBytes(in);//Append the newly read data to the byte container
            in.release();
            return buffer;//Return the byte container holding the accumulated data
        }
    };

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        if (msg instanceof ByteBuf) {
            CodecOutputList out = CodecOutputList.newInstance();
            try {
                //1. Accumulate the byte stream
                ByteBuf data = (ByteBuf) msg;
                first = cumulation == null;
                if (first) {//If there is no data in the current byte container
                    //Point the byte container at the newly read data
                    cumulation = data;
                } else {//If there is data in the current byte container
                    //Call the cumulate() method of the accumulator to accumulate the data into the byte container
                    cumulation = cumulator.cumulate(ctx.alloc(), cumulation, data);
                }

                //2. Pass the data in the byte container to the business unpacker
                //Call callDecode() to unpack the data
                callDecode(ctx, cumulation, out);
            } catch (DecoderException e) {
                throw e;
            } catch (Throwable t) {
                throw new DecoderException(t);
            } finally {
                //3. Clean the byte container
                if (cumulation != null && !cumulation.isReadable()) {
                    numReads = 0;
                    cumulation.release();
                    cumulation = null;
                } else if (++ numReads >= discardAfterReads) {
                    numReads = 0;
                    discardSomeReadBytes();
                }

                //4. Propagate the parsed ByteBuf downward
                int size = out.size();
                decodeWasNull = !out.insertSinceRecycled();
                fireChannelRead(ctx, out, size);
                out.recycle();
            }
        } else {
            ctx.fireChannelRead(msg);
        }
    }

    static ByteBuf expandCumulation(ByteBufAllocator alloc, ByteBuf cumulation, int readable) {
        ByteBuf oldCumulation = cumulation;
        cumulation = alloc.buffer(oldCumulation.readableBytes() + readable);
        cumulation.writeBytes(oldCumulation);
        oldCumulation.release();
        return cumulation;
    }
    ...
}
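
To make the merge-by-copy idea concrete, here is a minimal sketch built on java.nio.ByteBuffer instead of Netty's ByteBuf. The class name MergeCumulatorSketch is hypothetical, and the capacity check is only an analogue of the writerIndex/maxCapacity test in the listing above, because a ByteBuffer cannot grow and has no reference count.

```java
import java.nio.ByteBuffer;

// Hypothetical stand-in for a merge cumulator, using ByteBuffer rather than ByteBuf.
class MergeCumulatorSketch {
    // Both buffers follow ByteBuffer conventions: cumulation is in "write mode"
    // (position = next write index), in is in "read mode" (position..limit unread).
    static ByteBuffer cumulate(ByteBuffer cumulation, ByteBuffer in) {
        ByteBuffer buffer;
        if (cumulation.remaining() < in.remaining()) {
            // Not enough room: allocate a larger container and copy the old bytes over.
            buffer = ByteBuffer.allocate(cumulation.position() + in.remaining());
            cumulation.flip();      // switch the old container to read mode
            buffer.put(cumulation); // memory copy of everything accumulated so far
        } else {
            buffer = cumulation;    // enough room: append in place
        }
        buffer.put(in);             // append the newly read bytes
        return buffer;
    }
}
```

As in the real cumulator, the caller must keep using the returned buffer, since it may be a different object from the one passed in.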

(3) Then call the decode() method of the subclass for parsing

After the data is accumulated into the byte container, the callDecode() method will be called. This method will try to split the data of the byte container into business data packets and put the business data packets into the business data container out.

 

Netty's support for various user protocols is embodied in the abstract decode() method of ByteToMessageDecoder. The decode() method receives all the data accumulated so far (in) and the container for decoded business data packets (out). Every unpacker must implement the decode() method of ByteToMessageDecoder.

 

After each call to the unpacker, callDecode() checks the result. If no complete data packet was produced and the unpacker did not consume any data, the loop exits; if no packet was produced but some data was consumed, unpacking continues. If a complete packet was produced but the unpacker did not consume any data, a DecoderException is thrown.

public abstract class ByteToMessageDecoder extends ChannelInboundHandlerAdapter {
    ...
    //Called once data should be decoded from the given ByteBuf.
    //This method will call #decode(ChannelHandlerContext, ByteBuf, List) as long as decoding should take place.
    //@param ctx, the ChannelHandlerContext which this ByteToMessageDecoder belongs to
    //@param in, the ByteBuf from which to read data
    //@param out, the List to which decoded messages should be added
    protected void callDecode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
        try {
            while (in.isReadable()) {
                int outSize = out.size();
                if (outSize > 0) {
                    fireChannelRead(ctx, out, outSize);
                    out.clear();
                    //Check if this handler was removed before continuing with decoding.
                    //If it was removed, it is not safe to continue to operate on the buffer.
                    if (ctx.isRemoved()) {
                        break;
                    }
                    outSize = 0;
                }

                int oldInputLength = in.readableBytes();
                //Call the decode() method implemented by the unpacker
                decode(ctx, in, out);

                //After the unpacker has run once:

                //Check if this handler was removed before continuing the loop.
                //If it was removed, it is not safe to continue to operate on the buffer.
                if (ctx.isRemoved()) {
                    break;
                }
                //outSize == out.size() means that no complete data packet was produced
                if (outSize == out.size()) {
                    if (oldInputLength == in.readableBytes()) {
                        //The unpacker did not consume any data, so exit the loop
                        break;
                    } else {
                        //The unpacker consumed some data, so keep unpacking
                        continue;
                    }
                }
                //Reaching this point means that a complete data packet was produced
                if (oldInputLength == in.readableBytes()) {
                    //The unpacker did not consume any data, so a DecoderException is thrown
                    throw new DecoderException(StringUtil.simpleClassName(getClass()) + ".decode() did not read anything but decoded a message.");
                }
                if (isSingleDecode()) {
                    break;
                }
            }
        } catch (DecoderException e) {
            throw e;
        } catch (Throwable cause) {
            throw new DecoderException(cause);
        }
    }

    //Decode the from one ByteBuf to an other.
    //This method will be called till either the input ByteBuf has nothing to read
    //when return from this method or till nothing was read from the input ByteBuf.
    //@param ctx, the ChannelHandlerContext which this ByteToMessageDecoder belongs to
    //@param in, the ByteBuf from which to read data
    //@param out, the List to which decoded messages should be added
    //@throws Exception, is thrown if an error occurs
    protected abstract void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception;
    ...
}
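
The progress checks in callDecode() can be isolated in a small stand-alone sketch. Everything here is hypothetical scaffolding (the DecodeLoopSketch class, its Decoder interface and the THREE_BYTES decoder); unlike the real method it does not fire messages in the middle of the loop, and it throws IllegalStateException where Netty throws DecoderException.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the "did the decoder make progress?" loop.
class DecodeLoopSketch {
    interface Decoder {
        void decode(ByteBuffer in, List<Object> out);
    }

    static List<Object> callDecode(Decoder decoder, ByteBuffer in) {
        List<Object> out = new ArrayList<>();
        while (in.hasRemaining()) {
            int outSize = out.size();
            int oldInputLength = in.remaining();
            decoder.decode(in, out);
            if (outSize == out.size()) {
                if (oldInputLength == in.remaining()) {
                    break;    // no packet and nothing consumed: wait for more data
                }
                continue;     // bytes were consumed but no packet yet: try again
            }
            if (oldInputLength == in.remaining()) {
                // A packet appeared without any input being consumed: decoder bug.
                throw new IllegalStateException("decode() did not read anything but decoded a message");
            }
        }
        return out;
    }

    // Example decoder: emits one 3-byte frame per call when enough bytes are readable.
    static final Decoder THREE_BYTES = (in, out) -> {
        if (in.remaining() >= 3) {
            byte[] frame = new byte[3];
            in.get(frame);
            out.add(new String(frame, StandardCharsets.US_ASCII));
        }
    };
}
```

With the seven bytes "ABCDEFG" as input, the loop produces "ABC" and "DEF" and stops with one unread byte left in the buffer.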

(4) Then clean the byte container

After unpacking, the unpacker has only read data out of the byte container; the space occupied by the consumed bytes is still held by the container. Every accumulation appends new bytes at the tail, so if the byte container is never cleaned, it may eventually cause an OOM.

 

Under normal circumstances, the ByteToMessageDecoder decoder cleans the byte container in its channelReadComplete() method after each round of reads. But if the sending side sends data too quickly, a long time may pass before the decoder's channelReadComplete() method is called.

 

Therefore, to cope with a sender that sends data too quickly, ByteToMessageDecoder also cleans the byte container in channelRead(), after each read has been unpacked. If the byte container has no readable data left, its release() method is called to release it. If the byte container still has readable data, and 16 consecutive reads have left data that is not yet unpacked, a compaction is performed.

public abstract class ByteToMessageDecoder extends ChannelInboundHandlerAdapter {
    private int discardAfterReads = 16;
    ...
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        if (msg instanceof ByteBuf) {
            CodecOutputList out = CodecOutputList.newInstance();
            try {
                //1. Accumulate the byte stream
                ByteBuf data = (ByteBuf) msg;
                first = cumulation == null;
                if (first) {//If there is no data in the current byte container
                    //Point the byte container at the newly read data
                    cumulation = data;
                } else {//If there is data in the current byte container
                    //Call the cumulate() method of the accumulator to accumulate the data into the byte container
                    cumulation = cumulator.cumulate(ctx.alloc(), cumulation, data);
                }

                //2. Pass the data in the byte container to the business unpacker
                //Call callDecode() to unpack the data
                callDecode(ctx, cumulation, out);
            } catch (DecoderException e) {
                throw e;
            } catch (Throwable t) {
                throw new DecoderException(t);
            } finally {
                //3. Clean the byte container
                if (cumulation != null && !cumulation.isReadable()) {
                    //The byte container has no readable data left: reset numReads to 0 and release cumulation
                    numReads = 0;
                    cumulation.release();
                    cumulation = null;
                } else if (++ numReads >= discardAfterReads) {//numReads >= 16
                    //The byte container still holds data that the business unpacker has not unpacked,
                    //so compact it by discarding bytes that were already read
                    numReads = 0;
                    discardSomeReadBytes();
                }

                //4. Propagate the parsed ByteBuf downward
                int size = out.size();
                decodeWasNull = !out.insertSinceRecycled();
                fireChannelRead(ctx, out, size);
                out.recycle();
            }
        } else {
            ctx.fireChannelRead(msg);
        }
    }

    //Get numElements out of the CodecOutputList and forward these through the pipeline.
    static void fireChannelRead(ChannelHandlerContext ctx, CodecOutputList msgs, int numElements) {
        //Traverse the business data container
        for (int i = 0; i < numElements; i ++) {
            //Pass each complete business data packet (ByteBuf) to the subsequent ChannelHandler for processing
            ctx.fireChannelRead(msgs.getUnsafe(i));
        }
    }
    ...
}
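
The release-or-compact policy can be illustrated with a plain ByteBuffer, whose compact() method plays a role similar to discarding already-read bytes. The CumulationCleanupSketch class below is a hypothetical simplification: it has no reference counting, so "release" is modelled by simply dropping the reference.

```java
import java.nio.ByteBuffer;

// Hypothetical sketch of the cleanup policy applied after every read.
class CumulationCleanupSketch {
    static final int DISCARD_AFTER_READS = 16;

    // Kept in read mode: position = reader index, limit = end of accumulated data.
    ByteBuffer cumulation;
    int numReads;

    // Called once after each read-and-unpack round.
    void cleanUp() {
        if (cumulation != null && !cumulation.hasRemaining()) {
            // Everything was consumed: drop the container entirely.
            numReads = 0;
            cumulation = null;
        } else if (++numReads >= DISCARD_AFTER_READS) {
            numReads = 0;
            if (cumulation != null) {
                // Move the unread bytes to index 0 so the consumed prefix is reclaimed,
                // then switch back to read mode.
                cumulation.compact();
                cumulation.flip();
            }
        }
    }
}
```

Until the sixteenth call the buffer is left untouched; on the sixteenth call the unread bytes move to the front and the reader position returns to 0.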

(5) Finally, the parsed ByteBuf is propagated downward

That is, the fireChannelRead() method is called to traverse the business data container and pass each complete business data packet (ByteBuf) to the subsequent ChannelHandlers for processing.

 

4. Summary of the decoder abstraction process

The decoding process is implemented through an abstract decoder called ByteToMessageDecoder. The decoding process implemented by ByteToMessageDecoder is divided into the following four steps.

 

Step 1: Accumulate byte streams

That is, the current read byte stream is accumulated into a byte container.

 

Step 2: Call the decode() method of the subclass for parsing

ByteToMessageDecoder's decode() method is abstract, so each type of decoder supplies its own decode() logic. When decode() is called, two key parameters are passed in: the ByteBuf object holding the byte stream accumulated so far, and the List that stores the successfully decoded business data packets.

 

Step 3: Clean the byte container

In order to prevent the sending side from sending data too quickly, ByteToMessageDecoder will clean up the byte container after reading the data once and completing business unpacking.

 

Step 4: Propagate the decoded service data packets

If the List contains parsed business data packets, they are propagated downward through the pipeline's event propagation mechanism.

 

5. Common out-of-the-box decoders

(1) Fixed-length decoder

(2) Line-delimiter decoder

(3) Delimiter decoder

(4) Length-field decoder

 

(1) Fixed-length decoder

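The decoder checks whether the number of readable bytes in the current byte container is less than the fixed length. If it is, the decoder returns null and waits for more data; otherwise it slices exactly frameLength bytes off the container as one frame.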

//A decoder that splits the received ByteBufs by the fixed number of bytes. 
//For example, if you received the following four fragmented packets:
//+---+----+------+----+
//| A | BC | DEFG | HI |
//+---+----+------+----+
//A FixedLengthFrameDecoder (3) will decode them into the following three packets with the fixed length:
//+-----+-----+-----+
//| ABC | DEF | GHI |
//+-----+-----+-----+
public class FixedLengthFrameDecoder extends ByteToMessageDecoder {
    private final int frameLength;
    //Creates a new instance.
    public FixedLengthFrameDecoder(int frameLength) {
        if (frameLength <= 0) {
            throw new IllegalArgumentException("frameLength must be a positive integer: " + frameLength);
        }
        this.frameLength = frameLength;
    }

    @Override
    protected final void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        Object decoded = decode(ctx, in);
        if (decoded != null) {
            out.add(decoded);
        }
    }

    //Create a frame out of the ByteBuf and return it.
    //@param   ctx, the ChannelHandlerContext which this ByteToMessageDecoder belongs to
    //@param   in, the ByteBuf from which to read data
    //@return  frame, the ByteBuf which represent the frame or null if no frame could be created.
    protected Object decode(@SuppressWarnings("UnusedParameters") ChannelHandlerContext ctx, ByteBuf in) throws Exception {
        if (in.readableBytes() < frameLength) {
            //Not enough data for a full frame yet
            return null;
        } else {
            return in.readRetainedSlice(frameLength);
        }
    }
}
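
The A | BC | DEFG | HI example from the comment above can be reproduced with a few lines of plain Java. FixedLengthSplitter is a hypothetical stand-in that works on byte arrays and ASCII strings; it is not the Netty decoder itself.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for a fixed-length frame decoder, without Netty.
class FixedLengthSplitter {
    private final int frameLength;
    private byte[] cumulation = new byte[0]; // bytes not yet emitted as a frame

    FixedLengthSplitter(int frameLength) {
        if (frameLength <= 0) {
            throw new IllegalArgumentException("frameLength must be a positive integer: " + frameLength);
        }
        this.frameLength = frameLength;
    }

    // Accumulate one fragment and return every complete frame now available.
    List<String> feed(String fragment) {
        byte[] in = fragment.getBytes(StandardCharsets.US_ASCII);
        byte[] data = Arrays.copyOf(cumulation, cumulation.length + in.length);
        System.arraycopy(in, 0, data, cumulation.length, in.length);

        List<String> frames = new ArrayList<>();
        int readerIndex = 0;
        // Emit a frame only while at least frameLength bytes are readable.
        while (data.length - readerIndex >= frameLength) {
            frames.add(new String(data, readerIndex, frameLength, StandardCharsets.US_ASCII));
            readerIndex += frameLength;
        }
        cumulation = Arrays.copyOfRange(data, readerIndex, data.length);
        return frames;
    }
}
```

With a frame length of 3, feeding "A", "BC", "DEFG" and "HI" in turn returns nothing, then "ABC", then "DEF", then "GHI".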

(2) Line-delimiter decoder

The line-delimiter unpacker handles two kinds of line delimiters, \n and \r\n. Its processing logic splits into four cases: non-discard mode or discard mode, each combined with whether a line delimiter was found or not.

 

1. Find line separators in non-discard mode

First declare a new frame, that is, the ByteBuf frame. Then calculate the length of the packet to be decoded and the length of the delimiter. Next, check whether the length to be unpacked exceeds the maximum length allowed by the unpacker. If it does, discard this data, report a TooLongFrameException and return null. Otherwise take out a complete packet; if stripDelimiter was set to false in the constructor, the returned data includes the delimiter.

 

2. No line separator found during non-discard mode

First obtain the number of readable bytes in the current byte container, and then check whether it exceeds the maximum allowed length. If it does not, null is returned directly and the data in the byte container is left unchanged. If it does, the readable bytes are discarded, discarding is set to true, and the decoder enters discard mode.

 

3. Find the line separator in discard mode

In this case, all data before the delimiter needs to be discarded. After the length of the delimiter is calculated, everything up to and including the delimiter is discarded by moving the readerIndex pointer of the byte container. After this discard, normal data packets may follow, so discarding is set to false to return to non-discard mode, and the next decode call follows the normal decoding process.

 

4. No line separator found in discard mode

The decoder is still in discard mode, and not finding a line delimiter means the oversized packet has not been completely discarded yet. So the current data is discarded as well by moving the readerIndex pointer of the byte container.

//A decoder that splits the received {@link ByteBuf}s on line endings.
//Both "\n" and "\r\n" are handled.
//For a more general delimiter-based decoder, see DelimiterBasedFrameDecoder.
public class LineBasedFrameDecoder extends ByteToMessageDecoder {
    //Maximum length of a frame we're willing to decode.
    private final int maxLength;
    //Whether or not to throw an exception as soon as we exceed maxLength.
    private final boolean failFast;
    private final boolean stripDelimiter;
    //True if we're discarding input because we're already over maxLength.
    private boolean discarding;
    private int discardedBytes;

    public LineBasedFrameDecoder(final int maxLength) {
        this(maxLength, true, false);
    }

    public LineBasedFrameDecoder(final int maxLength, final boolean stripDelimiter, final boolean failFast) {
        this.maxLength = maxLength;
        this.failFast = failFast;
        this.stripDelimiter = stripDelimiter;
    }

    @Override
    protected final void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        Object decoded = decode(ctx, in);
        if (decoded != null) {
            out.add(decoded);
        }
    }

    //Create a frame out of the ByteBuf and return it.
    //@param   ctx, the ChannelHandlerContext which this ByteToMessageDecoder belongs to
    //@param   buffer, the ByteBuf from which to read data
    //@return  frame, the ByteBuf which represents the frame or null if no frame could be created.
    protected Object decode(ChannelHandlerContext ctx, ByteBuf buffer) throws Exception {
        final int eol = findEndOfLine(buffer);
        if (!discarding) {//Non-discard mode
            if (eol >= 0) {//Line delimiter found
                //Declare a new frame, that is, the ByteBuf frame
                final ByteBuf frame;
                //Calculate the length of the packet to be decoded
                final int length = eol - buffer.readerIndex();
                //Calculate the length of the delimiter
                final int delimLength = buffer.getByte(eol) == '\r'? 2 : 1;

                //Check whether the length to be unpacked exceeds the maximum length allowed by the unpacker
                if (length > maxLength) {
                    //If it does, discard this data and return null
                    buffer.readerIndex(eol + delimLength);
                    fail(ctx, length);
                    return null;
                }

                //Take out a complete packet
                if (stripDelimiter) {
                    frame = buffer.readRetainedSlice(length);
                    buffer.skipBytes(delimLength);
                } else {
                    //If stripDelimiter is set to false in the constructor, the data includes the delimiter
                    frame = buffer.readRetainedSlice(length + delimLength);
                }
                return frame;
            } else {//No line delimiter found
                //First get the number of readable bytes in the current byte container
                final int length = buffer.readableBytes();
                //Then check whether the maximum allowed length is exceeded
                if (length > maxLength) {
                    //If the maximum length is exceeded, discard the data, set discarding to true and enter discard mode
                    discardedBytes = length;
                    buffer.readerIndex(buffer.writerIndex());
                    discarding = true;
                    if (failFast) {
                        fail(ctx, "over " + discardedBytes);
                    }
                }
                //If the maximum length is not exceeded, null is returned directly and the data in the byte container is unchanged
                return null;
            }
        } else {//Discard mode
            if (eol >= 0) {//Line delimiter found
                final int length = discardedBytes + eol - buffer.readerIndex();
                //Calculate the length of the delimiter
                final int delimLength = buffer.getByte(eol) == '\r'? 2 : 1;
                //Discard everything up to and including the delimiter by moving the readerIndex pointer of the byte container
                buffer.readerIndex(eol + delimLength);
                discardedBytes = 0;
                //After this discard, normal data packets may follow,
                //so set discarding to false so that the next decode call follows the normal decoding process
                discarding = false;
                if (!failFast) {
                    fail(ctx, length);
                }
            } else {//No line delimiter found
                //Still in discard mode, and no line delimiter means the oversized packet has not been fully discarded yet,
                //so keep discarding the current data by moving the readerIndex pointer of the byte container
                discardedBytes += buffer.readableBytes();
                buffer.readerIndex(buffer.writerIndex());
            }
            return null;
        }
    }

    private void fail(final ChannelHandlerContext ctx, int length) {
        fail(ctx, String.valueOf(length));
    }

    private void fail(final ChannelHandlerContext ctx, String length) {
        ctx.fireExceptionCaught(new TooLongFrameException("frame length (" + length + ") exceeds the allowed maximum (" + maxLength + ')'));
    }

    //Returns the index in the buffer of the end of line found.
    //Returns -1 if no end of line was found in the buffer.
    private static int findEndOfLine(final ByteBuf buffer) {
        int i = buffer.forEachByte(ByteProcessor.FIND_LF);
        if (i > 0 && buffer.getByte(i - 1) == '\r') {
            i--;
        }
        return i;
    }
}
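
The four cases (discard mode or not, delimiter found or not) can be exercised in a reduced sketch built on java.nio.ByteBuffer. LineFramerSketch is hypothetical: it always strips the delimiter and silently drops oversized lines instead of reporting a TooLongFrameException.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical, simplified line framer that mirrors the four decode() cases.
class LineFramerSketch {
    private final int maxLength;
    private boolean discarding;

    LineFramerSketch(int maxLength) {
        this.maxLength = maxLength;
    }

    // buf is in read mode; returns one line without its delimiter, or null.
    String decode(ByteBuffer buf) {
        int eol = findEndOfLine(buf);
        if (!discarding) {
            if (eol >= 0) {                          // case 1: delimiter found
                int length = eol - buf.position();
                int delimLength = buf.get(eol) == '\r' ? 2 : 1;
                if (length > maxLength) {
                    buf.position(eol + delimLength); // drop the oversized line
                    return null;
                }
                byte[] frame = new byte[length];
                buf.get(frame);
                buf.position(buf.position() + delimLength);
                return new String(frame, StandardCharsets.US_ASCII);
            }
            if (buf.remaining() > maxLength) {       // case 2: too long, no delimiter
                buf.position(buf.limit());
                discarding = true;
            }
            return null;
        }
        if (eol >= 0) {                              // case 3: end of the discarded line
            int delimLength = buf.get(eol) == '\r' ? 2 : 1;
            buf.position(eol + delimLength);
            discarding = false;
        } else {                                     // case 4: keep discarding
            buf.position(buf.limit());
        }
        return null;
    }

    // Index of '\n' (or of the '\r' directly before it), or -1 if there is none.
    private static int findEndOfLine(ByteBuffer buf) {
        for (int i = buf.position(); i < buf.limit(); i++) {
            if (buf.get(i) == '\n') {
                return (i > buf.position() && buf.get(i - 1) == '\r') ? i - 1 : i;
            }
        }
        return -1;
    }
}
```

For example, with maxLength 3 the input "toolong" switches the framer into discard mode, and a following "tail\nok\n" first ends the discarded line and then yields "ok".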

(3) Delimiter decoder

A list of delimiters can be passed to the delimiter-based decoder, DelimiterBasedFrameDecoder, which then splits packets on any delimiter in that list. Its decode() method is largely similar to the decode() method of the line-delimiter decoder.
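
According to the comments in the DelimiterBasedFrameDecoder source, when several delimiters are configured the decoder tries all of them and picks the one that yields the shortest frame. A minimal sketch of that selection, using strings instead of ByteBuf and a hypothetical ShortestFrameSketch class, might look like this:

```java
// Hypothetical sketch: choose the delimiter that yields the shortest frame.
class ShortestFrameSketch {
    // Returns the length of the shortest frame, or -1 if no delimiter occurs in buffer.
    static int shortestFrame(String buffer, String... delimiters) {
        int minFrameLength = Integer.MAX_VALUE;
        for (String delim : delimiters) {
            int frameLength = buffer.indexOf(delim); // bytes before this delimiter
            if (frameLength >= 0 && frameLength < minFrameLength) {
                minFrameLength = frameLength;
            }
        }
        return minFrameLength == Integer.MAX_VALUE ? -1 : minFrameLength;
    }
}
```

For the buffer "ab;cd|ef" with the delimiters "|" and ";", the shortest frame has length 2, so the frame "ab" is cut at the ";".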

//A decoder that splits the received ByteBufs by one or more delimiters.  
public class DelimiterBasedFrameDecoder extends ByteToMessageDecoder {
    private final ByteBuf[] delimiters;
    private final int maxFrameLength;
    private final boolean stripDelimiter;
    private final boolean failFast;
    private boolean discardingTooLongFrame;
    private int tooLongFrameLength;
    private final LineBasedFrameDecoder lineBasedDecoder;
    ...
    //Creates a new instance.
    //@param maxFrameLength,the maximum length of the decoded frame.
    //A TooLongFrameException is thrown if the length of the frame exceeds this value.
    //@param stripDelimiter,whether the decoded frame should strip out the delimiter or not
    //@param failFast,If true, a TooLongFrameException is thrown as soon as the decoder 
    //notices the length of the frame will exceed maxFrameLength regardless of 
    //whether the entire frame has been read.
    //If false, a TooLongFrameException is thrown after the entire frame that exceeds maxFrameLength has been read.
    //@param delimiters  the delimiters
    public DelimiterBasedFrameDecoder(int maxFrameLength, boolean stripDelimiter, boolean failFast, ByteBuf... delimiters) {
        validateMaxFrameLength(maxFrameLength);
        if (delimiters == null) {
            throw new NullPointerException("delimiters");
        }
        if (delimiters.length == 0) {
            throw new IllegalArgumentException("empty delimiters");
        }

        if (isLineBased(delimiters) && !isSubclass()) {
            lineBasedDecoder = new LineBasedFrameDecoder(maxFrameLength, stripDelimiter, failFast);
            this.delimiters = null;
        } else {
            this.delimiters = new ByteBuf[delimiters.length];
            for (int i = 0; i < delimiters.length; i ++) {
                ByteBuf d = delimiters[i];
                validateDelimiter(d);
                this.delimiters[i] = d.slice(d.readerIndex(), d.readableBytes());
            }
            lineBasedDecoder = null;
        }
        this.maxFrameLength = maxFrameLength;
        this.stripDelimiter = stripDelimiter;
        this.failFast = failFast;
    }
    
    //Returns true if the delimiters are "\n" and "\r\n".
    private static boolean isLineBased(final ByteBuf[] delimiters) {
        if (delimiters.length != 2) {
            return false;
        }
        ByteBuf a = delimiters[0];
        ByteBuf b = delimiters[1];
        if (a.capacity() < b.capacity()) {
            a = delimiters[1];
            b = delimiters[0];
        }
        return a.capacity() == 2 && b.capacity() == 1
            && a.getByte(0) == '\r' && a.getByte(1) == '\n'
            && b.getByte(0) == '\n';
    }

    //Return true if the current instance is a subclass of DelimiterBasedFrameDecoder
    private boolean isSubclass() {
        return getClass() != DelimiterBasedFrameDecoder.class;
    }

    @Override
    protected final void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        Object decoded = decode(ctx, in);
        if (decoded != null) {
            out.add(decoded);
        }
    }

    //Create a frame out of the {@link ByteBuf} and return it.
    //@param   ctx,the ChannelHandlerContext which this ByteToMessageDecoder belongs to
    //@param   buffer,the ByteBuf from which to read data
    //@return  frame,the ByteBuf which represent the frame or null if no frame could be created.
    protected Object decode(ChannelHandlerContext ctx, ByteBuf buffer) throws Exception {
        if (lineBasedDecoder != null) {
            return lineBasedDecoder.decode(ctx, buffer);
        }
        //Try all delimiters and choose the delimiter which yields the shortest frame.
        int minFrameLength = Integer.MAX_VALUE;
        ByteBuf minDelim = null;
        for (ByteBuf delim: delimiters) {
            int frameLength = indexOf(buffer, delim);
            if (frameLength >= 0 && frameLength < minFrameLength) {
                minFrameLength = frameLength;
                minDelim = delim;
            }
        }

        if (minDelim != null) {
            int minDelimLength = minDelim.capacity();
            ByteBuf frame;

            if (discardingTooLongFrame) {
                //We've just finished discarding a very large frame.
                //Go back to the initial state.
                discardingTooLongFrame = false;
                buffer.skipBytes(minFrameLength + minDelimLength);

                int tooLongFrameLength = this.tooLongFrameLength;
                this.tooLongFrameLength = 0;
                if (!failFast) {
                    fail(tooLongFrameLength);
                }
                return null;
            }

            if (minFrameLength > maxFrameLength) {
                //Discard read frame.
                buffer.skipBytes(minFrameLength + minDelimLength);
                fail(minFrameLength);
                return null;
            }

            if (stripDelimiter) {
                frame = buffer.readRetainedSlice(minFrameLength);
                buffer.skipBytes(minDelimLength);
            } else {
                frame = buffer.readRetainedSlice(minFrameLength + minDelimLength);
            }
            return frame;
        } else {
            if (!discardingTooLongFrame) {
                if (buffer.readableBytes() > maxFrameLength) {
                    //Discard the content of the buffer until a delimiter is found.
                    tooLongFrameLength = buffer.readableBytes();
                    buffer.skipBytes(buffer.readableBytes());
                    discardingTooLongFrame = true;
                    if (failFast) {
                        fail(tooLongFrameLength);
                    }
                }
            } else {
                //Still discarding the buffer since a delimiter is not found.
                tooLongFrameLength += buffer.readableBytes();
                buffer.skipBytes(buffer.readableBytes());
            }
            return null;
        }
    }
    
    private void fail(long frameLength) {
        if (frameLength > 0) {
            throw new TooLongFrameException("frame length exceeds " + maxFrameLength + ": " + frameLength + " - discarded");
        } else {
            throw new TooLongFrameException("frame length exceeds " + maxFrameLength + " - discarding");
        }
    }

    //Returns the number of bytes between the readerIndex of the haystack and the first needle found in the haystack.  
    //-1 is returned if no needle is found in the haystack.
    private static int indexOf(ByteBuf haystack, ByteBuf needle) {
        for (int i = haystack.readerIndex(); i < haystack.writerIndex(); i ++) {
            int haystackIndex = i;
            int needleIndex;
            for (needleIndex = 0; needleIndex < needle.capacity(); needleIndex ++) {
                if (haystack.getByte(haystackIndex) != needle.getByte(needleIndex)) {
                    break;
                } else {
                    haystackIndex ++;
                    if (haystackIndex == haystack.writerIndex() && needleIndex != needle.capacity() - 1) {
                        return -1;
                    }
                }
            }

            if (needleIndex == needle.capacity()) {
                //Found the needle from the haystack!
                return i - haystack.readerIndex();
            }
        }
        return -1;
    }

    private static void validateDelimiter(ByteBuf delimiter) {
        if (delimiter == null) {
            throw new NullPointerException("delimiter");
        }
        if (!delimiter.isReadable()) {
            throw new IllegalArgumentException("empty delimiter");
        }
    }

    private static void validateMaxFrameLength(int maxFrameLength) {
        if (maxFrameLength <= 0) {
            throw new IllegalArgumentException("maxFrameLength must be a positive integer: " + maxFrameLength);
        }
    }
    ...
}
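
The "try every delimiter and cut at the one that yields the shortest frame" rule in decode() can be tried out without Netty. The following is only a minimal plain-JDK sketch: the class DelimiterSplitSketch and its methods are hypothetical names, it works on a byte array instead of a ByteBuf, and it leaves out maxFrameLength and discard mode entirely.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Plain-JDK sketch, not Netty code: split bytes on several delimiters,
// always cutting at the delimiter that yields the shortest frame.
final class DelimiterSplitSketch {

    // Returns the offset of the first occurrence of needle at or after 'from', or -1.
    static int indexOf(byte[] data, int from, byte[] needle) {
        for (int i = from; i + needle.length <= data.length; i++) {
            int j = 0;
            while (j < needle.length && data[i + j] == needle[j]) {
                j++;
            }
            if (j == needle.length) {
                return i;
            }
        }
        return -1;
    }

    // Returns the complete frames with delimiters stripped. Trailing bytes that
    // are not yet terminated by a delimiter are not returned, just as a decoder
    // would keep them accumulated until more data arrives.
    static List<String> split(byte[] data, byte[]... delimiters) {
        for (byte[] delim : delimiters) {
            if (delim.length == 0) {
                throw new IllegalArgumentException("empty delimiter");
            }
        }
        List<String> frames = new ArrayList<>();
        int reader = 0; // plays the role of readerIndex
        for (;;) {
            int minEnd = Integer.MAX_VALUE;
            byte[] minDelim = null;
            for (byte[] delim : delimiters) {
                int at = indexOf(data, reader, delim);
                if (at >= 0 && at < minEnd) {
                    minEnd = at;
                    minDelim = delim;
                }
            }
            if (minDelim == null) {
                return frames; // no delimiter found: wait for more data
            }
            frames.add(new String(data, reader, minEnd - reader, StandardCharsets.US_ASCII));
            reader = minEnd + minDelim.length; // skip the frame and its delimiter
        }
    }
}
```

Splitting the bytes of "ab;cd|ef;gh" on ";" and "|" yields the frames ab, cd and ef; the trailing gh has no delimiter yet and is left for the next read.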

(4) Length-field-based decoder

The main logical steps of LengthFieldBasedFrameDecoder are as follows:

1. Handle discard mode

2. Get the size of the packet to be unpacked

3. Validate the packet length

4. Skip the specified number of bytes

5. Extract the packet

public class LengthFieldBasedFrameDecoder extends ByteToMessageDecoder {
    private final ByteOrder byteOrder;//Whether the byte stream is big-endian or little-endian; used when reading the length field
    private final int maxFrameLength;//Maximum length of a packet
    private final int lengthFieldOffset;//Offset of the length field
    private final int lengthFieldLength;//Length of the length field
    private final int lengthFieldEndOffset;//Offset, within the whole packet, of the first byte right after the length field
    private final int lengthAdjustment;//Compensation value added to the length field value to obtain the packet length
    private final int initialBytesToStrip;//Number of leading bytes to skip before the complete packet is passed to the business decoder
    private final boolean failFast;//Defaults to true; otherwise an OOM may occur
    private boolean discardingTooLongFrame;
    private long tooLongFrameLength;
    private long bytesToDiscard;
    ...
    //Creates a new instance.
    //@param byteOrder, the ByteOrder of the length field
    //@param maxFrameLength, the maximum length of the frame.
    //If the length of the frame is greater than this value, TooLongFrameException will be thrown.
    //@param lengthFieldOffset, the offset of the length field
    //@param lengthFieldLength, the length of the length field
    //@param lengthAdjustment, the compensation value to add to the value of the length field
    //@param initialBytesToStrip, the number of first bytes to strip out from the decoded frame
    //@param failFast, If true, a TooLongFrameException is thrown as soon as the decoder notices the length of the frame
    //will exceed maxFrameLength regardless of whether the entire frame has been read.
    //If false, a TooLongFrameException is thrown after the entire frame that exceeds maxFrameLength has been read.
    public LengthFieldBasedFrameDecoder(ByteOrder byteOrder, int maxFrameLength, int lengthFieldOffset, int lengthFieldLength, int lengthAdjustment, int initialBytesToStrip, boolean failFast) {
        ...
        this.byteOrder = byteOrder;
        this.maxFrameLength = maxFrameLength;
        this.lengthFieldOffset = lengthFieldOffset;
        this.lengthFieldLength = lengthFieldLength;
        this.lengthAdjustment = lengthAdjustment;
        lengthFieldEndOffset = lengthFieldOffset + lengthFieldLength;
        this.initialBytesToStrip = initialBytesToStrip;
        this.failFast = failFast;
    }
    
    @Override
    protected final void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        Object decoded = decode(ctx, in);
        if (decoded != null) {
            out.add(decoded);
        }
    }

    //Create a frame out of the {@link ByteBuf} and return it.
    //@param ctx, the ChannelHandlerContext which this ByteToMessageDecoder belongs to
    //@param in, the ByteBuf from which to read data
    //@return frame, the ByteBuf which represents the frame or null if no frame could be created.
    protected Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
        //Step 1 starts: Handle discard mode
        if (discardingTooLongFrame) {
            //In discard mode, first work out how many bytes to discard now:
            //the smaller of the bytes still to be discarded and the readable bytes
            long bytesToDiscard = this.bytesToDiscard;
            int localBytesToDiscard = (int) Math.min(bytesToDiscard, in.readableBytes());
            in.skipBytes(localBytesToDiscard);//Discard
            bytesToDiscard -= localBytesToDiscard;
            this.bytesToDiscard = bytesToDiscard;
            failIfNecessary(false);
        }
        //Step 1 ends

        //Step 2 starts: Get the size of the packet to be unpacked
        //If the readable bytes do not yet reach the end of the length field,
        //the length field cannot be read, so return and wait for more data
        if (in.readableBytes() < lengthFieldEndOffset) {
            return null;
        }
        //Calculate the actual byte offset of the length field
        int actualLengthFieldOffset = in.readerIndex() + lengthFieldOffset;
        //Get the unadjusted packet length
        long frameLength = getUnadjustedFrameLength(in, actualLengthFieldOffset, lengthFieldLength, byteOrder);
        //If the length is negative, skip the length field and throw an exception
        if (frameLength < 0) {
            in.skipBytes(lengthFieldEndOffset);
            throw new CorruptedFrameException("negative pre-adjustment length field: " + frameLength);
        }
        //Adjust the packet length; it is used for splitting below
        frameLength += lengthAdjustment + lengthFieldEndOffset;
        //Step 2 ends

        //Step 3 starts: Validate the packet length
        //If the whole packet is shorter than the end offset of the length field, throw an exception
        if (frameLength < lengthFieldEndOffset) {
            in.skipBytes(lengthFieldEndOffset);
            throw new CorruptedFrameException("Adjusted frame length (" + frameLength + ") is less " + "than lengthFieldEndOffset: " + lengthFieldEndOffset);
        }
        //The packet length exceeds the maximum packet length, so enter discard mode
        if (frameLength > maxFrameLength) {
            long discard = frameLength - in.readableBytes();
            tooLongFrameLength = frameLength;
            if (discard < 0) {
                //The readable bytes already cover frameLength, so skip frameLength bytes directly.
                //A valid packet may follow the discarded one
                in.skipBytes((int) frameLength);
            } else {
                //The readable bytes do not cover frameLength, so bytes that have not arrived yet must also be discarded.
                //Enter discard mode and first discard everything accumulated so far
                discardingTooLongFrame = true;
                //bytesToDiscard is the number of bytes that still need to be discarded
                bytesToDiscard = discard;
                in.skipBytes(in.readableBytes());
            }
            //Call failIfNecessary to decide whether an exception needs to be thrown
            failIfNecessary(true);
            return null;
        }
        //Step 3 ends

        //Step 4 starts: Skip the specified number of bytes
        //never overflows because it's less than maxFrameLength
        int frameLengthInt = (int) frameLength;
        if (in.readableBytes() < frameLengthInt) {
            //The readable bytes are still fewer than the packet length, so return and continue on the next read
            return null;
        }

        if (initialBytesToStrip > frameLengthInt) {
            //If the number of bytes to skip is greater than the packet length, throw an exception
            in.skipBytes(frameLengthInt);
            throw new CorruptedFrameException("Adjusted frame length (" + frameLength + ") is less " + "than initialBytesToStrip: " + initialBytesToStrip);
        }
        in.skipBytes(initialBytesToStrip);
        //Step 4 ends

        //Step 5 starts: Extract the packet
        //Get the read pointer of the accumulated data
        int readerIndex = in.readerIndex();
        //Get the actual length of the packet to extract
        int actualFrameLength = frameLengthInt - initialBytesToStrip;
        //Extract the packet
        ByteBuf frame = extractFrame(ctx, in, readerIndex, actualFrameLength);
        //Move the read pointer
        in.readerIndex(readerIndex + actualFrameLength);
        return frame;
    }
    
    protected ByteBuf extractFrame(ChannelHandlerContext ctx, ByteBuf buffer, int index, int length) {
        return buffer.retainedSlice(index, length);
    }
    
    //Get the unadjusted packet length.
    //Override this method if the value in the length field is not encoded as a plain int, short, etc.
    //For example, a length field might be 4 bytes such as 0x1234 but be meant as decimal, that is, a length of decimal 1234
    protected long getUnadjustedFrameLength(ByteBuf buf, int offset, int length, ByteOrder order) {
        buf = buf.order(order);
        long frameLength;
        switch (length) {
        case 1:
            frameLength = buf.getUnsignedByte(offset);
            break;
        case 2:
            frameLength = buf.getUnsignedShort(offset);
            break;
        case 3:
            frameLength = buf.getUnsignedMedium(offset);
            break;
        case 4:
            frameLength = buf.getUnsignedInt(offset);
            break;
        case 8:
            frameLength = buf.getLong(offset);
            break;
        default:
            throw new DecoderException("unsupported lengthFieldLength: " + lengthFieldLength + " (expected: 1, 2, 3, 4, or 8)");
        }
        return frameLength;
    }

    private void failIfNecessary(boolean firstDetectionOfTooLongFrame) {
        //No more unread bytes need to be discarded, so reset the discard state
        if (bytesToDiscard == 0) {
            //Reset to the initial state and tell the handlers that the frame was too large.
            long tooLongFrameLength = this.tooLongFrameLength;
            this.tooLongFrameLength = 0;
            discardingTooLongFrame = false;
            //If fail-fast is off, or fail-fast is on and this is the first detection of the oversized packet,
            //throw an exception and let the handlers deal with it
            if (!failFast || failFast && firstDetectionOfTooLongFrame) {
                fail(tooLongFrameLength);
            }
        } else {
            //If fail-fast is on and this is the first detection of the oversized packet,
            //throw an exception and let the handlers deal with it
            if (failFast && firstDetectionOfTooLongFrame) {
                fail(tooLongFrameLength);
            }
        }
    }

    private void fail(long frameLength) {
        if (frameLength > 0) {
            throw new TooLongFrameException("Adjusted frame length exceeds " + maxFrameLength + ": " + frameLength + " - discarded");
        } else {
            throw new TooLongFrameException("Adjusted frame length exceeds " + maxFrameLength + " - discarding");
        }
    }
    ...
}
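
The arithmetic in steps 2 to 5 (adjusted frame length = length field value + lengthAdjustment + lengthFieldEndOffset, then strip the leading bytes) is easier to follow in isolation. Below is a rough plain-JDK sketch on java.nio.ByteBuffer rather than ByteBuf. LengthFieldSketch is a hypothetical name, only 2-byte and 4-byte big-endian length fields are handled, and maxFrameLength and discard mode are omitted.

```java
import java.nio.ByteBuffer;

// Plain-JDK sketch, not Netty code: cut one length-prefixed frame from a ByteBuffer.
final class LengthFieldSketch {

    // Returns the frame bytes (after stripping), or null if the frame is incomplete.
    // On success the buffer position is moved past the whole frame.
    static byte[] decode(ByteBuffer buf, int lengthFieldOffset, int lengthFieldLength,
                         int lengthAdjustment, int initialBytesToStrip) {
        int lengthFieldEndOffset = lengthFieldOffset + lengthFieldLength;
        if (buf.remaining() < lengthFieldEndOffset) {
            return null; // the length field itself has not fully arrived
        }
        int base = buf.position();
        long unadjusted;
        switch (lengthFieldLength) {
            case 2:
                unadjusted = buf.getShort(base + lengthFieldOffset) & 0xFFFF;
                break;
            case 4:
                unadjusted = buf.getInt(base + lengthFieldOffset) & 0xFFFFFFFFL;
                break;
            default:
                throw new IllegalArgumentException("sketch supports only 2 or 4");
        }
        // Length of the whole frame, counted from the current read position.
        long frameLength = unadjusted + lengthAdjustment + lengthFieldEndOffset;
        if (frameLength < lengthFieldEndOffset || frameLength > Integer.MAX_VALUE) {
            throw new IllegalStateException("bad adjusted frame length: " + frameLength);
        }
        if (initialBytesToStrip > frameLength) {
            throw new IllegalStateException("initialBytesToStrip exceeds frame length");
        }
        if (buf.remaining() < frameLength) {
            return null; // wait until the whole frame has arrived
        }
        byte[] frame = new byte[(int) frameLength - initialBytesToStrip];
        buf.position(base + initialBytesToStrip); // skip the stripped prefix
        buf.get(frame);                           // advances past the frame
        return frame;
    }
}
```

With a 2-byte length field at offset 0, no adjustment and 2 bytes stripped, a buffer holding `00 03 'a' 'b' 'c'` yields the 3-byte frame abc, while a buffer holding only part of the announced payload yields null.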

 

6. General steps of the writeAndFlush() method

(1) Entry point of the writeAndFlush() method

(2) Execution process of the writeAndFlush() method

 

(1) Entry point of the writeAndFlush() method

The usual entry point is ctx.channel().writeAndFlush().

public class NettyServerHandler extends ChannelInboundHandlerAdapter {
     //After the TCP three-way handshake completes, a Channel (the communication pipe of the network connection) is created and wrapped.
     //At that point the Channel becomes active
     @Override
     public void channelActive(ChannelHandlerContext ctx) throws Exception {
         System.out.println("Channel Active...");
         ctx.channel().writeAndFlush("test5");
     }

     @Override
     public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
         System.out.println("Channel Read: " + (String)msg);
         String response = "Hello World...";
         ByteBuf responseByteBuf = Unpooled.buffer();
         responseByteBuf.writeBytes(response.getBytes());

         ctx.channel().writeAndFlush(responseByteBuf);
         System.out.println("Channel Write: " + response);
     }

     @Override
     public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
         System.out.println("Channel Read Complete...");
         ctx.flush();
     }

     @Override
     public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
         cause.printStackTrace();
         ctx.close();
     }
 }

(2) Execution process of the writeAndFlush() method

Propagation starts from the tail node. The write() method of each ChannelHandler is called in turn until some ChannelHandler stops propagating the write event forward. Then the flush() method of each ChannelHandler is called in turn until some ChannelHandler stops propagating the flush event forward.

 

Generally, as long as every ChannelHandler keeps propagating the write and flush events, they eventually reach the write() and flush() methods of the HeadContext node, which call unsafe.write() and unsafe.flush() respectively to write the data to the underlying JDK channel through unsafe.

public abstract class AbstractChannel extends DefaultAttributeMap implements Channel {
     private final DefaultChannelPipeline pipeline;
     ...
     @Override
     public ChannelFuture writeAndFlush(Object msg) {
         return pipeline.writeAndFlush(msg);
     }
     ...
 }

 public class DefaultChannelPipeline implements ChannelPipeline {
     final AbstractChannelHandlerContext head;
     final AbstractChannelHandlerContext tail;
     private final Channel channel;
    
     protected DefaultChannelPipeline(Channel channel) {
         this.channel = ObjectUtil.checkNotNull(channel, "channel");
         tail = new TailContext(this);
         head = new HeadContext(this);
         head.next = tail;
         tail.prev = head;
     }
    
     @Override
     public final ChannelFuture writeAndFlush(Object msg) {
         //Propagation starts from TailContext.
         //TailContext does not override writeAndFlush(),
         //so the writeAndFlush() method of AbstractChannelHandlerContext is called
         return tail.writeAndFlush(msg);
     }
     ...
 }

 abstract class AbstractChannelHandlerContext extends DefaultAttributeMap implements ChannelHandlerContext, ResourceLeakHint {
     volatile AbstractChannelHandlerContext next;
     volatile AbstractChannelHandlerContext prev;
     ...
     @Override
     public ChannelFuture writeAndFlush(Object msg) {
         return writeAndFlush(msg, newPromise());
     }
    
     @Override
     public ChannelFuture writeAndFlush(Object msg, ChannelPromise promise) {
         if (msg == null) throw new NullPointerException("msg");
         if (!validatePromise(promise, true)) {
             ReferenceCountUtil.release(msg);
             return promise;
         }
         write(msg, true, promise);
         return promise;
     }
    
     private void write(Object msg, boolean flush, ChannelPromise promise) {
         //Search backward through the linked list for the next outbound node
         AbstractChannelHandlerContext next = findContextOutbound();
         final Object m = pipeline.touch(msg, next);
         EventExecutor executor = next.executor();
         //Reading and writing the Channel's data is ultimately handled by the Reactor thread
         if (executor.inEventLoop()) {
             if (flush) {
                 //Call the invokeWriteAndFlush() method of that node
                 next.invokeWriteAndFlush(m, promise);
             } else {
                 next.invokeWrite(m, promise);
             }
         } else {
             AbstractWriteTask task;
             if (flush) {
                 task = WriteAndFlushTask.newInstance(next, m, promise);
             } else {
                 task = WriteTask.newInstance(next, m, promise);
             }
             safeExecute(executor, task, promise, m);
         }
     }
    
     private void invokeWriteAndFlush(Object msg, ChannelPromise promise) {
         if (invokeHandler()) {
             //Call the write() method of each ChannelHandler node in turn, provided the current ChannelHandler passes the event on.
             //That is, its write() method ends up calling ctx.write() and propagating onward, as ChannelOutboundHandlerAdapter does.
             invokeWrite0(msg, promise);
             //Call the flush() method of each ChannelHandler node in turn, provided the current ChannelHandler passes the event on.
             //That is, its flush() method ends up calling ctx.flush() and propagating onward, as ChannelOutboundHandlerAdapter does.
             invokeFlush0();
         } else {
             writeAndFlush(msg, promise);
         }
     }
    
     private void invokeWrite0(Object msg, ChannelPromise promise) {
         try {
             //Called node by node, eventually reaching the write() method of HeadContext
             ((ChannelOutboundHandler) handler()).write(this, msg, promise);
         } catch (Throwable t) {
             notifyOutboundHandlerException(t, promise);
         }
     }
    
     private void invokeFlush0() {
         try {
             //Called node by node, eventually reaching the flush() method of HeadContext
             ((ChannelOutboundHandler) handler()).flush(this);
         } catch (Throwable t) {
             notifyHandlerException(t);
         }
     }
     ...
 }

 public class DefaultChannelPipeline implements ChannelPipeline {
     ...
     final class HeadContext extends AbstractChannelHandlerContext implements ChannelOutboundHandler, ChannelInboundHandler {
         private final Unsafe unsafe;
         HeadContext(DefaultChannelPipeline pipeline) {
             super(pipeline, null, HEAD_NAME, false, true);
             unsafe = pipeline.channel().unsafe();
             setAddComplete();
         }
         ...
         @Override
         public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
             unsafe.write(msg, promise);
         }

         @Override
         public void flush(ChannelHandlerContext ctx) throws Exception {
             unsafe.flush();
         }
     }
     ...
 }

 //Skeleton implementation of a ChannelOutboundHandler. This implementation just forwards each method call via the ChannelHandlerContext.
 public class ChannelOutboundHandlerAdapter extends ChannelHandlerAdapter implements ChannelOutboundHandler {
     //Calls ChannelHandlerContext#bind(SocketAddress, ChannelPromise) to forward to the next ChannelOutboundHandler in the ChannelPipeline.
     @Override
     public void bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) throws Exception {
         ctx.bind(localAddress, promise);
     }

     //Calls ChannelHandlerContext#connect(SocketAddress, SocketAddress, ChannelPromise) to forward to the next ChannelOutboundHandler in the ChannelPipeline.
     @Override
     public void connect(ChannelHandlerContext ctx, SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) throws Exception {
         ctx.connect(remoteAddress, localAddress, promise);
     }

     //Calls ChannelHandlerContext#disconnect(ChannelPromise) to forward to the next ChannelOutboundHandler in the ChannelPipeline.
     @Override
     public void disconnect(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
         ctx.disconnect(promise);
     }

     //Calls ChannelHandlerContext#close(ChannelPromise) to forward to the next ChannelOutboundHandler in the ChannelPipeline.
     @Override
     public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
         ctx.close(promise);
     }

     //Calls ChannelHandlerContext#deregister(ChannelPromise) to forward to the next ChannelOutboundHandler in the ChannelPipeline.
     @Override
     public void deregister(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
         ctx.deregister(promise);
     }

     //Calls ChannelHandlerContext#read() to forward to the next ChannelOutboundHandler in the ChannelPipeline.
     @Override
     public void read(ChannelHandlerContext ctx) throws Exception {
         ctx.read();
     }

     //Calls ChannelHandlerContext#write(Object, ChannelPromise) to forward to the next ChannelOutboundHandler in the ChannelPipeline.
     @Override
     public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
         ctx.write(msg, promise);
     }

     //Calls ChannelHandlerContext#flush() to forward to the next ChannelOutboundHandler in the ChannelPipeline.
     @Override
     public void flush(ChannelHandlerContext ctx) throws Exception {
         ctx.flush();
     }
 }
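
The order in which writeAndFlush() visits the nodes (all write() calls from the tail side toward the head first, then all flush() calls along the same path) can be modelled with a tiny linked list. This is only an illustrative plain-JDK sketch: OutboundChainSketch and Node are hypothetical names, every node simply forwards the event, and the "head" entry stands in for the point where Netty hands over to unsafe.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-JDK sketch, not Netty code: outbound events travel toward the head via 'prev'.
final class OutboundChainSketch {

    // Hypothetical stand-in for a handler context that always forwards.
    static final class Node {
        final String name;
        final List<String> log;
        Node prev; // the next hop for outbound events

        Node(String name, List<String> log) {
            this.name = name;
            this.log = log;
        }

        void write(String msg) {
            log.add(name + ".write(" + msg + ")");
            if (prev != null) {
                prev.write(msg); // forward, as ChannelOutboundHandlerAdapter does
            }
        }

        void flush() {
            log.add(name + ".flush()");
            if (prev != null) {
                prev.flush();
            }
        }
    }

    // Builds head <- h1 <- h2 ... and fires write then flush from the tail side.
    static List<String> writeAndFlush(String msg, String... handlersHeadToTail) {
        List<String> log = new ArrayList<>();
        Node last = new Node("head", log);
        for (String name : handlersHeadToTail) {
            Node n = new Node(name, log);
            n.prev = last;
            last = n;
        }
        // 'last' is the first outbound node found when searching backward from the tail.
        last.write(msg);
        last.flush();
        return log;
    }
}
```

For handlers a and b (listed from head to tail), the recorded order is b.write, a.write, head.write, then b.flush, a.flush, head.flush.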

 

7. Encoding steps

(1) Specific steps for encoding

(2) Summary of encoding steps

(3) Example of a subclass implementing encoding

 

(1) Specific steps for encoding

Step 1: Match the object

Check whether the current ChannelHandler node can handle the Java object being written. If it can, continue with the steps below; otherwise pass the object directly to the next ChannelHandler node.

 

Step 2: Allocate memory

Allocate a piece of memory space to the newly created ByteBuf object, which will store byte data converted from the Java object.

 

Step 3: Call encode

The subclass will implement the abstract method encode() of MessageToByteEncoder to define its own encoding protocol. The encode() method of the subclass will write the byte data converted from Java objects to ByteBuf.

 

Step 4: Release the object

Since the incoming Java object has been converted into a ByteBuf byte stream, it is no longer needed and can be released.

 

Step 5: Propagate data

If the subclass's encode() method wrote data to the ByteBuf object, the ByteBuf is propagated, once the Java object has been released, to the next outbound ChannelHandler node, that is, the previous node in the pipeline. Otherwise an empty buffer is propagated instead.

 

Step 6: Free the memory

If an exception occurs, or if no data was written to the ByteBuf, the memory allocated to the ByteBuf object is released. A ByteBuf that was propagated is released once it has been processed further along the pipeline.

//ChannelOutboundHandlerAdapter which encodes messages in a stream-like fashion from one message to a ByteBuf.
 //Example implementation which encodes Integers to a ByteBuf.
 //public class IntegerEncoder extends MessageToByteEncoder<Integer> {
 //    @Override
 //    public void encode(ChannelHandlerContext ctx, Integer msg, ByteBuf out) throws Exception {
 //        out.writeInt(msg);
 //    }
 //}
 public abstract class MessageToByteEncoder<I> extends ChannelOutboundHandlerAdapter {
     private final TypeParameterMatcher matcher;
     private final boolean preferDirect;

     protected MessageToByteEncoder() {
         this(true);
     }
    
     protected MessageToByteEncoder(Class<? extends I> outboundMessageType) {
         this(outboundMessageType, true);
     }

     //Create a new instance which will try to detect the types to match out of the type parameter of the class.
     //@param preferDirect, true if a direct ByteBuf should be tried to be used as target for the encoded messages.
     //If false is used it will allocate a heap ByteBuf, which is backed by an byte array.
     protected MessageToByteEncoder(boolean preferDirect) {
         matcher = TypeParameterMatcher.find(this, MessageToByteEncoder.class, "I");
         this.preferDirect = preferDirect;
     }

     //Create a new instance
     //@param outboundMessageType, The type of messages to match
     //@param preferDirect, true if a direct ByteBuf should be tried to be used as target for the encoded messages.
     //If false is used it will allocate a heap ByteBuf, which is backed by an byte array.
     protected MessageToByteEncoder(Class<? extends I> outboundMessageType, boolean preferDirect) {
         matcher = TypeParameterMatcher.get(outboundMessageType);
         this.preferDirect = preferDirect;
     }

     //Returns true if the given message should be handled.
     //If false it will be passed to the next ChannelOutboundHandler in the ChannelPipeline.
     public boolean acceptOutboundMessage(Object msg) throws Exception {
         return matcher.match(msg);
     }

     @Override
     public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
         ByteBuf buf = null;
         try {
             //Step 1: Check whether the current ChannelHandler can handle the message being written
             if (acceptOutboundMessage(msg)) {
                 @SuppressWarnings("unchecked")
                 I cast = (I) msg;//Cast to the handled type
                 //Step 2: Allocate memory for the ByteBuf object
                 buf = allocateBuffer(ctx, cast, preferDirect);
                 try {
                     //Step 3: Call the encode() method implemented by the subclass
                     encode(ctx, cast, buf);
                 } finally {
                     //Step 4: Release the object
                     //The custom Java object msg has been converted to a ByteBuf, so it is no longer needed and is released
                     //Note: when msg is itself a ByteBuf it is released here, so the subclass does not need to release it
                     ReferenceCountUtil.release(cast);
                 }
                 //Step 5: If data was written to buf, pass buf to the next ChannelHandler node
                 if (buf.isReadable()) {
                     ctx.write(buf, promise);
                 } else {
                     //Step 6: If no data was written to buf, release buf and pass an empty buffer to the next ChannelHandler node
                     buf.release();
                     ctx.write(Unpooled.EMPTY_BUFFER, promise);
                 }
                 buf = null;
             } else {
                 ctx.write(msg, promise);
             }
         } catch (EncoderException e) {
             throw e;
         } catch (Throwable e) {
             throw new EncoderException(e);
         } finally {
             if (buf != null) {
                 buf.release();//An exception occurred before buf was handed on, so release it here
             }
         }
     }

     //Allocate a ByteBuf which will be used as argument of #encode(ChannelHandlerContext, I, ByteBuf).
     //Sub-classes may override this method to return ByteBuf with a perfect matching initialCapacity.
     protected ByteBuf allocateBuffer(ChannelHandlerContext ctx, @SuppressWarnings("unused") I msg, boolean preferDirect) throws Exception {
         if (preferDirect) {
             return ctx.alloc().ioBuffer();
         } else {
             return ctx.alloc().heapBuffer();
         }
     }

     //Encode a message into a ByteBuf.
     //This method will be called for each written message that can be handled by this encoder.
     //@param ctx, the ChannelHandlerContext which this MessageToByteEncoder belongs to
     //@param msg, the message to encode
     //@param out, the ByteBuf into which the encoded message will be written
     //@throws Exception, is thrown if an error occurs
     protected abstract void encode(ChannelHandlerContext ctx, I msg, ByteBuf out) throws Exception;
 }

(2) Summary of encoding steps

During encoding, MessageToByteEncoder first checks whether the current ChannelHandler can handle the incoming Java object. If it can, memory is allocated for a newly created ByteBuf object. The subclass's encode() method then applies the concrete encoding protocol and stores the encoded data in that ByteBuf. Finally, the ByteBuf object is propagated to the next outbound ChannelHandler node, that is, the previous node in the pipeline.

 

If an exception occurs during encoding, the memory already allocated to the ByteBuf object is released.

 

If the incoming Java object is itself a ByteBuf, Netty releases it automatically once the custom encoding completes, so the subclass does not need to release it.
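
The flow summarized here is a template method: the base class decides whether it handles the message, allocates the output, delegates to encode(), and forwards the result. The following plain-JDK sketch mirrors only that shape. EncoderTemplateSketch and IntEncoderSketch are hypothetical names, a ByteArrayOutputStream stands in for the ByteBuf, a Consumer stands in for the next outbound handler, and the reference-counting steps (releasing the message and the buffer) have no JDK counterpart and are left out.

```java
import java.io.ByteArrayOutputStream;
import java.util.function.Consumer;

// Plain-JDK sketch, not Netty code: the template-method shape of an encoder's write().
abstract class EncoderTemplateSketch<I> {
    private final Class<I> type;

    protected EncoderTemplateSketch(Class<I> type) {
        this.type = type;
    }

    // Subclasses define the wire format here.
    protected abstract void encode(I msg, ByteArrayOutputStream out);

    // 'next' stands in for the next outbound handler.
    public final void write(Object msg, Consumer<Object> next) {
        if (!type.isInstance(msg)) {
            next.accept(msg); // not our type: pass it through unchanged
            return;
        }
        ByteArrayOutputStream out = new ByteArrayOutputStream(); // allocate the output
        encode(type.cast(msg), out);                             // subclass-specific encoding
        next.accept(out.toByteArray());                          // forward the bytes (possibly empty)
    }
}

// Example subclass: encodes an Integer as 4 big-endian bytes.
final class IntEncoderSketch extends EncoderTemplateSketch<Integer> {
    IntEncoderSketch() {
        super(Integer.class);
    }

    @Override
    protected void encode(Integer msg, ByteArrayOutputStream out) {
        int v = msg;
        out.write(v >>> 24);
        out.write(v >>> 16);
        out.write(v >>> 8);
        out.write(v);
    }
}
```

Writing the Integer 258 through IntEncoderSketch forwards the bytes 00 00 01 02, while writing a String forwards the String itself untouched.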

 

(3) Examples of subclass implementation encoding

The following Encoder implements the effect of converting a custom Response object into a byte stream and writing to the underlying Socket.

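public class Encoder extends MessageToByteEncoder<Response> {
    @Override
    protected void encode(ChannelHandlerContext ctx, Response response, ByteBuf out) throws Exception {
        //Note: the write calls and getter names below are reconstructed; getVersion() and getData() are assumed names
        out.writeByte(response.getVersion());
        out.writeInt(4 + response.getData().length);
        out.writeBytes(response.getData());
    }
}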
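
To make the resulting byte layout concrete, here is a minimal sketch that uses java.nio.ByteBuffer instead of ByteBuf. It assumes a frame made of a one-byte version, a 4-byte length field whose value is 4 plus the payload length, and the payload itself. The field widths and the FrameSketch name are assumptions for illustration, since the field types of Response are not shown.

```java
import java.nio.ByteBuffer;

// Hypothetical frame layout: 1 version byte, 4-byte length (4 + payload length), payload.
final class FrameSketch {
    static ByteBuffer encode(byte version, byte[] data) {
        ByteBuffer out = ByteBuffer.allocate(1 + 4 + data.length);
        out.put(version);
        out.putInt(4 + data.length); // the length field counts itself plus the payload
        out.put(data);
        out.flip();                  // switch the buffer from writing to reading
        return out;
    }

    // The matching decode step: read the length first, then exactly that many payload bytes.
    static byte[] decodePayload(ByteBuffer in) {
        in.get();                    // skip the version byte
        int length = in.getInt();
        byte[] data = new byte[length - 4];
        in.get(data);
        return data;
    }
}
```

Because the length travels ahead of the payload, a length-field based decoder on the receiving side can tell where each frame ends.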

 

8. unsafe.write() adds data to the write buffer

(1) The entry point of unsafe.write()

(2) The main logic of unsafe.write()

(3) Data structure of write buffer (write queue)

 

(1) The entry point of unsafe.write()

Whether ctx.channel().write() or ctx.write() is called, the write event eventually reaches the head node of the pipeline.

public class DefaultChannelPipeline implements ChannelPipeline {
    ...
    final class HeadContext extends AbstractChannelHandlerContext implements ChannelOutboundHandler, ChannelInboundHandler {
        private final Unsafe unsafe;
        HeadContext(DefaultChannelPipeline pipeline) {
            super(pipeline, null, HEAD_NAME, false, true);
            unsafe = pipeline.channel().unsafe();
            setAddComplete();
        }
        ...
        @Override
        public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
            unsafe.write(msg, promise);
        }

        @Override
        public void flush(ChannelHandlerContext ctx) throws Exception {
            unsafe.flush();
        }
    }
    ...
}

(2) The main logic of unsafe.write()

The main logic by which the unsafe.write() method adds data to the write buffer (write queue) is as follows.

 

1. Convert the ByteBuf object to a direct buffer

If the incoming ByteBuf object is not backed by off-heap (direct) memory, it is converted into a direct ByteBuf, and its size is then estimated.

 

2. Add to write buffer

The direct ByteBuf object is first wrapped in an Entry object, and the Entry is then appended to the write buffer, which uses several pointers to track its state.

 

3. Set the write status

Memory is limited, so ByteBuf objects cannot be added to the write buffer indefinitely. Once the write buffer grows beyond the high water mark (64KB by default), the current Channel is set to the unwritable state by spin + CAS.
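
The back-pressure rule in step 3 can be modelled with two counters. The sketch below is a simplified, Netty-free illustration: WritabilitySketch and its members are hypothetical names, the 64KB threshold is the default mentioned above, and the 32KB low threshold is the value that the flush section refers to later.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

// Simplified model of write-buffer back-pressure (hypothetical class, not Netty code).
final class WritabilitySketch {
    static final long HIGH_WATER_MARK = 64 * 1024; // above this the channel becomes unwritable
    static final long LOW_WATER_MARK = 32 * 1024;  // below this it becomes writable again

    private final AtomicLong pendingBytes = new AtomicLong();
    private final AtomicInteger unwritable = new AtomicInteger(); // 0 = writable, 1 = unwritable
    int writabilityChanges; // simulated notifications; only touched by one thread in this sketch

    // Called when a message is added to the write buffer.
    void increment(long size) {
        if (pendingBytes.addAndGet(size) > HIGH_WATER_MARK) {
            flip(1);
        }
    }

    // Called when a message has been written and removed from the write buffer.
    void decrement(long size) {
        if (pendingBytes.addAndGet(-size) < LOW_WATER_MARK) {
            flip(0);
        }
    }

    // Spin + CAS: retry until the flag holds the target value, notifying only on a real change.
    private void flip(int target) {
        for (;;) {
            int old = unwritable.get();
            if (old == target) {
                return;
            }
            if (unwritable.compareAndSet(old, target)) {
                writabilityChanges++;
                return;
            }
        }
    }

    boolean isWritable() {
        return unwritable.get() == 0;
    }
}
```

The gap between the two thresholds keeps the state from toggling on every message when the buffer size hovers around a single limit.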

public abstract class AbstractChannel extends DefaultAttributeMap implements Channel {
     private final DefaultChannelPipeline pipeline;
     ...
     protected abstract class AbstractUnsafe implements Unsafe {
         //Write buffer (write queue)
         private volatile ChannelOutboundBuffer outboundBuffer = new ChannelOutboundBuffer(AbstractChannel.this);
         ...
         @Override
         public final void write(Object msg, ChannelPromise promise) {
             //Make sure that the method is called in the Reactor thread
             assertEventLoop();
             //Write buffer
             ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
             ...
             int size;
             try {
                 //Convert to off-heap (direct) memory
                 msg = filterOutboundMessage(msg);
                 //Estimate the size of the ByteBuf to be written
                 size = pipeline.estimatorHandle().size(msg);
                 if (size < 0) {
                     size = 0;
                 }
             } catch (Throwable t) {
                 safeSetFailure(promise, t);
                 ReferenceCountUtil.release(msg);
                 return;
             }
             //Add the msg, now in off-heap memory, to the write buffer outboundBuffer
             outboundBuffer.addMessage(msg, size, promise);
         }
         ...
     }
     ...
 }

 public abstract class AbstractNioByteChannel extends AbstractNioChannel {
     ...
     @Override
     protected final Object filterOutboundMessage(Object msg) {
         if (msg instanceof ByteBuf) {
             ByteBuf buf = (ByteBuf) msg;
             if (buf.isDirect()) {
                 return msg;
             }
             return newDirectBuffer(buf);
         }
         if (msg instanceof FileRegion) {
             return msg;
         }
         throw new UnsupportedOperationException("unsupported message type: " + StringUtil.simpleClassName(msg) + EXPECTED_TYPES);
     }
     ...
 }
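
The heap-to-direct conversion performed by filterOutboundMessage() can be illustrated with plain JDK buffers. This is a rough sketch under stated assumptions, not Netty's newDirectBuffer(): DirectSketch is a hypothetical name, java.nio.ByteBuffer stands in for ByteBuf, and unlike Netty the sketch does not release the source buffer or draw from a pooled allocator.

```java
import java.nio.ByteBuffer;

// Hypothetical helper: return a direct buffer holding the readable bytes of the input.
final class DirectSketch {
    static ByteBuffer toDirect(ByteBuffer buf) {
        if (buf.isDirect()) {
            return buf;                  // already off-heap: hand it back unchanged
        }
        ByteBuffer direct = ByteBuffer.allocateDirect(buf.remaining());
        direct.put(buf.duplicate());     // copy the readable bytes without moving the source position
        direct.flip();                   // make the copied bytes readable
        return direct;
    }
}
```

A direct buffer lets the JDK hand the bytes to the operating system without first copying them out of the Java heap, which is why the conversion happens before the data is queued.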

(3) Data structure of write buffer (write queue)

The data structure in ChannelOutboundBuffer is a singly linked list, and each node of the list is an Entry object. An Entry holds the ByteBuf object to be written and the promise used for the completion callback. The flushedEntry pointer marks the first node that has been flushed, that is, marked as ready to be written to the Socket buffer. The unflushedEntry pointer marks the first node that has not yet been flushed, and the tailEntry pointer marks the last node of the ChannelOutboundBuffer.

 

After the first call to the addMessage() method of ChannelOutboundBuffer, the flushedEntry pointer is NULL, and the unflushedEntry and tailEntry pointers both point to the newly added node. If the flushedEntry pointer is still NULL after addMessage() has been called several times, no node's ByteBuf object has been handed over for writing to the Socket buffer yet. If there are n nodes starting from the unflushedEntry pointer, the ByteBuf objects of those n nodes have not been written to the Socket buffer.

public final class ChannelOutboundBuffer {
    private final Channel channel;
    
    //Entry(flushedEntry) --> ... Entry(unflushedEntry) --> ... Entry(tailEntry)
    //The Entry that is the first in the linked-list structure that was flushed
    private Entry flushedEntry;
    //The Entry which is the first unflushed in the linked-list structure
    private Entry unflushedEntry;
    //The Entry which represents the tail of the buffer
    private Entry tailEntry;
    ...
    
    ChannelOutboundBuffer(AbstractChannel channel) {
        this.channel = channel;
    }

    //Add given message to this ChannelOutboundBuffer. 
    //The given {@link ChannelPromise} will be notified once the message was written.
    public void addMessage(Object msg, int size, ChannelPromise promise) {
        Entry entry = Entry.newInstance(msg, size, total(msg), promise);
        if (tailEntry == null) {
            flushedEntry = null;
            tailEntry = entry;
        } else {
            Entry tail = tailEntry;
            tail.next = entry;
            tailEntry = entry;
        }
        if (unflushedEntry == null) {
            unflushedEntry = entry;
        }

        //increment pending bytes after adding message to the unflushed arrays.
        incrementPendingOutboundBytes(size, false);
    }
    
    static final class Entry {
        private static final Recycler<Entry> RECYCLER = new Recycler<Entry>() {
            @Override
            protected Entry newObject(Handle handle) {
                return new Entry(handle);
            }
        };
        private final Handle<Entry> handle;
        Entry next;
        Object msg;
        ByteBuffer[] bufs;
        ByteBuffer buf;
        ChannelPromise promise;
        long progress;
        long total;
        int pendingSize;
        int count = -1;
        boolean cancelled;
  
        private Entry(Handle<Entry> handle) {
            this.handle = handle;
        }

        static Entry newInstance(Object msg, int size, long total, ChannelPromise promise) {
            Entry entry = RECYCLER.get();
            entry.msg = msg;
            entry.pendingSize = size;
            entry.total = total;
            entry.promise = promise;
            return entry;
        }
        ...
    }
}
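
The pointer states described above can be checked with a stripped-down model of addMessage(). The sketch below is an illustration, not Netty code: QueueSketch, Node and unflushedCount() are hypothetical names, and a String stands in for the ByteBuf and promise carried by a real Entry.

```java
// Minimal model of the three-pointer write queue (hypothetical names, not Netty code).
final class QueueSketch {
    static final class Node {
        final String msg;
        Node next;

        Node(String msg) {
            this.msg = msg;
        }
    }

    Node flushed;   // first node marked for writing (stays null until a flush is requested)
    Node unflushed; // first node added since the last flush
    Node tail;      // last node of the list

    void add(String msg) {
        Node n = new Node(msg);
        if (tail == null) {
            flushed = null;   // empty queue: nothing can be marked for writing yet
        } else {
            tail.next = n;    // append behind the current tail
        }
        tail = n;
        if (unflushed == null) {
            unflushed = n;    // the first pending node starts the unflushed range
        }
    }

    // Number of nodes reachable from unflushed, i.e. not yet handed over for writing.
    int unflushedCount() {
        int count = 0;
        for (Node n = unflushed; n != null; n = n.next) {
            count++;
        }
        return count;
    }
}
```

After one add() both unflushed and tail point at the new node, and after three adds unflushed still points at the first node while tail has moved to the third.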

 

9. unsafe.flush() flushes the data in the write buffer

(1) The entry point of unsafe.flush()

(2) The main logic of unsafe.flush()

 

(1) The entry point of unsafe.flush()

Whether ctx.channel().flush() or ctx.flush() is called, the flush event eventually reaches the head node of the pipeline.

public class DefaultChannelPipeline implements ChannelPipeline {
    ...
    final class HeadContext extends AbstractChannelHandlerContext implements ChannelOutboundHandler, ChannelInboundHandler {
        private final Unsafe unsafe;
        HeadContext(DefaultChannelPipeline pipeline) {
            super(pipeline, null, HEAD_NAME, false, true);
            unsafe = pipeline.channel().unsafe();
            setAddComplete();
        }
        ...
        @Override
        public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
            unsafe.write(msg, promise);
        }

        @Override
        public void flush(ChannelHandlerContext ctx) throws Exception {
            unsafe.flush();
        }
    }
    ...
}

(2) The main logic of unsafe.flush()

Step 1: Set the flushedEntry pointer to the Entry node pointed to by the unflushedEntry pointer, and count the number of Entry nodes that need to be flushed.

 

Step 2: Iterate over the Entry nodes of the write buffer, write each node's ByteBuf object to the Socket, and then remove the node. If the write buffer size drops below the low water mark (32KB by default), the Channel is set back to the writable state by spin + CAS.

public abstract class AbstractChannel extends DefaultAttributeMap implements Channel {
     private final DefaultChannelPipeline pipeline;
     ...
     protected abstract class AbstractUnsafe implements Unsafe {
         private volatile ChannelOutboundBuffer outboundBuffer = new ChannelOutboundBuffer(AbstractChannel.this);
         ...
         @Override
         public final void flush() {
             assertEventLoop();
             ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
             if (outboundBuffer == null) {
                 return;
             }
             //Step 1
             outboundBuffer.addFlush();
             //Step 2
             flush0();
         }
         protected void flush0() {
             final ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
             ...
             doWrite(outboundBuffer);
             ...
         }
     }
    
     //Flush the content of the given buffer to the remote peer.
     protected abstract void doWrite(ChannelOutboundBuffer in) throws Exception;
 }

 public abstract class AbstractNioByteChannel extends AbstractNioChannel {
     ...
     @Override
     protected void doWrite(ChannelOutboundBuffer in) throws Exception {
         //Spins up to 16 times by default to balance memory usage and write throughput
         int writeSpinCount = config().getWriteSpinCount();
         do {
             Object msg = in.current();
             if (msg == null) {
                 //All messages have been written: stop listening for the OP_WRITE event
                 clearOpWrite();
                 return;
             }
             writeSpinCount -= doWriteInternal(in, msg);
         } while(writeSpinCount > 0);
         incompleteWrite(writeSpinCount < 0);
     }
    
     private int doWriteInternal(ChannelOutboundBuffer in, Object msg) throws Exception {
         ...
         ByteBuf buf = (ByteBuf) msg;
         if (!buf.isReadable()) {
             //Nothing left to write: remove the node from the write buffer (write queue)
             in.remove();
             return 0;
         }
         //Write the ByteBuf object into the Socket
         final int localFlushedAmount = doWriteBytes(buf);
         if (localFlushedAmount > 0) {
             in.progress(localFlushedAmount);
             if (!buf.isReadable()) {
                 //Fully written: remove the node from the write buffer (write queue)
                 in.remove();
             }
             return 1;
         }
         ...
     }
    
     protected final void clearOpWrite() {
         final SelectionKey key = selectionKey();
         //Check first if the key is still valid as it may be cancelled as part of the deregistration from the EventLoop.
         if (!key.isValid()) {
             return;
         }
         final int interestOps = key.interestOps();
         if ((interestOps & SelectionKey.OP_WRITE) != 0) {
             key.interestOps(interestOps & ~SelectionKey.OP_WRITE);
         }
     }
    
     @Override
     protected int doWriteBytes(ByteBuf buf) throws Exception {
         final int expectedWrittenBytes = buf.readableBytes();
         return buf.readBytes(javaChannel(), expectedWrittenBytes);
     }
     ...
 }

 public final class ChannelOutboundBuffer {
     private final Channel channel;
    
     //Entry(flushedEntry) --> ... Entry(unflushedEntry) --> ... Entry(tailEntry)
     //The Entry that is the first in the linked-list structure that was flushed
     private Entry flushedEntry;
     //The Entry which is the first unflushed in the linked-list structure
     private Entry unflushedEntry;
     //The Entry which represents the tail of the buffer
     private Entry tailEntry;
    
     private static final AtomicIntegerFieldUpdater<ChannelOutboundBuffer> UNWRITABLE_UPDATER;
     @SuppressWarnings("UnusedDeclaration")
     private volatile int unwritable;
    
     //The number of flushed entries that are not written yet
     private int flushed;
     ...
    
     //Set the flushedEntry pointer to the Entry node pointed to by the unflushedEntry pointer.
     // And count the number of Entry nodes that need to be refreshed
     public void addFlush() {
         Entry entry = unflushedEntry;
         if (entry != null) {
             if (flushedEntry == null) {
                 flushedEntry = entry;
             }
             do {
                 flushed ++;//Count the nodes that need to be flushed
                 entry = entry.next;
             } while (entry != null);
             unflushedEntry = null;
         }
     }
    
     public boolean remove() {
         //Get the node currently being flushed
         Entry e = flushedEntry;
         Object msg = e.msg;
         //Get the callback object of this node
         ChannelPromise promise = e.promise;
         int size = e.pendingSize;
         //Remove the node from the write buffer queue
         removeEntry(e);
         if (!e.cancelled) {
             ReferenceCountUtil.safeRelease(msg);
             safeSuccess(promise);
             //If the write buffer size is less than 32KB, set the Channel state to writable through spin + CAS
             decrementPendingOutboundBytes(size, false, true);
         }
         //Recycle the Entry object
         e.recycle();
         return true;
     }
    
     private void removeEntry(Entry e) {
         if (-- flushed == 0) {
             flushedEntry = null;
             if (e == tailEntry) {
                 tailEntry = null;
                 unflushedEntry = null;
             }
         } else {
             flushedEntry = e.next;
         }
     }

     //Return the current message to write or null if nothing was flushed before and so is ready to be written.
     public Object current() {
         Entry entry = flushedEntry;
         if (entry == null) {
             return null;
         }
         return entry.msg;
     }

     //Notify the ChannelPromise of the current message about writing progress.
     public void progress(long amount) {
         Entry e = flushedEntry;
         assert e != null;
         ChannelPromise p = e.promise;
         if (p instanceof ChannelProgressivePromise) {
             long progress = e.progress + amount;
             e.progress = progress;
             ((ChannelProgressivePromise) p).tryProgress(progress, e.total);
         }
     }
    
     private void decrementPendingOutboundBytes(long size, boolean invokeLater, boolean notifyWritability) {
         if (size == 0) {
             return;
         }
         long newWriteBufferSize = TOTAL_PENDING_SIZE_UPDATER.addAndGet(this, -size);
         if (notifyWritability && newWriteBufferSize < channel.config().getWriteBufferLowWaterMark()) {
             setWritable(invokeLater);
         }
     }
    
     private void setWritable(boolean invokeLater) {
         for (;;) {
             final int oldValue = unwritable;
             final int newValue = oldValue & ~1;
             if (UNWRITABLE_UPDATER.compareAndSet(this, oldValue, newValue)) {
                 if (oldValue != 0 && newValue == 0) {
                     fireChannelWritabilityChanged(invokeLater);
                 }
                 break;
             }
         }
     }
     ...
 }
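
The pointer movement performed by addFlush() and remove() is easier to follow in isolation. The sketch below mirrors those two methods in a Netty-free model. FlushSketch and Node are hypothetical names, a String stands in for the ByteBuf, and promise notification, cancellation and water-mark accounting are left out.

```java
// Minimal model of addFlush(), current() and remove() (hypothetical names, not Netty code).
final class FlushSketch {
    static final class Node {
        final String msg;
        Node next;

        Node(String msg) {
            this.msg = msg;
        }
    }

    Node flushedEntry;
    Node unflushedEntry;
    Node tailEntry;
    int flushed; // nodes marked for writing but not yet written

    void add(String msg) {
        Node n = new Node(msg);
        if (tailEntry != null) {
            tailEntry.next = n;
        }
        tailEntry = n;
        if (unflushedEntry == null) {
            unflushedEntry = n;
        }
    }

    // Step 1: hand every unflushed node over to the flushed range.
    void addFlush() {
        Node n = unflushedEntry;
        if (n == null) {
            return;
        }
        if (flushedEntry == null) {
            flushedEntry = n;
        }
        for (; n != null; n = n.next) {
            flushed++;
        }
        unflushedEntry = null;
    }

    // The message that should be written next, or null if nothing is flushed.
    String current() {
        return flushedEntry == null ? null : flushedEntry.msg;
    }

    // Step 2, per node: drop the head of the flushed range once it has been written.
    boolean remove() {
        Node e = flushedEntry;
        if (e == null) {
            return false;
        }
        if (--flushed == 0) {
            flushedEntry = null;
            if (e == tailEntry) {
                tailEntry = null;
                unflushedEntry = null;
            }
        } else {
            flushedEntry = e.next;
        }
        return true;
    }
}
```

Note that a node added after addFlush() stays in the unflushed range: it is linked behind the tail but is not written until the next flush.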

 

10. How to turn an object into a byte stream and write it to the underlying layer of unsafe

When ctx.channel().writeAndFlush(user) is called to propagate a custom User object along the entire Pipeline:

 

First, the write() method of the tail node is called and the write event starts propagating forward until it reaches a node that extends MessageToByteEncoder. This node implements the encode() method of MessageToByteEncoder to convert the custom User object into a ByteBuf object. During the conversion, MessageToByteEncoder first allocates a ByteBuf object and then calls the abstract encode() method implemented by its subclass to write the User object into that ByteBuf. Once this is done, the write() method is called again to propagate the ByteBuf object forward, and by default it eventually reaches the head node.

 

The write() method of the head node delegates to the underlying unsafe, which adds the current ByteBuf object to the write buffer it maintains and checks whether the write buffer size exceeds 64KB. If it does, the current Channel is set to unwritable. When the propagation of the write() call is complete, the write buffer maintained by the unsafe object of the head node holds a queue of ByteBuf objects, organized as a singly linked list.

 

Next, the flush() method of the tail node is called and the flush event starts propagating forward, by default eventually reaching the head node. When the head node receives the flush event, it delegates to the underlying unsafe, which first adjusts the pointers and then loops over the write buffer, taking out one ByteBuf object at a time. Each ByteBuf object is converted into a ByteBuffer object that the JDK accepts, and the ByteBuffer is written through the JDK Channel. After each ByteBuffer has been written, the Entry node that holds the corresponding ByteBuf is removed from the write buffer, and if the write buffer size has dropped below 32KB, the Channel is set back to writable through spin + CAS.
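
The final write loop can be sketched against a plain JDK channel. This is a simplified illustration under stated assumptions, not Netty's doWrite(): SpinWriteSketch and SlowSink are hypothetical names, a Deque of ByteBuffer objects stands in for the write buffer, and the sink accepts only a few bytes per call to imitate a nearly full Socket buffer.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.WritableByteChannel;
import java.util.Deque;

// Sketch of a bounded write loop over a JDK channel (hypothetical names, not Netty code).
final class SpinWriteSketch {
    // A sink that accepts at most `limit` bytes per write() call.
    static final class SlowSink implements WritableByteChannel {
        final int limit;
        int written;

        SlowSink(int limit) {
            this.limit = limit;
        }

        @Override
        public int write(ByteBuffer src) {
            int n = Math.min(limit, src.remaining());
            src.position(src.position() + n); // consume n bytes from the source
            written += n;
            return n;
        }

        @Override
        public boolean isOpen() {
            return true;
        }

        @Override
        public void close() {
        }
    }

    // Returns true if the queue was drained, false if writing has to continue later.
    static boolean flush(Deque<ByteBuffer> queue, WritableByteChannel ch, int spinCount) {
        while (spinCount > 0) {
            ByteBuffer buf = queue.peek();
            if (buf == null) {
                return true;              // everything has been written
            }
            if (!buf.hasRemaining()) {
                queue.poll();             // an empty buffer costs no spin
                continue;
            }
            int n;
            try {
                n = ch.write(buf);
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
            if (n <= 0) {
                return false;             // channel cannot take more: wait for it to become writable
            }
            if (!buf.hasRemaining()) {
                queue.poll();             // fully written: remove it from the queue
            }
            spinCount--;                  // each successful write uses one spin
        }
        return queue.isEmpty();
    }
}
```

Bounding the loop with a spin count keeps one busy connection from monopolizing the event loop thread, so leftover data is written in a later pass.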