This article is based on Netty version 4.1.
It's been a while since my last Netty article. In the interim I have been continuously updating the series on Linux memory management, and by now the backbone of the Linux memory management subsystem has been presented fairly completely at the source-code level. Along the way I met many readers who share a love for the kernel and who often message me to discuss design details in the code; we share with and learn from each other, and that pure passion for technology has been a real motivation for me, as well as a chance to learn and improve.
The perspective of that series stayed in kernel space: I tried to reveal the essence of memory management from the Linux kernel's point of view. Starting today, we move the viewpoint up a level, from kernel space to user space, and continue along the main line of memory management to see how it is done in user space.
I plan to spend three articles analyzing Netty's memory management module. This is the first one; it works around the periphery of Netty memory management and introduces the overall design of ByteBuf.
The ByteBuf hierarchy involves quite a few classes and can look overwhelming at a glance, but once we categorize them from different perspectives, the whole design becomes very clear:
- From the perspective of the JVM memory layout, Netty's ByteBuf is mainly divided into two types: HeapByteBuf (on-heap) and DirectByteBuf (off-heap).
- From the perspective of memory management, Netty's ByteBuf is divided into two subtypes: PooledByteBuf (pooled) and UnpooledByteBuf (unpooled). The former is managed by the memory pool, while the latter behaves like an ordinary object: created on demand and released when no longer used.
- From the perspective of memory access, Netty divides ByteBuf into UnsafeByteBuf and ordinary ByteBuf. UnsafeByteBuf relies on the low-level API of the Unsafe class to manipulate memory addresses directly, while ordinary ByteBuf relies on the NIO ByteBuffer to operate on memory.
- From the perspective of memory reclamation, ByteBuf is divided into ByteBufs with a Cleaner and NoCleanerByteBufs without one. The Cleaner is the JDK's mechanism for freeing the Native Memory referenced behind a NIO ByteBuffer, and it is driven by the JVM. For a NoCleanerByteBuf, the Native Memory behind the NIO ByteBuffer has to be freed manually.
- In terms of memory footprint statistics, Netty goes a step further and divides ByteBuf into InstrumentedByteBuf, which carries memory-footprint metrics for monitoring, and ordinary ByteBuf, which carries no metrics.
- From a zero-copy perspective, Netty introduces CompositeByteBuf, which aggregates multiple ByteBufs into a single logical view. A traditional aggregation first allocates a large ByteBuf and then copies the contents of the ByteBufs to be aggregated into the new one; CompositeByteBuf avoids both the allocation of the large block of memory and the copying. Note that zero-copy here refers to Netty's own avoidance of memory copies at the user level, not zero-copy at the OS level.
- In addition, Netty's ByteBuf supports reference counting and automatic memory-leak detection: if a memory leak occurs, Netty reports the exact location of the leak.
- Netty's ByteBuf supports capacity expansion, while NIO's ByteBuffer does not.
Once Netty's ByteBuf design is laid out completely, we will find that it is essentially an extension and refinement of the JDK ByteBuffer. So in what follows I will introduce Netty's ByteBuf in contrast with the JDK ByteBuffer; with the contrast, we can appreciate the subtleties of Netty's design more deeply.
1. What's wrong with the ByteBuffer design in the JDK?
In a previous article, A step-by-step analysis of the design and implementation of JDK NIO ByteBuffer under different byte orders, the author described the JDK ByteBuffer design in full; here is a brief recap of the core elements of ByteBuffer.
public abstract class Buffer {
private int mark = -1;
private int position = 0;
private int limit;
private int capacity;
}
- capacity specifies the capacity of the entire Buffer, i.e. how many elements it can hold at most. The elements before capacity form the Buffer's operable space, and the JDK ByteBuffer cannot be expanded.
- The position pointer points to the next operable element in the Buffer and is initially 0. All reads and writes to the Buffer share this single position pointer: in write mode it points to the next writable position, and in read mode to the next readable position.
- limit caps the number of elements that can be operated on in the Buffer; the position pointer cannot exceed limit.
Since the JDK ByteBuffer is designed with only one position pointer, we have to keep adjusting position when reading and writing: flip(), rewind(), compact(), clear() and similar methods are used to switch back and forth between read and write modes.
Consider a concrete scenario: as we write to a ByteBuffer, the position pointer keeps moving backward. When the write is complete and we want to read back the data we just wrote, we run into a problem: because the JDK mixes reads and writes on a single position pointer, we must first call flip() to adjust position and switch the ByteBuffer into read mode.
public final Buffer flip() {
limit = position;
position = 0;
mark = -1;
return this;
}
After we have read all the data in the ByteBuffer, if we want to write to it again, we need to readjust position once more, switching back to write mode via clear().
public final Buffer clear() {
position = 0;
limit = capacity;
mark = -1;
return this;
}
If we have only read part of the data in the ByteBuffer, we need to switch back to write mode via compact(), so that the unread portion is not overwritten by the next write.
class HeapByteBuffer extends ByteBuffer {
// in a HeapByteBuffer, the underlying storage is a byte array
final byte[] hb;
public ByteBuffer compact() {
System.arraycopy(hb, ix(position()), hb, ix(0), remaining());
position(remaining());
limit(capacity());
discardMark();
return this;
}
public final int remaining() {
return limit - position;
}
final void discardMark() {
mark = -1;
}
}
From the read/write scenarios listed above, we can see that when operating a ByteBuffer we must always keep a clear head about which parts are readable and which are writable; a moment of inattention leads to bugs. In complex encode/decode logic, using ByteBuffer means constantly switching between read and write modes, and the switching quickly becomes dizzying.
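To make the mode-switching dance concrete, here is a minimal, self-contained sketch using only the standard java.nio API (the class name is made up for the demo); it walks through the write, flip, read, compact cycle described above:

import java.nio.ByteBuffer;

public class ByteBufferModeSwitchDemo {
    public static void main(String[] args) {
        ByteBuffer buffer = ByteBuffer.allocate(16);
        // write mode: position advances with every put
        buffer.put((byte) 1).put((byte) 2).put((byte) 3);
        // must flip() before reading, otherwise we would read from position 3
        buffer.flip();
        System.out.println(buffer.get());   // 1
        // two bytes are still unread: compact() keeps them and switches back to write mode
        buffer.compact();
        buffer.put((byte) 4);
        // flip() again before the next read
        buffer.flip();
        while (buffer.hasRemaining()) {
            System.out.println(buffer.get()); // 2, 3, 4
        }
        // drop everything and start writing from scratch
        buffer.clear();
    }
}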
Besides being cumbersome to operate, the JDK provides no pooling mechanism for ByteBuffer. In scenarios that need large amounts of off-heap memory, we must keep creating DirectBuffers, and reclaiming a DirectBuffer when we are done with it is itself a problem.
The JDK reclaims DirectBuffers with a delay: we have to wait until a Full GC before the Native Memory referenced behind these DirectBuffers can be reclaimed automatically by the JVM. So to reclaim the Native Memory promptly, we have to take care of releasing DirectBuffers manually.
The JDK's ByteBuffer does not support reference counting either. Without reference counting, there is no way to know how many contexts still reference a DirectBuffer or when it can be freed, and no way to automatically detect memory leaks caused by DirectBuffers.
In addition, the JDK's ByteBuffer does not support adaptive on-demand expansion: once created, its capacity is fixed. In practice it is hard to estimate up front how large a ByteBuffer is needed. If we allocate too much, we waste memory; if we allocate too little, we must check the remaining capacity before every write, and when it is insufficient, manually request a bigger ByteBuffer and migrate the data from the old one over, which is painful even to think about.
There is also the fact that when multiple JDK ByteBuffers are merged, a larger ByteBuffer is always created first, and the contents of the original ByteBuffers are then copied into it, which incurs both allocation and copying overhead.
Why not keep the memory occupied by the original ByteBuffers and create just one logical view ByteBuffer, forwarding all logical operations on the view to the original memory, thereby saving the cost of reallocating and copying memory?
Let's take a look at how Netty's ByteBuf solves and improves on the problems above~
2. Netty's design and implementation of ByteBuf
When introducing the overall design of the JDK ByteBuffer above, I used HeapByteBuffer as the running example; in this article I will use DirectByteBuf to walk through Netty's ByteBuf design.
2.1 Basic Structure of ByteBuf
public abstract class AbstractByteBuf extends ByteBuf {
int readerIndex;
int writerIndex;
private int markedReaderIndex;
private int markedWriterIndex;
private int maxCapacity;
}
public class UnpooledDirectByteBuf extends AbstractReferenceCountedByteBuf {
private int capacity;
}
To avoid the cumbersome operations caused by the JDK ByteBuffer sharing one position pointer across read and write modes, Netty gives ByteBuf two pointers: readerIndex points to the first readable byte in the ByteBuf, and writerIndex points to the first writable byte. With two independent pointers, there is no need to switch between read and write modes when reading and writing a Netty ByteBuf. The corresponding markedReaderIndex and markedWriterIndex support ByteBuf's mark and reset operations, consistent with the design in the JDK.
@Override
public ByteBuf markReaderIndex() {
markedReaderIndex = readerIndex;
return this;
}
@Override
public ByteBuf resetReaderIndex() {
readerIndex(markedReaderIndex);
return this;
}
@Override
public ByteBuf markWriterIndex() {
markedWriterIndex = writerIndex;
return this;
}
@Override
public ByteBuf resetWriterIndex() {
writerIndex(markedWriterIndex);
return this;
}
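By contrast, here is a minimal sketch of the dual-pointer design (assuming the netty-buffer artifact is on the classpath; the demo class is hypothetical): reads and writes interleave freely, with no flip()-style mode switching:

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;

public class DualPointerDemo {
    public static void main(String[] args) {
        ByteBuf buf = Unpooled.buffer(16);
        buf.writeInt(42);                    // writerIndex: 0 -> 4
        System.out.println(buf.readInt());   // 42, readerIndex: 0 -> 4
        buf.writeInt(7);                     // keep writing, no mode switch needed
        System.out.println(buf.readInt());   // 7
        buf.release();
    }
}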
Since the JDK ByteBuffer does not support expansion, Netty introduces a new field, maxCapacity, indicating that a ByteBuf's capacity can be expanded at most to maxCapacity.
@Override
public int calculateNewCapacity(int minNewCapacity, int maxCapacity) {
if (minNewCapacity > maxCapacity) {
throw new IllegalArgumentException(String.format(
"minNewCapacity: %d (expected: not greater than maxCapacity(%d)",
minNewCapacity, maxCapacity));
}
}
The capacity of a Netty ByteBuf has the same meaning as capacity in the JDK ByteBuffer: it represents the initial capacity of the ByteBuf, i.e. the initialCapacity parameter passed when creating an UnpooledDirectByteBuf below.
public class UnpooledDirectByteBuf extends AbstractReferenceCountedByteBuf {
    private final ByteBufAllocator alloc;
    // the JDK ByteBuffer that the Netty ByteBuf depends on underneath
    ByteBuffer buffer;
    // the initial capacity of the ByteBuf, which is its real memory footprint
    private int capacity;

    public UnpooledDirectByteBuf(ByteBufAllocator alloc, int initialCapacity, int maxCapacity) {
        // set the maximum expandable capacity
        super(maxCapacity);
        this.alloc = alloc;
        // create a JDK ByteBuffer with the initial capacity specified by initialCapacity
        setByteBuffer(allocateDirect(initialCapacity), false);
    }

    void setByteBuffer(ByteBuffer buffer, boolean tryFree) {
        // an UnpooledDirectByteBuf depends on a JDK ByteBuffer underneath;
        // all subsequent operations on it are proxied by Netty to this JDK ByteBuffer
        this.buffer = buffer;
        // record the ByteBuf's initial capacity (initialCapacity)
        capacity = buffer.remaining();
    }
}
As a result, a Netty ByteBuf is divided into four parts by the four pointers readerIndex, writerIndex, capacity and maxCapacity:
- [0, capacity): the initial capacity allocated when the ByteBuf is created; this is the part that actually occupies memory. [capacity, maxCapacity) represents the ByteBuf's expandable capacity, for which no memory has been allocated yet.
- [0, readerIndex): bytes that have already been read; this range can be discarded.
- [readerIndex, writerIndex): bytes in the ByteBuf that can still be read.
- [writerIndex, capacity): the remaining capacity of the ByteBuf, i.e. the range of bytes that can be written.

The relationship between these four pointers is: 0 <= readerIndex <= writerIndex <= capacity <= maxCapacity.
private static void checkIndexBounds(final int readerIndex, final int writerIndex, final int capacity) {
if (readerIndex < 0 || readerIndex > writerIndex || writerIndex > capacity) {
throw new IndexOutOfBoundsException(String.format(
"readerIndex: %d, writerIndex: %d (expected: 0 <= readerIndex <= writerIndex <= capacity(%d))",
readerIndex, writerIndex, capacity));
}
}
When reading from a ByteBuf, we use isReadable to determine whether the ByteBuf is readable, and readableBytes to determine how many bytes are left to read. When readerIndex equals writerIndex, the ByteBuf is no longer readable, and the bytes in [0, readerIndex) can be discarded.
@Override
public boolean isReadable() {
return writerIndex > readerIndex;
}
@Override
public int readableBytes() {
return writerIndex - readerIndex;
}
When writing to a ByteBuf, we use isWritable to determine whether the ByteBuf is writable, and writableBytes to determine how many bytes can still be written. When writerIndex equals capacity, the ByteBuf is no longer writable.
@Override
public boolean isWritable() {
return capacity() > writerIndex;
}
@Override
public int writableBytes() {
return capacity() - writerIndex;
}
When the ByteBuf is full and becomes unwritable, continuing to write to it requires expansion, but the expanded capacity cannot exceed maxCapacity.
final void ensureWritable0(int minWritableBytes) {
    // minWritableBytes is the number of bytes about to be written
    // get the current writerIndex position
    final int writerIndex = writerIndex();
    // the ByteBuf capacity expected to satisfy this write
    final int targetCapacity = writerIndex + minWritableBytes;
    if (targetCapacity >= 0 & targetCapacity <= capacity()) {
        // targetCapacity is within [0, capacity]: no expansion needed, the write already fits
        return;
    }
    // the expanded capacity must not exceed maxCapacity
    if (checkBounds && (targetCapacity < 0 || targetCapacity > maxCapacity)) {
        throw new IndexOutOfBoundsException(String.format(
                "writerIndex(%d) + minWritableBytes(%d) exceeds maxCapacity(%d): %s",
                writerIndex, minWritableBytes, maxCapacity, this));
    }
    // targetCapacity is within (capacity, maxCapacity]: expand the ByteBuf
    // ...... expand the ByteBuf ......
}
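The four regions are easy to observe directly. The following sketch (hypothetical demo class, standard Netty API) creates a ByteBuf with initial capacity 16 and maxCapacity 64, writes 8 bytes, reads 4, and prints the pointers:

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;

public class ByteBufRegionsDemo {
    public static void main(String[] args) {
        ByteBuf buf = Unpooled.buffer(16, 64);
        buf.writeBytes(new byte[8]);   // writerIndex = 8
        buf.readBytes(new byte[4]);    // readerIndex = 4
        // discardable: [0, 4), readable: [4, 8), writable: [8, 16), expandable: [16, 64)
        System.out.println("readerIndex=" + buf.readerIndex()      // 4
                + " writerIndex=" + buf.writerIndex()              // 8
                + " capacity=" + buf.capacity()                    // 16
                + " maxCapacity=" + buf.maxCapacity());            // 64
        System.out.println("readableBytes=" + buf.readableBytes()  // 4
                + " writableBytes=" + buf.writableBytes());        // 8
        buf.release();
    }
}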
2.2 ByteBuf Read Operations
After understanding the basic structure of a ByteBuf, let's look at how the basic read and write operations work. Netty supports reading and writing a ByteBuf at the granularity of various primitive types, along with Unsigned conversions and byte-order conversions for those types. Below, two primitive types, Byte and Int, serve as examples of reading from a ByteBuf.

The get methods in ByteBuf simply read data from the ByteBuf without changing the readerIndex. getByte reads one byte at the specified index in the ByteBuf, and getUnsignedByte reads a Byte and converts it to an unsigned byte.
public abstract class AbstractByteBuf extends ByteBuf {
@Override
public byte getByte(int index) {
// check index bounds: index must not exceed capacity (index < capacity)
checkIndex(index);
return _getByte(index);
}
@Override
public short getUnsignedByte(int index) {
// convert the fetched Byte to an unsigned value
return (short) (getByte(index) & 0xFF);
}
protected abstract byte _getByte(int index);
}
The actual read is delegated to the abstract method _getByte, whose implementation is provided by the concrete subclasses of AbstractByteBuf. For example, UnpooledDirectByteBuf proxies the _getByte operation directly to the JDK DirectByteBuffer it depends on.
public class UnpooledDirectByteBuf {
// the underlying JDK DirectByteBuffer
ByteBuffer buffer;
@Override
protected byte _getByte(int index) {
return buffer.get(index);
}
}
In the UnpooledUnsafeDirectByteBuf implementation, by contrast, the byte is read directly from the corresponding memory address.
public class UnpooledUnsafeDirectByteBuf {
    // operates directly on the OS memory address
    long memoryAddress;

    @Override
    protected byte _getByte(int index) {
        // ultimately backed by PlatformDependent0, reading the byte directly from the memory address
        return UnsafeByteBufUtil.getByte(addr(index));
    }

    final long addr(int index) {
        // compute the memory address corresponding to offset index
        return memoryAddress + index;
    }
}

final class PlatformDependent0 {
    // the sun.misc.Unsafe instance
    static final Unsafe UNSAFE;

    static byte getByte(long address) {
        return UNSAFE.getByte(address);
    }
}
Netty also provides batch reads of Bytes: the getBytes method can read data from the ByteBuf into a byte array byte[], or into another ByteBuf.
@Override
public ByteBuf getBytes(int index, byte[] dst) {
    getBytes(index, dst, 0, dst.length);
    return this;
}

public abstract ByteBuf getBytes(int index, byte[] dst, int dstIndex, int length);

@Override
public ByteBuf getBytes(int index, ByteBuf dst, int length) {
    getBytes(index, dst, dst.writerIndex(), length);
    // adjust dst's writerIndex
    dst.writerIndex(dst.writerIndex() + length);
    return this;
}

// note: this getBytes overload (with an explicit dstIndex) changes neither the source ByteBuf's
// readerIndex and writerIndex nor the destination ByteBuf's readerIndex and writerIndex
public abstract ByteBuf getBytes(int index, ByteBuf dst, int dstIndex, int length);
After reading data from the original ByteBuf to the destination ByteBuf via the getBytes method, the readerIndex of the original ByteBuf will not change, but the writerIndex of the destination ByteBuf will be readjusted.
The UnpooledDirectByteBuf implementation naturally proxies the getBytes operation directly to its underlying JDK DirectByteBuffer, while the UnpooledUnsafeDirectByteBuf implementation copies directly based on memory addresses.
The read methods of ByteBuf, on the other hand, not only read data but also move the readerIndex. For example, readByte first reads one byte via the _getByte described above, and then moves the readerIndex back by one.
@Override
public byte readByte() {
checkReadableBytes0(1);
int i = readerIndex;
byte b = _getByte(i);
readerIndex = i + 1;
return b;
}
Netty also provides a method readBytes to read data from ByteBufs in bulk, so you can read data from one ByteBuf into another ByteBuf using the readBytes method. But here, Netty will change the readerIndex of the original ByteBuf and the writerIndex of the destination ByteBuf.
@Override
public ByteBuf readBytes(ByteBuf dst, int length) {
    readBytes(dst, dst.writerIndex(), length);
    // adjust dst's writerIndex
    dst.writerIndex(dst.writerIndex() + length);
    return this;
}
We can also specify dstIndex explicitly, so that the data copied from the source ByteBuf starts at a given position in the destination ByteBuf. In that case only the readerIndex of the source ByteBuf is changed, not the writerIndex of the destination ByteBuf. This makes sense: since we explicitly specified where to write in the destination ByteBuf (dstIndex), there is naturally no writerIndex to move after the write completes.
@Override
public ByteBuf readBytes(ByteBuf dst, int dstIndex, int length) {
checkReadableBytes(length);
getBytes(readerIndex, dst, dstIndex, length);
// adjust the source ByteBuf's readerIndex
readerIndex += length;
return this;
}
In addition, Netty supports reading data from a ByteBuf to different destinations, such as to a JDK ByteBuffer, to a FileChannel, to an OutputStream, and to a GatheringByteChannel.
public abstract ByteBuf readBytes(ByteBuffer dst);
public abstract ByteBuf readBytes(OutputStream out, int length) throws IOException;
public abstract int readBytes(GatheringByteChannel out, int length) throws IOException;
public abstract int readBytes(FileChannel out, long position, int length) throws IOException;
Besides supporting ByteBuf reads and writes at Byte granularity, Netty also supports reads and writes for a variety of primitive types; here I use the Int type as an example.
We can read an Int from the ByteBuf via readInt(), after which the readerIndex of the ByteBuf moves back by 4.
@Override
public int readInt() {
checkReadableBytes0(4);
int v = _getInt(readerIndex);
readerIndex += 4;
return v;
}
protected abstract int _getInt(int index);
Similarly, the method that actually reads the data, _getInt, must be implemented by the concrete subclasses of AbstractByteBuf. Unlike _getByte, _getInt must take byte order into account. Because network protocols transmit in big-endian byte order, Netty's ByteBuf also defaults to big-endian.
The UnpooledDirectByteBuf implementation also proxies the getInt operation directly to its underlying JDK DirectByteBuffer dependency.
public class UnpooledDirectByteBuf {
@Override
protected int _getInt(int index) {
// proxied to the underlying JDK DirectByteBuffer
return buffer.getInt(index);
}
}
In the UnpooledUnsafeDirectByteBuf implementation, since memory is accessed directly by address and Netty's ByteBuf defaults to big-endian byte order, the bytes at lower memory addresses are placed into the higher-order bits of the Int.
public class UnpooledUnsafeDirectByteBuf {
    @Override
    protected int _getInt(int index) {
        return UnsafeByteBufUtil.getInt(addr(index));
    }
}

final class UnsafeByteBufUtil {
    static int getInt(long address) {
        return PlatformDependent.getByte(address) << 24 |
               (PlatformDependent.getByte(address + 1) & 0xff) << 16 |
               (PlatformDependent.getByte(address + 2) & 0xff) << 8 |
               PlatformDependent.getByte(address + 3) & 0xff;
    }
}
Netty also supports reading Int data from ByteBuf in little-endian byte order, which involves byte order conversion.
@Override
public int readIntLE() {
checkReadableBytes0(4);
int v = _getIntLE(readerIndex);
readerIndex += 4;
return v;
}
protected abstract int _getIntLE(int index);
In the UnpooledDirectByteBuf implementation, the Int is first read in big-endian order from the underlying JDK DirectByteBuffer and then converted to little-endian before being returned.
public class UnpooledDirectByteBuf {
@Override
protected int _getIntLE(int index) {
// swap the byte order: from big-endian to little-endian
return ByteBufUtil.swapInt(buffer.getInt(index));
}
}
In the UnpooledUnsafeDirectByteBuf implementation, it suffices to place the bytes at lower addresses into the low-order bits of the Int.
public class UnpooledUnsafeDirectByteBuf {
    @Override
    protected int _getIntLE(int index) {
        return UnsafeByteBufUtil.getIntLE(addr(index));
    }
}

final class UnsafeByteBufUtil {
    static int getIntLE(long address) {
        return PlatformDependent.getByte(address) & 0xff |
               (PlatformDependent.getByte(address + 1) & 0xff) << 8 |
               (PlatformDependent.getByte(address + 2) & 0xff) << 16 |
               PlatformDependent.getByte(address + 3) << 24;
    }
}
Netty also supports reading primitive types from a ByteBuf as Unsigned values.
@Override
public long readUnsignedInt() {
return readInt() & 0xFFFFFFFFL;
}
@Override
public long readUnsignedIntLE() {
return readIntLE() & 0xFFFFFFFFL;
}
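The byte-order and unsigned conversions are easy to verify. In the sketch below (hypothetical demo class, standard Netty API), the same four bytes 0x01 0x02 0x03 0x04 are interpreted in both byte orders, and a negative byte is read back as an unsigned value:

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;

public class ByteOrderDemo {
    public static void main(String[] args) {
        ByteBuf buf = Unpooled.buffer(8);
        buf.writeBytes(new byte[] {0x01, 0x02, 0x03, 0x04});
        System.out.println(buf.getInt(0));    // big-endian (Netty default): 0x01020304 = 16909060
        System.out.println(buf.getIntLE(0));  // little-endian: 0x04030201 = 67305985
        buf.setByte(0, 0xFF);
        System.out.println(buf.getByte(0));          // -1 when read as a signed byte
        System.out.println(buf.getUnsignedByte(0));  // 255 when read as an unsigned byte
        buf.release();
    }
}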
The read operations for the other primitive types follow much the same logic, so I will not enumerate them all.
2.3 discardReadBytes
As the readBytes method keeps being called, the readerIndex in the ByteBuf keeps moving backward. Netty's design gives the readerIndex two layers of semantics:
- The first layer is obvious: it marks the current read position of the ByteBuf. When we call readBytes, reading starts from readerIndex; when readerIndex equals writerIndex, the ByteBuf is no longer readable.
- The second layer is more subtle: it marks how many bytes of the ByteBuf can currently be discarded. Since readerIndex marks the read position, the bytes before it must already have been read, and there is no point keeping already-read bytes resident in the ByteBuf; better to free that space so more data can be written.
So the true remaining writable capacity of a ByteBuf is not just the value returned by the writableBytes() method from the previous subsection; we should also add readerIndex to the number of bytes it returns.
@Override
public int writableBytes() {
return capacity() - writerIndex;
}
To give a more concrete example: suppose we are about to write n bytes to a ByteBuf, but writableBytes() is less than n, so the remaining capacity of the ByteBuf cannot hold the bytes to be written.

If readerIndex + writableBytes() is greater than or equal to n, however, then discarding the bytes already read from the ByteBuf would make enough room for this write.

In that case we can call discardReadBytes() to discard the bytes before readerIndex so that the writable bytes suffice for this write. So how exactly are they discarded?
Let's start with the readerIndex < writerIndex case, where unread bytes remain in the ByteBuf.

The currently readable range of the ByteBuf is [readerIndex, writerIndex), and the bytes before readerIndex can be discarded, so we copy all the bytes in [readerIndex, writerIndex) to the front of the ByteBuf, overwriting the bytes before readerIndex.

Then we adjust readerIndex and writerIndex: since the bytes before readerIndex have been overwritten by the readable bytes, readerIndex is reset to 0, and writerIndex moves forward by the old readerIndex. The writable capacity of the ByteBuf thus grows by readerIndex bytes.
The other scenario is the readerIndex = writerIndex case, where no readable bytes remain in the ByteBuf.

Since there is nothing left to read, there is no need to copy the readable bytes to the front of the ByteBuf; we simply reset both readerIndex and writerIndex to 0.
public abstract class AbstractByteBuf extends ByteBuf {

    @Override
    public ByteBuf discardReadBytes() {
        // a readerIndex of 0 means there are no bytes to discard
        if (readerIndex == 0) {
            ensureAccessible();
            return this;
        }
        if (readerIndex != writerIndex) {
            // move the byte range [readerIndex, writerIndex) to the front of the ByteBuf,
            // i.e. discard the bytes before readerIndex
            setBytes(0, this, readerIndex, writerIndex - readerIndex);
            // both writerIndex and readerIndex move forward by readerIndex
            writerIndex -= readerIndex;
            // reposition markedReaderIndex and markedWriterIndex,
            // both moving forward by readerIndex as well
            adjustMarkers(readerIndex);
            readerIndex = 0;
        } else {
            // readerIndex == writerIndex means the ByteBuf is no longer readable:
            // discard all bytes before readerIndex and restore the ByteBuf to its initial state;
            // the entire capacity becomes writable again
            ensureAccessible();
            adjustMarkers(readerIndex);
            writerIndex = readerIndex = 0;
        }
        return this;
    }
}
If a ByteBuf contains discardable bytes (readerIndex > 0), then as soon as we call discardReadBytes(), the bytes before readerIndex are discarded unconditionally.
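Here is a minimal sketch (hypothetical demo class) of the pointer movement performed by discardReadBytes():

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;

public class DiscardReadBytesDemo {
    public static void main(String[] args) {
        ByteBuf buf = Unpooled.buffer(8);
        buf.writeBytes(new byte[] {1, 2, 3, 4, 5, 6});
        buf.readBytes(new byte[4]);               // readerIndex = 4, writerIndex = 6
        System.out.println(buf.writableBytes());  // 2
        // move the readable range [4, 6) to the front, reclaiming 4 discardable bytes
        buf.discardReadBytes();                   // readerIndex = 0, writerIndex = 2
        System.out.println(buf.writableBytes());  // 6
        System.out.println(buf.getByte(0));       // 5, the first previously-unread byte
        buf.release();
    }
}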
Netty additionally provides discardSomeReadBytes(), which discards read bytes conditionally, under the following two conditions:
- When the ByteBuf is no longer readable, the read bytes are discarded unconditionally.
- Otherwise, read bytes are discarded only when the number of bytes already read exceeds half the ByteBuf's capacity; below that, the payoff from discarding does not justify the copying.
@Override
public ByteBuf discardSomeReadBytes() {
if (readerIndex > 0) {
// when the ByteBuf is no longer readable, unconditionally discard the read bytes
if (readerIndex == writerIndex) {
adjustMarkers(readerIndex);
writerIndex = readerIndex = 0;
return this;
}
// discard read bytes only when the bytes already read exceed half of the ByteBuf's capacity
if (readerIndex >= capacity() >>> 1) {
setBytes(0, this, readerIndex, writerIndex - readerIndex);
writerIndex -= readerIndex;
adjustMarkers(readerIndex);
readerIndex = 0;
return this;
}
}
return this;
}
This byte-discarding design of Netty's is very useful in decoding scenarios. Since TCP is a stream-oriented protocol, it sends a stream of bytes according to the sliding window size, so what the application layer receives may be a half packet or a sticky packet, rather than neat complete packets.
So when decoding, we must first check whether the data in the ByteBuf forms a complete packet; only then do we read and decode the bytes, after which the readerIndex moves backward.
If there is not enough data for one packet, the ByteBuf must keep accumulating and caching data until a full packet arrives. In an extreme case, even after many rounds of decoding there may still be a half packet left in the cached ByteBuf, which keeps growing as sticky packets keep streaming in. Because many rounds of decoding have happened, the discardable bytes in the ByteBuf occupy a lot of memory, and if the half-packet situation persists it can lead to an OutOfMemory error.
So Netty stipulates that if a half packet still remains in the ByteBuf after 16 rounds of decoding, discardSomeReadBytes() is called to discard the already-decoded bytes and save the unnecessary memory overhead.
2.4 ByteBuf Write Operations
Write operations on a ByteBuf mirror the read operations: each read method, getBytes, readBytes, readInt and so on, has a corresponding write method, setBytes, writeBytes, writeInt, for the same primitive types.
Like the get methods, the set methods simply write data into the ByteBuf without changing the writerIndex. setByte writes value to the specified index in the ByteBuf.
@Override
public ByteBuf setByte(int index, int value) {
checkIndex(index);
_setByte(index, value);
return this;
}
protected abstract void _setByte(int index, int value);
The concrete write is again performed by an abstract method whose implementation is provided by the subclasses of AbstractByteBuf. In the UnpooledDirectByteBuf implementation, the _setByte operation is proxied directly to the underlying JDK DirectByteBuffer.
public class UnpooledDirectByteBuf {
    // the underlying JDK DirectByteBuffer
    ByteBuffer buffer;

    @Override
    protected void _setByte(int index, int value) {
        buffer.put(index, (byte) value);
    }
}
For the UnpooledUnsafeDirectByteBuf implementation, the Byte is written directly to the corresponding memory address (memoryAddress + index).
public class UnpooledUnsafeDirectByteBuf {
    // operates directly on the OS memory address, not relying on a JDK buffer
    long memoryAddress;

    @Override
    protected void _setByte(int index, int value) {
        // ultimately backed by PlatformDependent0, writing the byte directly to the memory address
        UnsafeByteBufUtil.setByte(addr(index), value);
    }

    final long addr(int index) {
        // compute the memory address corresponding to offset index
        return memoryAddress + index;
    }
}

final class PlatformDependent0 {
    // the sun.misc.Unsafe instance
    static final Unsafe UNSAFE;

    static void putByte(long address, byte value) {
        UNSAFE.putByte(address, value);
    }
}
Netty also provides the ability to batch write Bytes to a ByteBuf. The setBytes method is used to batch write an array of bytes, byte[], to a specified index location in the ByteBuf.
@Override
public ByteBuf setBytes(int index, byte[] src) {
setBytes(index, src, 0, src.length);
return this;
}
public abstract ByteBuf setBytes(int index, byte[] src, int srcIndex, int length);
For the UnpooledDirectByteBuf implementation, it also proxies the setBytes operation directly to the JDK DirectByteBuffer, writing bytes from the byte array byte[] directly to the DirectByteBuffer.
For the UnpooledUnsafeDirectByteBuf implementation, the memory addresses are manipulated directly: the data at the byte array's memory address is copied to the ByteBuf's corresponding memory address.
We can also write bytes of data from other ByteBufs to a ByteBuf using the setBytes method.
@Override
public ByteBuf setBytes(int index, ByteBuf src, int length) {
    setBytes(index, src, src.readerIndex(), length);
    // adjust src's readerIndex
    src.readerIndex(src.readerIndex() + length);
    return this;
}

// note: this setBytes overload (with an explicit srcIndex) changes neither the source ByteBuf's
// readerIndex and writerIndex nor the destination ByteBuf's readerIndex and writerIndex
public abstract ByteBuf setBytes(int index, ByteBuf src, int srcIndex, int length);
Note that the writerIndex of the ByteBuf being written to does not change, but the readerIndex of the source ByteBuf is adjusted.
The write methods of ByteBuf rely on the corresponding set methods underneath, except that a write method also moves the writerIndex. For example, when we write a byte via writeByte, the writerIndex moves back by one.
@Override
public ByteBuf writeByte(int value) {
ensureWritable0(1);
_setByte(writerIndex++, value);
return this;
}
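The difference between the set and write families is easy to see in a short sketch (hypothetical demo class): setByte leaves writerIndex untouched, while writeByte advances it:

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;

public class SetVsWriteDemo {
    public static void main(String[] args) {
        ByteBuf buf = Unpooled.buffer(8);
        buf.setByte(0, 0x7F);                   // write at an absolute index
        System.out.println(buf.writerIndex());  // 0: writerIndex did not move
        buf.writeByte(0x7F);                    // write at the current writerIndex
        System.out.println(buf.writerIndex());  // 1: writerIndex advanced
        buf.release();
    }
}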
We can also write in bulk via writeBytes, which writes data from a byte array or another ByteBuf into the ByteBuf. Here Netty changes both the writerIndex of the ByteBuf being written to and the readerIndex of the source ByteBuf.
@Override
public ByteBuf writeBytes(ByteBuf src, int length) {
writeBytes(src, src.readerIndex(), length);
// adjust the source ByteBuf's readerIndex
src.readerIndex(src.readerIndex() + length);
return this;
}
If you explicitly specify where in the data source ByteBuf to start reading (srcIndex), the readerIndex in the data source ByteBuf will not be changed, only the writerIndex of the ByteBuf to which the data is written.
@Override
public ByteBuf writeBytes(ByteBuf src, int srcIndex, int length) {
ensureWritable(length);
setBytes(writerIndex, src, srcIndex, length);
// adjust the writerIndex of the ByteBuf being written to
writerIndex += length;
return this;
}
In addition, Netty supports batch writing to ByteBuf from different data sources, such as from JDK ByteBuffer, from FileChannel, from InputStream, and from ScatteringByteChannel.
public ByteBuf writeBytes(ByteBuffer src)
public int writeBytes(InputStream in, int length) throws IOException
public int writeBytes(ScatteringByteChannel in, int length) throws IOException
public int writeBytes(FileChannel in, long position, int length) throws IOException
Besides Byte-granularity writes, Netty supports writing many primitive types to a ByteBuf; again I take the Int type as the example.
We can write an Int to the ByteBuf with writeInt(), and then the writerIndex of the ByteBuf is moved back 4 positions.
@Override
public ByteBuf writeInt(int value) {
ensureWritable0(4);
_setInt(writerIndex, value);
writerIndex += 4;
return this;
}
protected abstract void _setInt(int index, int value);
Unlike writing Byte data, here byte order matters. Netty's ByteBuf defaults to big-endian, consistent with the byte order used by network protocols for transmission, so the high-order bits of the value being written are placed at the ByteBuf's lower addresses.
public class UnpooledUnsafeDirectByteBuf {
    @Override
    protected void _setInt(int index, int value) {
        // write to the ByteBuf in big-endian byte order
        UnsafeByteBufUtil.setInt(addr(index), value);
    }
}

final class UnsafeByteBufUtil {
    static void setInt(long address, int value) {
        PlatformDependent.putByte(address, (byte) (value >>> 24));
        PlatformDependent.putByte(address + 1, (byte) (value >>> 16));
        PlatformDependent.putByte(address + 2, (byte) (value >>> 8));
        PlatformDependent.putByte(address + 3, (byte) value);
    }
}
Netty also supports writing data to the ByteBuf in little-endian byte order.
@Override
public ByteBuf writeIntLE(int value) {
ensureWritable0(4);
_setIntLE(writerIndex, value);
writerIndex += 4;
return this;
}
protected abstract void _setIntLE(int index, int value);
Here the low-order bits of the value being written are placed at the ByteBuf's lower addresses.
public class UnpooledUnsafeDirectByteBuf {
    @Override
    protected void _setIntLE(int index, int value) {
        // write to the ByteBuf in little-endian byte order
        UnsafeByteBufUtil.setIntLE(addr(index), value);
    }
}

final class UnsafeByteBufUtil {
    static void setIntLE(long address, int value) {
        PlatformDependent.putByte(address, (byte) value);
        PlatformDependent.putByte(address + 1, (byte) (value >>> 8));
        PlatformDependent.putByte(address + 2, (byte) (value >>> 16));
        PlatformDependent.putByte(address + 3, (byte) (value >>> 24));
    }
}
2.5 ByteBuf expansion mechanism
Every time data is written to a ByteBuf, Netty calls the ensureWritable0 method to check whether the ByteBuf's remaining writable capacity (capacity - writerIndex) can hold the minWritableBytes about to be written. If not, the ByteBuf must be expanded, but the expanded capacity cannot exceed maxCapacity.
final void ensureWritable0(int minWritableBytes) {
    final int writerIndex = writerIndex();
    // the ByteBuf capacity expected to satisfy this write
    final int targetCapacity = writerIndex + minWritableBytes;
    // if the remaining capacity can satisfy this write, return without expanding
    if (targetCapacity >= 0 & targetCapacity <= capacity()) {
        return;
    }
    // the expanded capacity must not exceed maxCapacity
    if (checkBounds && (targetCapacity < 0 || targetCapacity > maxCapacity)) {
        ensureAccessible();
        throw new IndexOutOfBoundsException(String.format(
                "writerIndex(%d) + minWritableBytes(%d) exceeds maxCapacity(%d): %s",
                writerIndex, minWritableBytes, maxCapacity, this));
    }
    // expand if targetCapacity falls within (capacity, maxCapacity].
    // fastWritable is the capacity that can be written to directly without
    // memory reallocation or data copying.
    // For an UnpooledDirectByteBuf, fastWritable = capacity - writerIndex;
    // PooledByteBuf has a different implementation, which we ignore here.
    final int fastWritable = maxFastWritableBytes();
    // calculate the expanded capacity newCapacity.
    // For an UnpooledDirectByteBuf this is computed directly by calculateNewCapacity.
    int newCapacity = fastWritable >= minWritableBytes ? writerIndex + fastWritable
            : alloc().calculateNewCapacity(targetCapacity, maxCapacity);
    // expand the ByteBuf to the new capacity
    capacity(newCapacity);
}
2.5.1 Calculation Logic for newCapacity
The default initial capacity of a ByteBuf is 256 bytes, and the default maxCapacity is Integer.MAX_VALUE, i.e. 2G.
public abstract class AbstractByteBufAllocator implements ByteBufAllocator {
// the default initial capacity of a ByteBuf
static final int DEFAULT_INITIAL_CAPACITY = 256;
// the default maximum capacity of a ByteBuf
static final int DEFAULT_MAX_CAPACITY = Integer.MAX_VALUE;
@Override
public ByteBuf directBuffer() {
return directBuffer(DEFAULT_INITIAL_CAPACITY, DEFAULT_MAX_CAPACITY);
}
}
The minimum capacity required to satisfy a write is minNewCapacity, which is the targetCapacity computed in the ensureWritable0 method: minNewCapacity = writerIndex + minWritableBytes (the number of bytes to be written).
In the ByteBuf expansion logic, Netty defines an important threshold, CALCULATE_THRESHOLD, 4M in size, which determines the step size of expansion.
// Scale of expansion
static final int CALCULATE_THRESHOLD = 1048576 * 4; // 4 MiB page
If minNewCapacity is exactly equal to CALCULATE_THRESHOLD, then the expanded capacity newCapacity is 4M.
If minNewCapacity is greater than CALCULATE_THRESHOLD, newCapacity grows in 4M steps as follows:

first compute a baseline via minNewCapacity / threshold * threshold, and then expand on top of that baseline at the granularity of CALCULATE_THRESHOLD.

The baseline is required to be the largest multiple of CALCULATE_THRESHOLD that is less than or equal to minNewCapacity.

What does that mean? Suppose minNewCapacity is 5M; its expansion baseline is then 4M, and the capacity after expansion will be newCapacity = 4M + CALCULATE_THRESHOLD = 8M.
If the computed baseline exceeds maxCapacity - 4M, then newCapacity is set directly to maxCapacity.
If minNewCapacity is less than CALCULATE_THRESHOLD, newCapacity starts at 64 and doubles in a loop, until newCapacity is greater than or equal to minNewCapacity.
int newCapacity = 64;
while (newCapacity < minNewCapacity) {
newCapacity <<= 1;
}
- If minNewCapacity is within [0, 64], the expanded newCapacity is 64.
- If minNewCapacity is within [65, 128], the expanded newCapacity is 128.
- If minNewCapacity is within [129, 256], the expanded newCapacity is 256.
public abstract class AbstractByteBufAllocator implements ByteBufAllocator {
@Override
public int calculateNewCapacity(int minNewCapacity, int maxCapacity) {
// the minimum capacity minNewCapacity required by this write cannot exceed maxCapacity
if (minNewCapacity > maxCapacity) {
throw new IllegalArgumentException(String.format(
"minNewCapacity: %d (expected: not greater than maxCapacity(%d)",
minNewCapacity, maxCapacity));
}
// Scale used to determine capacity expansion
final int threshold = CALCULATE_THRESHOLD; // 4 MiB page
if (minNewCapacity == threshold) {
return threshold;
}
// If over threshold, do not double but just increase by threshold.
if (minNewCapacity > threshold) {
// compute the expansion baseline:
// the largest multiple of CALCULATE_THRESHOLD that is less than or equal to minNewCapacity
int newCapacity = minNewCapacity / threshold * threshold;
if (newCapacity > maxCapacity - threshold) {
newCapacity = maxCapacity;
} else {
// expand by threshold (4M) on top of the baseline
newCapacity += threshold;
}
return newCapacity;
}
// Not over threshold. Double up to 4 MiB, starting from 64.
// double from 64; newCapacity must end up greater than or equal to minNewCapacity
int newCapacity = 64;
while (newCapacity < minNewCapacity) {
newCapacity <<= 1;
}
return Math.min(newCapacity, maxCapacity);
}
}
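A quick way to sanity-check the two growth regimes is to call calculateNewCapacity through an allocator; the expected outputs below follow the rules described above (hypothetical demo class, standard Netty API):

import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.UnpooledByteBufAllocator;

public class NewCapacityDemo {
    public static void main(String[] args) {
        ByteBufAllocator alloc = UnpooledByteBufAllocator.DEFAULT;
        int max = Integer.MAX_VALUE;
        // below the 4M threshold: double from 64
        System.out.println(alloc.calculateNewCapacity(50, max));   // 64
        System.out.println(alloc.calculateNewCapacity(65, max));   // 128
        System.out.println(alloc.calculateNewCapacity(200, max));  // 256
        // above the 4M threshold: baseline (largest 4M multiple) + 4M
        int fiveMiB = 5 * 1024 * 1024;
        System.out.println(alloc.calculateNewCapacity(fiveMiB, max)); // 8388608, i.e. 8M
    }
}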
2.5.2 ByteBuf Expansion Logic
public class UnpooledDirectByteBuf {
// the underlying JDK DirectByteBuffer
ByteBuffer buffer;
}
For UnpooledDirectByteBuf, the actual data storage relies on the JDK's DirectByteBuffer, so the expansion logic is simple: allocate a new JDK DirectByteBuffer according to the newCapacity computed in the previous subsection, copy the data from the original DirectByteBuffer into the new one, then release the original DirectByteBuffer and set the new one into the UnpooledDirectByteBuf.
public class UnpooledDirectByteBuf {
    void setByteBuffer(ByteBuffer buffer, boolean tryFree) {
        if (tryFree) {
            ByteBuffer oldBuffer = this.buffer;
            // release the original buffer
            freeDirect(oldBuffer);
        }
        // set the new buffer
        this.buffer = buffer;
        capacity = buffer.remaining();
    }
}
For UnpooledUnsafeDirectByteBuf, since it operates on OS memory addresses directly, its expansion logic must, in addition to all of the above, record the memory address of the new DirectByteBuffer in memoryAddress.
public class UnpooledUnsafeDirectByteBuf extends UnpooledDirectByteBuf {
    // the ByteBuf's memory address
    long memoryAddress;

    @Override
    final void setByteBuffer(ByteBuffer buffer, boolean tryFree) {
        super.setByteBuffer(buffer, tryFree);
        // record the memory address of the new buffer
        memoryAddress = PlatformDependent.directBufferAddress(buffer);
    }
}
Here is the complete logic of the expansion operation:
public class UnpooledDirectByteBuf {
    // the underlying JDK DirectByteBuffer
    ByteBuffer buffer;

    @Override
    public ByteBuf capacity(int newCapacity) {
        // newCapacity cannot exceed maxCapacity
        checkNewCapacity(newCapacity);
        int oldCapacity = capacity;
        if (newCapacity == oldCapacity) {
            return this;
        }
        // compute the number of bytes that need to be copied after expansion
        int bytesToCopy;
        if (newCapacity > oldCapacity) {
            bytesToCopy = oldCapacity;
        } else {
            // ...... shrink ......
        }
        ByteBuffer oldBuffer = buffer;
        // allocate a new (JDK) ByteBuffer according to newCapacity
        ByteBuffer newBuffer = allocateDirect(newCapacity);
        oldBuffer.position(0).limit(bytesToCopy);
        newBuffer.position(0).limit(bytesToCopy);
        // copy the data in oldBuffer into newBuffer
        newBuffer.put(oldBuffer).clear();
        // release oldBuffer and set newBuffer;
        // for UnpooledUnsafeDirectByteBuf this also records newBuffer's address in memoryAddress
        setByteBuffer(newBuffer, true);
        return this;
    }
}
2.5.3 Forced Capacity Expansion
The ensureWritable method described earlier checks whether the size of the data being written, minWritableBytes, exceeds the ByteBuf's maximum writable capacity: maxCapacity - writerIndex.
public ByteBuf ensureWritable(int minWritableBytes)
If it does, an IndexOutOfBoundsException is thrown and expansion stops. Netty provides another ensureWritable variant with a force parameter that decides whether to force expansion in that case.
public int ensureWritable(int minWritableBytes, boolean force)
When minWritableBytes has exceeded the maximum writable capacity of ByteBuf:
- force = false: stop expanding and return directly, without throwing an exception.
- force = true: force expansion, growing the ByteBuf to maxCapacity; but if the capacity has already reached maxCapacity, stop expanding.
ensureWritable with a force parameter does not throw an exception, but rather returns a status code to inform the caller about the capacity of the ByteBuf.
- 0 means the ByteBuf's current writable capacity can satisfy this write; no expansion is needed.
- 1 means the data being written exceeds the ByteBuf's maximum writable capacity, and the ByteBuf has already reached maxCapacity, so it cannot be expanded.
- 3 means the data being written exceeds the ByteBuf's maximum writable capacity, and the capacity was forcibly expanded to maxCapacity.
- 2 means the normal expansion logic was executed.
The return values 0 and 2 indicate that the ByteBuf capacity (before or after expansion) can accommodate the size of the data being written, while the return values 1 and 3 indicate that the ByteBuf capacity (before or after expansion) cannot accommodate the size of the data being written.
@Override
public int ensureWritable(int minWritableBytes, boolean force) {
    // if the remaining capacity can satisfy this write, return directly without expanding
    if (minWritableBytes <= writableBytes()) {
        return 0;
    }

    final int maxCapacity = maxCapacity();
    final int writerIndex = writerIndex();
    // the data being written exceeds the ByteBuf's maximum writable capacity maxCapacity - writerIndex
    if (minWritableBytes > maxCapacity - writerIndex) {
        // force = false: stop expanding and return directly
        // force = true: expand to maxCapacity; if capacity already equals maxCapacity, stop expanding
        if (!force || capacity() == maxCapacity) {
            return 1;
        }
        // force the capacity to maxCapacity even though it is still not enough for this write
        capacity(maxCapacity);
        return 3;
    }

    // the normal expansion logic
    int fastWritable = maxFastWritableBytes();
    int newCapacity = fastWritable >= minWritableBytes ? writerIndex + fastWritable
            : alloc().calculateNewCapacity(writerIndex + minWritableBytes, maxCapacity);
    // adjust to the new capacity
    capacity(newCapacity);
    return 2;
}
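A small sketch of the four status codes (hypothetical demo class, standard Netty API); two buffers are used so that the forced-expansion path is not spoiled by the normal expansion in the first one:

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;

public class EnsureWritableDemo {
    public static void main(String[] args) {
        // capacity 8, maxCapacity 16
        ByteBuf a = Unpooled.buffer(8, 16);
        System.out.println(a.ensureWritable(4, true));   // 0: enough room already
        System.out.println(a.ensureWritable(12, true));  // 2: normal expansion, capacity grows to 16

        ByteBuf b = Unpooled.buffer(8, 16);
        System.out.println(b.ensureWritable(64, false)); // 1: exceeds maxCapacity, force = false
        System.out.println(b.ensureWritable(64, true));  // 3: forced expansion to maxCapacity (16)
        System.out.println(b.ensureWritable(64, true));  // 1: already at maxCapacity, cannot grow
        a.release();
        b.release();
    }
}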
2.5.4 Adaptive dynamic capacity expansion
When Netty receives network data, it is hard to know in advance how large a ByteBuf to use, so Netty starts by guessing an initial capacity, DEFAULT_INITIAL (2048).
public class AdaptiveRecvByteBufAllocator {
static final int DEFAULT_INITIAL = 2048;
}
Netty uses a 2048-byte ByteBuf to read data from the socket. After each read, it evaluates whether that size is appropriate: if the ByteBuf was completely filled, the guess is too small and there is likely more data waiting in the socket, so the ByteBuf should be expanded and a larger one used for the next read.
private final class HandleImpl extends MaxMessageHandle {
    @Override
    public void lastBytesRead(int bytes) {
        // bytes is the amount actually read from the socket this time;
        // attemptedBytesRead is the writable size of the ByteBuf, initially 2048
        if (bytes == attemptedBytesRead()) {
            // if this read filled the ByteBuf completely,
            // expand it and use a larger ByteBuf for the next read
            record(bytes);
        }
        // record the size of the data read from the socket this time
        super.lastBytesRead(bytes);
    }
}
Netty keeps reading data from the socket in a read loop until either all the data has been read or 16 reads have been performed, at which point the read loop ends. The larger the ByteBuf, the fewer reads Netty needs; the smaller the ByteBuf, the more reads it takes. So a mechanism is needed to keep the ByteBuf size within a reasonable range.
Netty counts how much data is read in each read loop -- totalBytesRead.
public abstract class MaxMessageHandle implements ExtendedHandle {
    // the total amount of data received on the connection during one read loop
    private int totalBytesRead;
}
At the end of each read loop, Netty will use the totalBytesRead to determine if the ByteBuf should be expanded or shrunk, so that at the beginning of the next read loop, Netty can use a relatively reasonable capacity to receive the data in the socket, minimizing the number of times it reads the socket as much as possible.
private final class HandleImpl extends MaxMessageHandle {
@Override
public void readComplete() {
// decide whether the ByteBuf should be expanded or shrunk
record(totalBytesRead());
}
}
Under what circumstances does the ByteBuf need to be expanded, and by how much each time? And when should it be shrunk, and by how much?
This relies on an important capacity index structure, SIZE_TABLE, which defines a table of indexed ByteBuf capacities. It serves as the capacity index table for expansion and shrinkage: how much to expand or shrink each time is taken from the capacities recorded in this table.
public class AdaptiveRecvByteBufAllocator {
    // expansion step
    private static final int INDEX_INCREMENT = 4;
    // shrink step
    private static final int INDEX_DECREMENT = 1;
    // the ByteBuf capacity allocation table (the expansion/shrink index table);
    // expansion and shrinkage move along the capacities recorded in this table
    private static final int[] SIZE_TABLE;
}
For capacities below 512, the entries in SIZE_TABLE start at 16 and grow in increments of 16. From 512 onward, each entry in SIZE_TABLE doubles the previous one.
The initial ByteBuf capacity is 2048, whose index in SIZE_TABLE is 33. After a read loop completes, if totalBytesRead falls between SIZE_TABLE[index - INDEX_DECREMENT] and SIZE_TABLE[index], i.e. within [1024, 2048], then the allocated ByteBuf capacity is just right and needs neither shrinking nor expanding. For example, totalBytesRead = 2000 lies between 1024 and 2048, so a capacity of 2048 fits well.
If totalBytesRead is less than or equal to SIZE_TABLE[index - INDEX_DECREMENT], i.e. the bytes read in this read loop are at most 1024, then the bytes read fit within the next-smaller capacity level, which means the current ByteBuf is a bit too large. Netty then sets the shrink flag decreaseNow = true. If the shrink condition is still met in the next read loop, the actual shrink is performed: the new capacity is SIZE_TABLE[index - INDEX_DECREMENT], but never less than SIZE_TABLE[minIndex] (16).

Note that the shrink condition must be met twice in a row before shrinking happens, and the shrink step is only 1 (INDEX_DECREMENT), which makes shrinking rather cautious.
If totalBytesRead is greater than or equal to the current ByteBuf capacity (nextReceiveBufferSize), the ByteBuf is a bit too small and needs to grow. The new capacity is SIZE_TABLE[index + INDEX_INCREMENT], but never more than SIZE_TABLE[maxIndex] (65536 by default).

Expansion happens as soon as the condition is met once, and the expansion step is 4 (INDEX_INCREMENT), so expansion is considerably more aggressive than shrinking.
private void record(int actualReadBytes) {
    if (actualReadBytes <= SIZE_TABLE[max(0, index - INDEX_DECREMENT)]) {
        // Shrinking is performed only after the shrink condition is triggered twice
        if (decreaseNow) {
            index = max(index - INDEX_DECREMENT, minIndex);
            nextReceiveBufferSize = SIZE_TABLE[index];
            decreaseNow = false;
        } else {
            decreaseNow = true;
        }
    } else if (actualReadBytes >= nextReceiveBufferSize) {
        // Expansion is performed as soon as the expansion condition is satisfied once
        index = min(index + INDEX_INCREMENT, maxIndex);
        nextReceiveBufferSize = SIZE_TABLE[index];
        decreaseNow = false;
    }
}
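To make the record logic concrete, here is a hypothetical walk-through (the indexes and sizes assume the default SIZE_TABLE built above):

// Assume index = 33 (2048), nextReceiveBufferSize = 2048:
// read loop #1: totalBytesRead = 512  <= SIZE_TABLE[32] (1024) -> decreaseNow = true
// read loop #2: totalBytesRead = 512  <= 1024 again            -> index = 32, next size = 1024
// read loop #3: totalBytesRead = 1024 >= nextReceiveBufferSize -> index = min(32 + 4, maxIndex) = 36,
//                                                                 next size = SIZE_TABLE[36] = 16384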
2.6 ByteBuf Reference Counting Design
Netty introduces a reference counting mechanism for ByteBuf. In the ByteBuf design hierarchy, every ByteBuf inherits from the abstract class AbstractReferenceCountedByteBuf, which implements the ReferenceCounted interface.
public interface ReferenceCounted {
    int refCnt();
    ReferenceCounted retain();
    ReferenceCounted retain(int increment);
    boolean release();
    boolean release(int decrement);
}
Each ByteBuf maintains an internal reference count named refCnt, which can be read via the refCnt() method. Whenever a ByteBuf is referenced in another context, we need to call retain() to increment its reference count by 1, or retain(int increment) to increase refCnt by a specified amount.

Every reference to a ByteBuf must be paired with a release: whenever we are done using a ByteBuf, we call release() to decrement its reference count by 1. When refCnt drops to 0, Netty frees the memory resource referenced by the ByteBuf via the deallocate method, and release() returns true; if refCnt has not yet reached 0, it returns false. Similarly, release(int decrement) lets us specify by how much refCnt is decreased.
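A minimal usage sketch of these semantics (buffer contents are illustrative; the try/finally pattern is the conventional way to pair every reference with a release):

ByteBuf buf = Unpooled.directBuffer(16); // logical refCnt = 1
buf.retain();                            // hand it to another context: refCnt = 2
try {
    buf.writeInt(42);
} finally {
    buf.release();                       // that context is done: refCnt = 1
}
boolean freed = buf.release();           // last reference gone: deallocate() runs
assert freed;                            // release() returns true when refCnt hits 0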
2.6.1 Why Introduce Reference Counting
"What does it mean to refer to an ByteBuf in another context? Let's say we create an ByteBuf in thread 1, and then drop it to thread 2 for processing, which in turn may drop it to thread 3, each of which has its own contextual processing logic, such as handling the ByteBuf, releasing it, etc. This makes the ByteBuf de facto shared across multiple thread contexts. Each thread has its own contextual processing logic, such as ByteBuf handling, releasing, etc. This makes ByteBuf in fact a shared situation in multiple thread contexts.
In this case, it's hard to tell in the context of a single thread whether an ByteBuf should be freed or not. For example, thread 1 may be ready to free the ByteBuf, but it may be in use by another thread. That's why it's important for Netty to introduce reference counting for ByteBufs, so that every time a ByteBuf is referenced you need to pass it through theretain()
method adds 1 to the reference count.release()
When you release the ByteBuf, subtract the reference count by 1. When the reference count reaches 0, there is no other context referencing the ByteBuf, so Netty can release it.
In addition, compared with the JDK's DirectByteBuffer, which relies on GC to free the Native Memory behind it, Netty prefers to free a DirectByteBuf's memory manually and promptly. A JDK DirectByteBuffer must wait for a GC cycle, but since the object instance itself occupies very little JVM heap, it rarely helps trigger a GC. The referenced Native Memory is therefore released late; in bad cases it accumulates until an OOM occurs, and allocating new DirectByteBuffers can also stall badly.

Netty avoids this by freeing Native Memory manually right after each use. But without the JVM's help, manual management always risks memory leaks, for example when someone forgets to call release().
Detecting such leaks is the other reason Netty introduced reference counting for ByteBuf. When a ByteBuf instance is no longer reachable (no strong or soft references remain) and a GC reclaims the instance on the JVM heap, Netty checks whether its reference count is 0. If it is not, then release() was never called for some reference, the Native Memory was never freed, and a memory leak has been detected.

Once a leak is detected, Netty aggregates the leak information via reportLeak() and writes it to the log at error level.
At this point you may wonder: isn't this just a trivial reference counter, hardly worth mentioning? Initialize refCnt to 1 when the ByteBuf is created, add 1 whenever it is referenced in another context, subtract 1 whenever it is released, and free the Native Memory when it reaches 0. How hard can that be?

In fact, Netty's reference counting design is anything but simple, even a little complex. Behind it lies a great deal of performance tuning and a thorough treatment of subtle concurrency problems, with repeated trade-offs between performance and thread safety.
2.6.2 Initial Design of Reference Counting
To make sense of the whole design lineage of reference counting, we need to go back to the starting point and look at the original 4.1.x design.
public abstract class AbstractReferenceCountedByteBuf extends AbstractByteBuf {
    // Updater for atomically updating refCnt
    private static final AtomicIntegerFieldUpdater<AbstractReferenceCountedByteBuf> refCntUpdater =
            AtomicIntegerFieldUpdater.newUpdater(AbstractReferenceCountedByteBuf.class, "refCnt");
    // Reference count, initialized to 1
    private volatile int refCnt;

    protected AbstractReferenceCountedByteBuf(int maxCapacity) {
        super(maxCapacity);
        // Initialize the reference count to 1
        refCntUpdater.set(this, 1);
    }

    // Increase the reference count by increment
    private ByteBuf retain0(int increment) {
        for (;;) {
            int refCnt = this.refCnt;
            // Each retain adds increment to the reference count
            final int nextCnt = refCnt + increment;
            // Ensure we don't resurrect (which means the refCnt was 0) and also that we encountered an overflow.
            if (nextCnt <= increment) {
                // refCnt is already 0, or it overflowed: throw an exception
                throw new IllegalReferenceCountException(refCnt, increment);
            }
            // CAS update of refCnt
            if (refCntUpdater.compareAndSet(this, refCnt, nextCnt)) {
                break;
            }
        }
        return this;
    }

    // Decrease the reference count by decrement
    private boolean release0(int decrement) {
        for (;;) {
            int refCnt = this.refCnt;
            if (refCnt < decrement) {
                // The number of releases must match the number of references
                throw new IllegalReferenceCountException(refCnt, -decrement);
            }
            // Each release subtracts decrement from the reference count (CAS update)
            if (refCntUpdater.compareAndSet(this, refCnt, refCnt - decrement)) {
                if (refCnt == decrement) {
                    // The reference count reached 0: free the Native Memory and return true
                    deallocate();
                    return true;
                }
                // The reference count is not 0 yet: return false
                return false;
            }
        }
    }
}
In this early design, things really were as simple as we imagined: refCnt is initialized to 1 when the ByteBuf is created, each retain adds 1 and each release subtracts 1, with the updates performed by CAS inside a for loop. When the reference count reaches 0, the Native Memory is released via deallocate().
2.6.3 Introducing Instruction-Level Optimizations
This design is clean and clear, and at first sight there is nothing wrong with it, but Netty's pursuit of performance does not stop there. On the x86 architecture the XADD instruction outperforms the CMPXCHG instruction: the compareAndSet method is implemented with CMPXCHG underneath, while getAndAdd is implemented with XADD.

So, chasing that last bit of performance, a later 4.1.x release replaced compareAndSet with getAndAdd.
public abstract class AbstractReferenceCountedByteBuf extends AbstractByteBuf {

    private volatile int refCnt;

    protected AbstractReferenceCountedByteBuf(int maxCapacity) {
        super(maxCapacity);
        // The reference count is still initialized to 1
        refCntUpdater.set(this, 1);
    }

    private ByteBuf retain0(final int increment) {
        // Compared with the compareAndSet implementation, the for loop is gone.
        // The refCnt is unconditionally increased by increment first
        int oldRef = refCntUpdater.getAndAdd(this, increment);
        // Only after updating refCnt do we check for anomalies
        if (oldRef <= 0 || oldRef + increment < oldRef) {
            // Ensure we don't resurrect (which means the refCnt was 0) and also that we encountered an overflow.
            // If the original refCnt was already 0, or refCnt overflowed,
            // roll refCnt back and throw an exception
            refCntUpdater.getAndAdd(this, -increment);
            throw new IllegalReferenceCountException(oldRef, increment);
        }
        return this;
    }

    private boolean release0(int decrement) {
        // First subtract decrement from refCnt
        int oldRef = refCntUpdater.getAndAdd(this, -decrement);
        // If refCnt reached 0, release the Native Memory
        if (oldRef == decrement) {
            deallocate();
            return true;
        } else if (oldRef < decrement || oldRef - decrement > oldRef) {
            // If there were more releases than retains, or refCnt underflowed,
            // roll refCnt back and throw an exception
            refCntUpdater.getAndAdd(this, decrement);
            throw new IllegalReferenceCountException(oldRef, decrement);
        }
        return false;
    }
}
In the original implementation, inside a for loop Netty first checks for retain/release anomalies and then updates refCnt via CAS, throwing an IllegalReferenceCountException if the check fails. This is a pessimistic strategy for updating the reference count.

In the getAndAdd implementation, the for loop is removed. Opposite to compareAndSet, refCnt is updated first via getAndAdd and the anomaly checks happen afterwards; if an anomaly is found, the update is rolled back and an IllegalReferenceCountException is thrown. This is an optimistic strategy for updating the reference count.

For example, when retain increases the reference count, it first increments refCnt, then checks whether the original count oldRef was already 0 or whether refCnt overflowed; if so, it rolls refCnt back and throws.

When release decreases the reference count, it first decrements refCnt, then checks whether the number of releases exceeds the number of retains (to prevent over-release) and whether refCnt underflowed; if so, it rolls refCnt back and throws.
2.6.4 The Concurrency Safety Problem Introduced
The getAndAdd-based retain and release operations are faster than the CAS-based versions, but they introduce a new concurrency problem.

Assume a scenario with a ByteBuf whose refCnt = 1, and thread 1 executes the release() operation on it.

In the getAndAdd implementation, thread 1 first updates refCnt from 1 to 0 (step 1) and then calls deallocate() to free the Native Memory (step 2). Simple and clear, right? Now let's add a little concurrency.
Insert a thread 2 between steps 1 and 2 above, with thread 2 concurrently executing retain() on this ByteBuf.

Thread 2 first updates refCnt from 0 to 1 via getAndAdd (step 1.1), and then notices that the original value oldRef is 0. That is, by the time thread 2 called retain(), the ByteBuf's reference count was already 0 and thread 1 was about to release the Native Memory.

So thread 2 calls getAndAdd again to roll refCnt back from 1 to 0 (step 1.2) and finally throws an IllegalReferenceCountException. This is the correct, semantics-preserving outcome; after all, you must not retain() a ByteBuf whose reference count is already 0.
Everything still looks calm and orderly, so let's add one more bit of concurrency. Insert a thread 3 between step 1.1 and step 1.2, with thread 3 concurrently executing retain() on the ByteBuf once more.

Since updating the reference count (step 1.1) and rolling it back (step 1.2) are not one atomic operation, thread 3 can slip in between them. Executing retain(), thread 3 first increases refCnt from 1 to 2 via getAndAdd.

Note that thread 2 has not had the chance to roll refCnt back yet, so thread 3 sees a refCnt of 1 rather than 0.

Because the oldRef seen by thread 3 is 1, thread 3's retain() succeeds: the reference count rises to 2 with no rollback and no exception. From thread 3's point of view, this is a perfectly healthy ByteBuf.

Immediately afterwards, thread 1 executes step 2 -- deallocate() -- and frees the Native Memory. From then on, any access thread 3 makes to the ByteBuf is broken, because the Native Memory has already been released by thread 1.
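The whole race can be condensed into a timeline (an illustration of the scenario above in comment form, not runnable Netty code):

// refCnt = 1
// T1: release() -> getAndAdd(-1)   refCnt = 0              (step 1)
// T2: retain()  -> getAndAdd(+1)   refCnt = 1, oldRef = 0  (step 1.1, must roll back)
// T3: retain()  -> getAndAdd(+1)   refCnt = 2, oldRef = 1  (sees a "healthy" count!)
// T2: rollback  -> getAndAdd(-1)   refCnt = 1              (step 1.2, then throws)
// T1: deallocate()                 Native Memory freed while T3 still holds the ByteBuf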
2.6.5 Trade-offs Between Performance and Concurrency Safety
Netty now has two choices. The first is to roll back to the original design and give up the performance gains of the XADD instruction. The CMPXCHG instruction used there is slower, but it does not suffer from the concurrency problem above.

That is because the pessimistic strategy updates the reference count inside a for loop: check for anomalies first, then CAS the new value in. It does not matter if several threads see an intermediate refCnt value, because the subsequent CAS will simply fail.

For example, when thread 1 releases the ByteBuf, in the gap before it CASes refCnt to 0, refCnt is 1. If thread 2 runs retain concurrently in that gap, it indeed sees the intermediate value 1 and CASes refCnt to 2.

Thread 1's CAS then fails, and in the next round of the for loop it replaces refCnt with 1 instead, which is fully consistent with reference counting semantics.

The other case: thread 1 has already CASed refCnt to 0 when thread 2 attempts a retain. Because this design checks for anomalies before the CAS, thread 2 first sees in the retain method that refCnt is 0, throws an IllegalReferenceCountException, and never performs the CAS. That too is consistent with the semantics; after all, you must not access a ByteBuf whose reference count is already 0.

The second choice is to keep the performance gains of the XADD instruction while also fixing the concurrency problem it introduced. Netty, without a doubt, chose this one.
Before we get to Netty's elegant solution, let's review the root cause of the concurrency problem.

In the getAndAdd design, Netty first updates the value of refCnt and then rolls back if an anomaly is found. Update and rollback are not atomic, and the intermediate state between them is visible to other threads.

Thread 2 sees thread 1's intermediate state (refCnt = 0) and bumps the count to 1; before thread 2 can roll back, its own intermediate state (refCnt = 1, oldRef = 0) is seen by thread 3, which bumps the count to 2 (refCnt = 2, oldRef = 1). Thread 3 believes this is a normal state, while thread 1 believes refCnt is already 0 and proceeds to free the Native Memory. That is the bug.

The root cause is that different refCnt values carry different semantics. For thread 1, release dropping refCnt to 0 means the ByteBuf is no longer referenced and its Native Memory may be freed.

Thread 2 then bumps refCnt to 1 via retain, changing the semantics to "this ByteBuf is referenced once". Finally, thread 3 bumps refCnt to 2 via retain, changing the semantics yet again.

As long as XADD is used to update the reference count, such concurrent modifications of refCnt are unavoidable. The crux is that each concurrent modification changes the semantics of the ByteBuf. That is the key flaw of the getAndAdd design.

If Netty wants to enjoy the performance of XADD and still be concurrency-safe, it has to redesign reference counting. The first requirement is to keep using XADD for the updates, accepting that multiple threads will concurrently modify refCnt.

Since concurrent modification is unavoidable, can we redesign the counting so that the semantics never change no matter how many threads modify refCnt? That is: once thread 1 drops the count to "zero references", no concurrent retain by threads 2 and 3, however it changes the numeric value of refCnt, may change that "zero references" meaning.
2.6.6 Introducing the Parity Design
Here comes one of Netty's most ingenious designs: reference counts are no longer the literal values 0, 1, 2, 3 ..., but instead fall into two broad categories, even or odd.

- An even refCnt means the ByteBuf's logical reference count is not 0; as long as a ByteBuf is referenced anywhere, its refCnt is even, and the exact number of references is obtained via refCnt >>> 1.

- An odd refCnt means the ByteBuf's logical reference count is 0; as soon as a ByteBuf is no longer referenced anywhere, its refCnt is odd, and the Native Memory behind it can then be freed.
When a ByteBuf is initialized, refCnt starts at 2 (even) rather than 1. Each retain adds 2 to refCnt (an even step) instead of 1, and each release subtracts 2 (also an even step) instead of 1. This way, as long as a ByteBuf is referenced, its refCnt stays even no matter how many threads call retain concurrently, and the semantics never change.
public final int initialValue() {
    return 2;
}
When the last reference to a ByteBuf is released, Netty does not set refCnt to 0 but to 1 (odd). For an odd refCnt, no matter how many threads concurrently call retain or release, the count stays odd, and the semantics -- this ByteBuf's reference count is 0 -- remain unchanged.
Take the concurrency problem shown above as an example. Under the new scheme, thread 1 first executes release on the ByteBuf, and Netty sets refCnt to 1 (odd).

Thread 2 calls retain concurrently and bumps refCnt from 1 to 3 via getAndAdd. refCnt is still odd, and per the odd-number semantics -- the ByteBuf's reference count is already 0 -- thread 2 throws an IllegalReferenceCountException from the retain method.

Thread 3 calls retain concurrently and bumps refCnt from 3 to 5 via getAndAdd. See: under the new scheme, no matter how many threads concurrently execute retain, refCnt remains odd, so thread 3 also throws an IllegalReferenceCountException. This is exactly the concurrency semantics reference counting needs.

This parity-based design was introduced in a later 4.1.x release and very neatly fixed the concurrency problem of the getAndAdd design. Now that the core idea is clear, let's walk through the implementation details of the new scheme.
All ByteBufs in Netty inherit from AbstractReferenceCountedByteBuf, where the ReferenceCounted interface and all reference counting operations on ByteBuf are implemented.
public abstract class AbstractReferenceCountedByteBuf extends AbstractByteBuf {
    // Offset of the refCnt field within the ByteBuf object's memory,
    // used later to operate on refCnt via Unsafe
    private static final long REFCNT_FIELD_OFFSET =
            ReferenceCountUpdater.getUnsafeOffset(AbstractReferenceCountedByteBuf.class, "refCnt");
    // AtomicIntegerFieldUpdater for the refCnt field,
    // used later to operate on refCnt via the field updater
    private static final AtomicIntegerFieldUpdater<AbstractReferenceCountedByteBuf> AIF_UPDATER =
            AtomicIntegerFieldUpdater.newUpdater(AbstractReferenceCountedByteBuf.class, "refCnt");
    // Create the ReferenceCountUpdater; all reference counting operations are delegated to it
    private static final ReferenceCountUpdater<AbstractReferenceCountedByteBuf> updater =
            new ReferenceCountUpdater<AbstractReferenceCountedByteBuf>() {
        @Override
        protected AtomicIntegerFieldUpdater<AbstractReferenceCountedByteBuf> updater() {
            // Operate on the refCnt field via the AtomicIntegerFieldUpdater
            return AIF_UPDATER;
        }
        @Override
        protected long unsafeOffset() {
            // Operate on the refCnt field via Unsafe
            return REFCNT_FIELD_OFFSET;
        }
    };
    // The reference count in the ByteBuf, initialized to 2 (even)
    private volatile int refCnt = updater.initialValue();
}
The refCnt field records how many times a ByteBuf is referenced. Due to the parity design, Netty initializes refCnt to 2 (even) when a ByteBuf is created, which logically means the ByteBuf is referenced once. Every subsequent retain adds 2 and every release subtracts 2; each single operation on the count moves in steps of 2.

Besides AbstractReferenceCountedByteBuf, which implements reference counting specifically for ByteBuf, Netty has a more general abstract class AbstractReferenceCounted that implements reference counting for all resource classes (ByteBuf being just one kind of memory resource).

Since both are reference counting implementations, the two classes used to contain a lot of duplicated logic. Netty therefore introduced the ReferenceCountUpdater class to aggregate all the reference counting logic in one place.
ReferenceCountUpdater has two ways to manipulate the reference count refCnt. One is via an AtomicIntegerFieldUpdater, obtained through updater(), which operates on the refCnt field. The other is via Unsafe, using unsafeOffset() to get the offset of the refCnt field within the ByteBuf instance's memory.
Why does Netty provide two ways of accessing and updating refCnt when one would seem enough? Isn't that redundant? Keep this question in mind; the answer comes when we analyze the source code details below.

OK, let's formally dig into the implementation details of the new scheme. First question: under the new design, how do we obtain the logical reference count of a ByteBuf?
public abstract class ReferenceCountUpdater<T extends ReferenceCounted> {

    public final int initialValue() {
        // The ByteBuf reference count is initialized to 2
        return 2;
    }

    public final int refCnt(T instance) {
        // Read the raw refCnt via the updater,
        // then derive the real (logical) reference count in realRefCnt
        return realRefCnt(updater().get(instance));
    }

    // Get the logical reference count of the ByteBuf
    private static int realRefCnt(int rawCnt) {
        // Parity check
        return rawCnt != 2 && rawCnt != 4 && (rawCnt & 1) != 0 ? 0 : rawCnt >>> 1;
    }
}
Because of the parity design, obtaining the logical reference count requires deciding whether the current rawCnt (refCnt) is odd or even, since the two carry different semantics:

- If rawCnt is odd, the ByteBuf is no longer referenced anywhere, and the logical reference count returned is 0.

- If rawCnt is even, the ByteBuf is still referenced somewhere, and the logical reference count is rawCnt >>> 1.
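A few concrete values make the mapping tangible (an illustration of realRefCnt's behavior, not additional API):

// raw refCnt -> logical reference count
// 2 -> 1   (2 >>> 1, handled by the fast == path)
// 4 -> 2   (the fast == path)
// 6 -> 3   (even, falls through to the & check: 6 >>> 1)
// 1 -> 0   (odd: no references left)
// 5 -> 0   (still odd, no matter how many concurrent retains bumped it)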
realRefCnt is just a simple parity check, but its implementation reflects Netty's extreme pursuit of performance. The easy way to test parity is rawCnt & 1: a result of 0 means rawCnt is even, 1 means it is odd.

But notice that Netty prefixes the parity check with rawCnt != 2 && rawCnt != 4. What is that for?

Netty is trying to replace the & operation with the cheaper == operation where it can. Of course it cannot enumerate every even value with == (nor does it need to), so it only special-cases the reference counts that occur most frequently in practice. The most frequent raw counts are 2 and 4, meaning a ByteBuf is referenced only once or twice in most scenarios. For these high-frequency cases Netty optimizes with ==, and the low-frequency cases fall back to the & operation.
Most performance work is like this: a single grand, global optimization scheme is usually impossible and inefficient. The most effective optimizations, with immediately visible results, are the ones targeted at local hotspots.
Setting the reference count follows the same parity conversion. The refCnt parameter of the setRefCnt method is the logical reference count -- 0, 1, 2, 3 ... -- and to store it into the ByteBuf, the logical count is multiplied by 2 so that the raw value is always even.
public final void setRefCnt(T instance, int refCnt) {
    updater().set(instance, refCnt > 0 ? refCnt << 1 : 1); // overflow OK here
}
With these foundations in place, let's see how the new retain method addresses the concurrency problem of the getAndAdd design. First, the parity design is completely transparent to users: from the user's perspective the reference count is still an ordinary natural number -- 0, 1, 2, 3 ...

So when a user calls retain to increase the reference count of a ByteBuf, they specify the logical increment (increment), while internally Netty adds twice that amount (rawIncrement) to the refCnt field.
public final T retain(T instance) {
    // Logically add 1 to the reference count; the implementation actually adds 2
    return retain0(instance, 1, 2);
}

public final T retain(T instance, int increment) {
    // all changes to the raw count are 2x the "real" change - overflow is OK
    // rawIncrement is always twice the logical increment
    int rawIncrement = checkPositive(increment, "increment") << 1;
    // Add rawIncrement to the ByteBuf's refCnt field
    return retain0(instance, increment, rawIncrement);
}

// rawIncrement = increment << 1
// increment is the logical increment step of the reference count
// rawIncrement is the actual increment step applied to refCnt
private T retain0(T instance, final int increment, final int rawIncrement) {
    // First unconditionally add to refCnt with the XADD instruction
    int oldRef = updater().getAndAdd(instance, rawIncrement);
    // If oldRef is odd, the ByteBuf no longer has any references: throw.
    // Once refCnt is odd, it stays odd no matter how many threads retain concurrently
    if (oldRef != 2 && oldRef != 4 && (oldRef & 1) != 0) {
        throw new IllegalReferenceCountException(0, increment);
    }
    // don't pass 0!
    if ((oldRef <= 0 && oldRef + rawIncrement >= 0)
            || (oldRef >= 0 && oldRef + rawIncrement < oldRef)) {
        // The refCnt field overflowed: roll back and throw an exception
        updater().getAndAdd(instance, -rawIncrement);
        throw new IllegalReferenceCountException(realRefCnt(oldRef), increment);
    }
    return instance;
}
The new retain0 still keeps the performance benefit of the XADD instruction, and the overall flow is similar: refCnt is first increased by rawIncrement via getAndAdd; for retain(T instance), for example, it simply adds 2.

It then checks whether the original count oldRef is odd. If it is, the ByteBuf has no references left -- the logical reference count is already 0 -- and an IllegalReferenceCountException is thrown.

With an odd reference count, no matter how many threads concurrently add 2 to refCnt, it stays odd and ultimately throws. That is the whole point of the fix: concurrent executions of retain can no longer change the original semantics.

Finally it checks whether the refCnt field overflowed; if so, it rolls back and throws. Let's replay the earlier concurrency scenario with concrete values to appreciate the subtlety of the parity design.
Thread 1 executes release on a ByteBuf whose refCnt is 2, so its logical reference count is dropping to 0. In the new design, the refCnt of an unreferenced ByteBuf must be odd, never 0, so Netty sets refCnt to 1 (step 1) and then calls deallocate in step 2 to free the Native Memory.

Thread 2 slips in between step 1 and step 2 and concurrently executes retain on the ByteBuf. Thread 2 sees refCnt = 1, bumps it to 3 via getAndAdd -- still odd -- and throws an IllegalReferenceCountException.

Thread 3 slips in between steps 1.1 and 1.2 and concurrently executes retain again. Thread 3 sees refCnt = 3, bumps it to 5 via getAndAdd -- still odd -- and throws an IllegalReferenceCountException.

This guarantees the concurrency semantics of reference counting: once a ByteBuf has no references left (refCnt = 1), every thread that concurrently executes retain gets an exception.
However, the retain method alone cannot guarantee these concurrency semantics; it needs the release method to cooperate. And to guarantee them, release cannot be implemented with the faster XADD instruction -- it must fall back to CMPXCHG.

Why? The new design is parity-based: an even refCnt means the ByteBuf is still referenced; an odd refCnt means it has no references left and its Native Memory can be safely freed. For a ByteBuf with an odd refCnt, no matter how many threads concurrently execute retain, refCnt stays odd and an IllegalReferenceCountException is thrown. Those are the concurrency semantics of reference counting.

To preserve them, retain and release must normally update refCnt in even steps: each retain adds 2, each release subtracts 2.

But at some point the count must drop to "zero references", and in the parity design refCnt is never allowed to be the value 0: once refCnt were 0, concurrent retains would bump it back to an even number and the race would return.

Each release subtracts 2 from refCnt. If release were implemented with the XADD instruction, then -- recall the getAndAdd design -- the first step would subtract 2 via getAndAdd, refCnt would become 0, and the concurrency problem would be back. So instead, refCnt must be updated to 1 with the CMPXCHG instruction.
Some readers may ask: can't we just add an if check -- if refCnt would become 0 after subtracting 2, have getAndAdd update it to 1 instead (subtracting an odd number) -- and still enjoy XADD's performance?

The answer is no, because the if check and the getAndAdd update are still not atomic; other threads can still run retain concurrently in the gap between them.

Between thread 1's if check and its getAndAdd update, thread 2 sees refCnt = 2 and bumps it to 4, and thread 3 immediately bumps it to 6. The ByteBuf looks perfectly normal to both thread 2 and thread 3, yet thread 1 goes on to free the Native Memory.

Worse, with such a design refCnt would sometimes be changed by an odd amount and sometimes by an even amount via getAndAdd, which wrecks the parity scheme altogether.

So the design goal is: when a ByteBuf's references drop to zero, release must atomically update refCnt to 1. That can only be done with the CMPXCHG instruction, not XADD.

Furthermore, CMPXCHG detects concurrent interference atomically: if another thread got in, the CAS fails and we simply retry. XADD cannot do this, because it always updates first and checks afterwards, non-atomically. The source code below shows this clearly.
2.6.7 Minimizing Memory Barrier Overheads
public final boolean release(T instance) {
    // First read refCnt in a non-volatile way via Unsafe
    int rawCnt = nonVolatileRawCnt(instance);
    // If the logical reference count drops to 0 with this release, try to CAS refCnt to 1
    // via tryFinalRelease0; if that CAS fails, retry via retryRelease0.
    // If the logical reference count is not dropping to 0, subtract 2 from refCnt via nonFinalRelease0
    return rawCnt == 2 ? tryFinalRelease0(instance, 2) || retryRelease0(instance, 1)
            : nonFinalRelease0(instance, 1, rawCnt, toLiveRealRefCnt(rawCnt, 1));
}
Another small detail demonstrates Netty's dedication to performance. The refCnt field is declared volatile in the ByteBuf:

private volatile int refCnt = updater.initialValue();

Ordinary reads and writes of refCnt therefore go through a memory barrier. But for the first read of refCnt in the release method, Netty uses a non-volatile read that skips the barrier and reads the cache line directly, avoiding the barrier overhead.
private int nonVolatileRawCnt(T instance) {
    // Get REFCNT_FIELD_OFFSET
    final long offset = unsafeOffset();
    // Access refCnt via Unsafe with a plain read, avoiding the memory barrier overhead
    return offset != -1 ? PlatformDependent.getInt(instance, offset) : updater().get(instance);
}
Some readers may ask: if refCnt is read without a memory barrier, couldn't the value read be stale?

Indeed it could, but Netty doesn't care; a stale value does no harm here. Because of the parity design, the first read of the reference count does not need to be exact, so reading it plainly via Unsafe saves one memory barrier.

Why doesn't it need to be exact? If the true refCnt is odd, then no matter how many threads retain concurrently, it stays odd; all we need to know is that it is odd so that an IllegalReferenceCountException can be thrown. Whether we read 3 or 5 really doesn't matter.

And if the true refCnt is even? Also no problem: we may read a correct value or a stale one. If it happens to be correct, so much the better. If it is stale, it still doesn't matter, because the update happens via CAS afterwards; the CAS fails on a stale expected value, and the next round of the for loop applies the correct update.
If the refCnt read happens to be 2, then the logical reference count of the ByteBuf becomes 0 after this release, and Netty updates refCnt to 1 via CAS.
private boolean tryFinalRelease0(T instance, int expectRawCnt) {
    return updater().compareAndSet(instance, expectRawCnt, 1); // any odd number will work
}
If the CAS fails, other threads may be executing retain on the ByteBuf concurrently, and the logical reference count may no longer be dropping to 0. For this case Netty retries inside retryRelease0, subtracting 2 from refCnt where appropriate.
private boolean retryRelease0(T instance, int decrement) {
    for (;;) {
        // Read refCnt in a volatile way
        int rawCnt = updater().get(instance),
            // Derive the logical reference count; throws if refCnt has already become odd
            realCnt = toLiveRealRefCnt(rawCnt, decrement);
        if (decrement == realCnt) {
            // After this release the logical reference count is 0:
            // CAS refCnt to 1
            if (tryFinalRelease0(instance, rawCnt)) {
                return true;
            }
        } else if (decrement < realCnt) {
            // The original logical count realCnt is greater than decrement:
            // subtract 2 * decrement from refCnt via CAS
            if (updater().compareAndSet(instance, rawCnt, rawCnt - (decrement << 1))) {
                return false;
            }
        } else {
            // More releases than retains: throw an exception
            throw new IllegalReferenceCountException(realCnt, -decrement);
        }
        // Yield after a CAS failure to reduce futile contention;
        // otherwise under high concurrency every thread keeps failing the CAS here
        Thread.yield();
    }
}
As the retryRelease0 implementation shows, CAS can atomically detect concurrent interference: if another thread got in, every CAS here fails, and the next round of the for loop applies the correct value to refCnt. The XADD instruction cannot do that.

If the refCnt obtained by the first read in release is not 2, then instead of the tryFinalRelease0 path above, nonFinalRelease0 subtracts 2 per logical decrement from refCnt via CAS.
private boolean nonFinalRelease0(T instance, int decrement, int rawCnt, int realCnt) {
    if (decrement < realCnt
            && updater().compareAndSet(instance, rawCnt, rawCnt - (decrement << 1))) {
        // Subtract 2 * decrement from the ByteBuf's rawCnt
        return false;
    }
    // On CAS failure keep retrying; if the reference count is already 0,
    // an exception is thrown -- the ByteBuf must not be released again
    return retryRelease0(instance, decrement);
}
This completes the full analysis of Netty's reference counting design. In total there were four very interesting optimizations, summarized below:

- Replace the CMPXCHG instruction with the better-performing XADD instruction where possible.

- Use a parity design for the reference count to preserve concurrency semantics.

- Replace the & operation with the better-performing == operation for the high-frequency values.

- Skip the memory barrier wherever a plain read suffices.
2.7 ByteBuf View Design
Like the JDK's design, a ByteBuf in Netty can create view ByteBufs via the slice() and duplicate() methods. The native ByteBuf and its views share the same underlying memory region: any change made through a view is visible in the native ByteBuf, and any change made to the native ByteBuf is likewise visible in its views. A view ByteBuf can be thought of as a shallow copy of the native ByteBuf.

The native ByteBuf and its views differ in that each has its own independent readerIndex, writerIndex, capacity, and maxCapacity.

The slice() method creates a view ByteBuf over the native ByteBuf's [readerIndex, writerIndex) range; that is, the native ByteBuf and the view share this memory region. The view's data region is exactly the readable byte region of the native ByteBuf.

The view ByteBuf's readerIndex = 0, and its writerIndex = capacity = maxCapacity = the native ByteBuf's readableBytes().
@Override
public int readableBytes() {
    // On the native ByteBuf
    return writerIndex - readerIndex;
}
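Before diving into the implementation, here is a minimal sketch of the shared-memory behavior (using the standard Unpooled helpers; values are illustrative):

ByteBuf origin = Unpooled.buffer(16);
origin.writeBytes(new byte[]{1, 2, 3, 4});  // readable region: [0, 4)
ByteBuf view = origin.slice();               // shares the same memory, readerIndex = 0, capacity = 4
view.setByte(0, 9);                          // a write through the view ...
assert origin.getByte(0) == 9;               // ... is visible in the native ByteBuf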
Here is how the slice() method creates the view ByteBuf:
public abstract class AbstractByteBuf extends ByteBuf {
    @Override
    public ByteBuf slice() {
        return slice(readerIndex, readableBytes());
    }

    @Override
    public ByteBuf slice(int index, int length) {
        // Make sure the ByteBuf is still accessible (reference count not 0)
        ensureAccessible();
        return new UnpooledSlicedByteBuf(this, index, length);
    }
}
Netty wraps the slice view ByteBuf in the UnpooledSlicedByteBuf class, where it initializes the readerIndex, writerIndex, capacity, and maxCapacity of the slice view ByteBuf.
class UnpooledSlicedByteBuf extends AbstractUnpooledSlicedByteBuf {
    UnpooledSlicedByteBuf(AbstractByteBuf buffer, int index, int length) {
        // index = readerIndex
        // length = readableBytes()
        super(buffer, index, length);
    }

    @Override
    public int capacity() {
        // The view ByteBuf's capacity and maxCapacity are equal,
        // both being the native ByteBuf's readableBytes()
        return maxCapacity();
    }
}
As shown in the figure above, index here is the native ByteBuf's readerIndex = 4. index expresses the offset of the view ByteBuf's memory region relative to the native ByteBuf: because the two share the same memory region, operations on the view are ultimately converted into operations on the native ByteBuf.

But since the view and the native ByteBuf each have their own readerIndex and writerIndex -- in the figure, readerIndex = 0 in the view actually points to the position readerIndex = 4 in the native ByteBuf -- every read or write on the view must first convert the view's index into the native ByteBuf's index by adding the offset (index), and only then access the native ByteBuf's data.
@Override
protected byte _getByte(int index) {
    // Underneath, this is actually an access to the native ByteBuf
    return unwrap()._getByte(idx(index));
}

@Override
protected void _setByte(int index, int value) {
    unwrap()._setByte(idx(index), value);
}

/**
 * Returns the index with the needed adjustment.
 */
final int idx(int index) {
    // Convert to the native ByteBuf's readerIndex or writerIndex
    return index + adjustment;
}
The adjustment in the idx(int index) method is the index offset from the UnpooledSlicedByteBuf constructor above, initialized to the native ByteBuf's readerIndex.

length is initialized to the native ByteBuf's readableBytes(), and the view ByteBuf's writerIndex, capacity, and maxCapacity are all initialized from length.
abstract class AbstractUnpooledSlicedByteBuf extends AbstractDerivedByteBuf {
    // The native ByteBuf
    private final ByteBuf buffer;
    // Offset of the view ByteBuf's data region relative to the native ByteBuf
    private final int adjustment;

    AbstractUnpooledSlicedByteBuf(ByteBuf buffer, int index, int length) {
        // Set the view ByteBuf's maxCapacity; its readerIndex is 0
        super(length);
        // The native ByteBuf
        this.buffer = buffer;
        // The data offset is the native ByteBuf's readerIndex
        adjustment = index;
        // Set the view ByteBuf's writerIndex
        writerIndex(length);
    }
}
However, a view ByteBuf created by slice() does not change the native ByteBuf's reference count. That can be a problem: the view and the native ByteBuf share the same memory underneath, yet the application contexts using them may be unaware of each other's existence.

If release is called on the native ByteBuf and its reference count drops to 0, its Native Memory is freed, and any subsequent access through the view is broken, because the memory is already gone. Likewise, calling release on the view affects the native ByteBuf the same way.
For this purpose Netty provides the retainedSlice() method, which increments the native ByteBuf's reference count while creating the slice view, so that both share the same reference count.
@Override
public ByteBuf retainedSlice() {
    // Adds 1 to the native ByteBuf's reference count
    return slice().retain();
}
Besides slice(), Netty also provides duplicate() for creating a view ByteBuf.
@Override
public ByteBuf duplicate() {
    // Make sure the ByteBuf is still accessible (reference count not 0)
    ensureAccessible();
    return new UnpooledDuplicatedByteBuf(this);
}
The difference from slice() is that the view created by duplicate() is an exact replica of the native ByteBuf: the duplicate view and the native ByteBuf have their own readerIndex, writerIndex, capacity, and maxCapacity, but the values are all the same. The duplicate view also shares the same Native Memory as the native ByteBuf.
public class DuplicatedByteBuf extends AbstractDerivedByteBuf {

    // The native ByteBuf
    private final ByteBuf buffer;

    public DuplicatedByteBuf(ByteBuf buffer) {
        this(buffer, buffer.readerIndex(), buffer.writerIndex());
    }

    DuplicatedByteBuf(ByteBuf buffer, int readerIndex, int writerIndex) {
        // Initialize the view ByteBuf's maxCapacity to the same value as the native one's
        super(buffer.maxCapacity());
        // The native ByteBuf
        this.buffer = buffer;
        // The view ByteBuf's readerIndex and writerIndex also match the native one's
        setIndex(readerIndex, writerIndex);
        markReaderIndex();
        markWriterIndex();
    }

    @Override
    public int capacity() {
        // The view ByteBuf's capacity also matches the native one's
        return unwrap().capacity();
    }
}
Netty also provides the corresponding retainedDuplicate() method, which creates a duplicate view while incrementing the native ByteBuf's reference count, so that the view and the native ByteBuf share the same reference count.
@Override
public ByteBuf retainedDuplicate() {
    return duplicate().retain();
}
The two view ByteBufs described above can be regarded as shallow copies of the native ByteBuf. Netty also provides the copy() method for a deep copy: the copied ByteBuf is a true replica whose underlying Native Memory is separate from the original's, with its own independent readerIndex, writerIndex, capacity, and maxCapacity.
public abstract class AbstractByteBuf extends ByteBuf {
    @Override
    public ByteBuf copy() {
        // Starting from the original ByteBuf's readerIndex,
        // copy readableBytes bytes into a new ByteBuf
        return copy(readerIndex, readableBytes());
    }
}
The copy() method copies the contents of the native ByteBuf's [readerIndex, writerIndex) range. The copied ByteBuf has readerIndex = 0, writerIndex = capacity = the native ByteBuf's readableBytes(), and the same maxCapacity as the native one.
public class UnpooledDirectByteBuf {
    @Override
    public ByteBuf copy(int index, int length) {
        ensureAccessible();
        ByteBuffer src;
        try {
            // Copy the [index, index + length) range of this ByteBuf into the new ByteBuf
            src = (ByteBuffer) buffer.duplicate().clear().position(index).limit(index + length);
        } catch (IllegalArgumentException ignored) {
            throw new IndexOutOfBoundsException("Too many bytes to read - Need " + (index + length));
        }
        // Allocate a new piece of Native Memory. The new ByteBuf's initial (true) capacity is length,
        // and its maximum capacity equals the native ByteBuf's maxCapacity.
        // readerIndex = 0, writerIndex = length
        return alloc().directBuffer(length, maxCapacity()).writeBytes(src);
    }
}
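To contrast the deep copy with the views above, a short sketch (again with Unpooled helpers; values illustrative):

ByteBuf origin = Unpooled.directBuffer(16);
origin.writeBytes(new byte[]{1, 2, 3, 4});
ByteBuf copy = origin.copy();       // new Native Memory, independent indexes
copy.setByte(0, 9);                 // does not touch origin
assert origin.getByte(0) == 1;
assert copy.readerIndex() == 0 && copy.writerIndex() == 4;
copy.release();                     // the copy has its own reference count
origin.release();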
2.8 Zero-Copy Design of CompositeByteBuf
Zero-copy here is not the OS-level zero-copy we usually talk about, but Netty's own userland avoidance of memory copies. Traditionally, aggregating multiple independent ByteBufs into one ByteBuf requires allocating a larger memory block from the OS, copying the contents of each ByteBuf into it, and then freeing the original ByteBufs.

This has two performance costs: allocating a bigger block of memory from the OS, and copying the existing memory. Netty introduces CompositeByteBuf to avoid both: it cleverly reuses the memory already occupied by the original ByteBufs and combines them into one logical CompositeByteBuf that presents a unified logical view.
CompositeByteBuf is also a view ByteBuf. Like the SlicedByteBuf and DuplicatedByteBuf introduced in the previous section, it occupies no Native Memory of its own; the underlying data lives in the native ByteBufs.

The difference is that SlicedByteBuf and DuplicatedByteBuf are views built on top of a single native ByteBuf, whereas CompositeByteBuf is a unified logical view built on top of multiple native ByteBufs.
To us users, CompositeByteBuf is no different from an ordinary ByteBuf: it has its own independent readerIndex, writerIndex, capacity, and maxCapacity, and all the ByteBuf design elements from the previous sections apply to it as well.

From the implementation's point of view, however, CompositeByteBuf is purely logical: it occupies no Native Memory, and every operation on it must be translated into operations on the ByteBufs it aggregates. In this section we dive into its internals and look at Netty's clever design.
2.8.1 General Architecture of CompositeByteBuf
In terms of overall design, CompositeByteBuf contains the following five important attributes, the core being the components array. Every native ByteBuf to be aggregated is wrapped by Netty in a Component instance and organized in the components array; all subsequent operations on the CompositeByteBuf deal with this array.
public class CompositeByteBuf extends AbstractReferenceCountedByteBuf implements Iterable<ByteBuf> {
    // Internal ByteBuf allocator, used for later expansion, copy, and consolidation
    private final ByteBufAllocator alloc;
    // compositeDirectBuffer or compositeHeapBuffer ?
    private final boolean direct;
    // Maximum size of the components array (16)
    private final int maxNumComponents;
    // Number of Components currently contained in this CompositeByteBuf
    private int componentCount;
    // Array storing the Components
    private Component[] components; // resized when needed
}
maxNumComponents is the maximum size of the components array; by default a CompositeByteBuf can hold at most 16 Components. If that number is exceeded, Netty consolidates all the Components currently in the CompositeByteBuf into one larger Component.
public abstract class AbstractByteBufAllocator implements ByteBufAllocator {
    static final int DEFAULT_MAX_COMPONENTS = 16;
}
componentCount is the number of Components currently contained in the CompositeByteBuf. Whenever we add a ByteBuf via the addComponent method, Netty wraps it in a new Component instance, stores it in the components array, and increments componentCount by one. A short usage sketch follows below.
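A minimal sketch of aggregating two ByteBufs without copying (standard Unpooled / CompositeByteBuf API; values illustrative):

CompositeByteBuf composite = Unpooled.compositeBuffer();         // default maxNumComponents = 16
ByteBuf header = Unpooled.wrappedBuffer(new byte[]{1, 2, 3, 4});
ByteBuf body   = Unpooled.wrappedBuffer(new byte[]{5, 6, 7, 8});
// true = advance the writerIndex so the added bytes become readable
composite.addComponents(true, header, body);
assert composite.readableBytes() == 8; // one logical view over two native ByteBufs, no copying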
The architectural relationship between a CompositeByteBuf and the real ByteBufs it aggregates underneath is shown below.

Creating a CompositeByteBuf is, at its core, creating the underlying components array, in which all the native ByteBufs added to the CompositeByteBuf are organized.
private CompositeByteBuf(ByteBufAllocator alloc, boolean direct, int maxNumComponents, int initSize) {
    // Set maxCapacity
    super(AbstractByteBufAllocator.DEFAULT_MAX_CAPACITY);
    this.alloc = ObjectUtil.checkNotNull(alloc, "alloc");
    this.direct = direct;
    this.maxNumComponents = maxNumComponents;
    // The initial capacity of the Component array is bounded by maxNumComponents
    components = newCompArray(initSize, maxNumComponents);
}
Note that the initSize parameter is not the number of bytes the CompositeByteBuf contains, but the number of native ByteBufs initially wrapped, i.e. the initial number of Components. The overall size of the components array is determined by maxNumComponents and cannot exceed 16 by default.
private static Component[] newCompArray(int initComponents, int maxNumComponents) {
    // Bounded by DEFAULT_MAX_COMPONENTS (16)
    int capacityGuess = Math.min(AbstractByteBufAllocator.DEFAULT_MAX_COMPONENTS, maxNumComponents);
    // The initial capacity of the Component array
    return new Component[Math.max(initComponents, capacityGuess)];
}
Now that we have a basic skeleton of CompositeByteBuf, how does Netty assemble multiple native ByteBufs into a logical unified view ByteBuf based on this basic skeleton?
That is to say, how do we translate the read/write logic based on the readerIndex and writerIndex in CompositeByteBuf to the underlying native ByteBuf? This is the core of the design.
Below I will take you from the outside in, from easy to hard, through the core design elements of CompositeByteBuf. Starting from the outermost layer: to the user, a CompositeByteBuf is just an ordinary ByteBuf with its own independent readerIndex and writerIndex.
But the bytes in the CompositeByteBuf that logically appear to be contiguous are actually stored behind the scenes in different native ByteBufs. The memory of the different ByteBufs is actually not contiguous with each other.
The key problem is determining which real ByteBuf sits behind a given logical range of the CompositeByteBuf. If we can find the ByteBuf behind a CompositeByteBuf index, and further translate that index into the ByteBuf's own index, we can convert logical operations on the CompositeByteBuf into read/write operations on real memory.
Netty encapsulates this conversion relationship in the Component class: each native ByteBuf wrapped by the CompositeByteBuf corresponds to one Component instance, and the Components are organized in order in the components array.
private static final class Component {
    // the original ByteBuf added via addComponent
    final ByteBuf srcBuf;
    // add srcAdjustment to a CompositeByteBuf index to get the corresponding srcBuf index
    int srcAdjustment;
    // srcBuf may be a wrapped ByteBuf, e.g. SlicedByteBuf or DuplicatedByteBuf;
    // the lowest-level ByteBuf wrapped by srcBuf is stored in the buf field
    final ByteBuf buf;
    // add adjustment to a CompositeByteBuf index to get the corresponding buf index
    int adjustment;
    // the range of data this Component represents from the CompositeByteBuf's perspective: [offset , endOffset)
    int offset;
    int endOffset;
}
The logical range of data that a Component represents from the CompositeByteBuf's perspective is [offset , endOffset).
For example, for the first green ByteBuf in the figure above, the data it stores makes up the logical range [0 , 4) of the CompositeByteBuf. The second yellow ByteBuf makes up the logical range [4 , 8), and the third blue ByteBuf makes up the logical range [8 , 12). The endOffset of each Component is exactly the offset of the next.
The ByteBuf that actually stores the data sits in the srcBuf field of the corresponding Component. When we read from or write to the CompositeByteBuf at its readerIndex or writerIndex, we first need to determine the srcBuf behind that index, then convert the CompositeByteBuf index into the srcBuf's srcIndex, and finally read or write srcBuf via srcIndex.
The index conversion is done through srcAdjustment. For example, suppose the current readerIndex of the CompositeByteBuf is 5, which falls in the second yellow ByteBuf, whose readerIndex is 1.
The srcAdjustment of the second Component is then -4: when reading the CompositeByteBuf, we add its readerIndex to the srcAdjustment to obtain the ByteBuf's readerIndex, and from there it is just an ordinary ByteBuf read operation.
For example, if we want to write to a CompositeByteBuf, the current writerIndex is 10, which corresponds to the third blue ByteBuf, which has a writerIndex of 2.
So the srcAdjustment of the third Component is -8, the writerIndex of the CompositeByteBuf plus the srcAdjustment will get the writerIndex of the ByteBuf, and then it is the normal ByteBuf writing operation.
int srcIdx(int index) {
    // convert a CompositeByteBuf index into the corresponding srcBuf index
    return index + srcAdjustment;
}
In addition to srcBuf, a Component holds a second ByteBuf field, buf. You may wonder why the design needs two ByteBuf fields when a Component corresponds to exactly one ByteBuf.
srcBuf is the ByteBuf we originally added to the CompositeByteBuf via the addComponent method. It may be a view ByteBuf, such as the SlicedByteBuf and DuplicatedByteBuf described in the previous subsection, or a wrapped ByteBuf, such as WrappedByteBuf or SwappedByteBuf.
If srcBuf is, say, a SlicedByteBuf, we need to unwrap it down to its native ByteBuf and store that in the Component's buf field. The buf in the Component is where the data is actually stored.
abstract class AbstractUnpooledSlicedByteBuf extends AbstractDerivedByteBuf {
    // the original ByteBuf underlying this slice
    private final ByteBuf buffer;
}
The counterpart to buf is the adjustment, which converts a CompositeByteBuf index into the corresponding buf index. Suppose we are reading a CompositeByteBuf whose current readerIndex is 5, while the readerIndex of the buf is 6.
Before reading, we add the adjustment to the CompositeByteBuf's readerIndex to get the buf's readerIndex, then transfer the read operation to the buf. This is exactly the same index adjustment we saw with the view ByteBufs in the previous subsection.
@Override
public byte getByte(int index) {
    // find the Component holding the data at this CompositeByteBuf index
    Component c = findComponent(index);
    // convert the index to the buf-relative index, then delegate the read to buf
    return c.buf.getByte(c.idx(index));
}
int idx(int index) {
    // convert a CompositeByteBuf index into the corresponding buf index
    return index + adjustment;
}
So how do we find out where the underlying data is stored in the Component based on the index of the specified CompositeByteBuf?
The core idea is quite simple: each Component records which range of the CompositeByteBuf it represents -- [offset , endOffset) -- and all Components are organized in order in the components array. We can therefore binary-search for the Component whose range contains the index.
This lookup is done in the findComponent method. Netty caches the most recently accessed Component in the CompositeByteBuf's lastAccessed field, and each lookup first checks whether the index falls within lastAccessed's range [offset , endOffset).
If the index happens to be contained in the cached Component (lastAccessed), then lastAccessed is returned directly.
// cache the most recently accessed Component
private Component lastAccessed;

private Component findComponent(int offset) {
    Component la = lastAccessed;
    // first check whether offset happens to fall within lastAccessed's range
    if (la != null && offset >= la.offset && offset < la.endOffset) {
        return la;
    }
    // otherwise binary-search over all Components
    return findIt(offset);
}
If the index happens to miss the cache, then a binary lookup is performed on the entire components array:
private Component findIt(int offset) {
    for (int low = 0, high = componentCount; low <= high;) {
        int mid = low + high >>> 1;
        Component c = components[mid];
        if (offset >= c.endOffset) {
            low = mid + 1;
        } else if (offset < c.offset) {
            high = mid - 1;
        } else {
            lastAccessed = c;
            return c;
        }
    }
    throw new Error("should not reach here");
}
2.8.2 CompositeByteBuf Creation
Okay, now that we're familiar with the general architecture of CompositeByteBuf, let's take a look at how Netty logically aggregates multiple ByteBufs into a single CompositeByteBuf.
public final class Unpooled {
    public static ByteBuf wrappedBuffer(ByteBuf... buffers) {
        return wrappedBuffer(buffers.length, buffers);
    }
}
The initial maxNumComponents of the CompositeByteBuf is the length of the buffers array. If we pass in only one ByteBuf, there is no need to create a CompositeByteBuf; its slice view is returned instead.
If we pass in more than one ByteBuf, then the ByteBufs are wrapped into a CompositeByteBuf.
public final class Unpooled {
    public static ByteBuf wrappedBuffer(int maxNumComponents, ByteBuf... buffers) {
        switch (buffers.length) {
        case 0:
            break;
        case 1:
            ByteBuf buffer = buffers[0];
            if (buffer.isReadable()) {
                // directly return the slice view of the single readable ByteBuf
                return wrappedBuffer(buffer.order(BIG_ENDIAN));
            } else {
                buffer.release();
            }
            break;
        default:
            for (int i = 0; i < buffers.length; i++) {
                ByteBuf buf = buffers[i];
                if (buf.isReadable()) {
                    // create the CompositeByteBuf starting from the first readable ByteBuf -- buffers[i]
                    return new CompositeByteBuf(ALLOC, false, maxNumComponents, buffers, i);
                }
                // release unreadable buffers
                buf.release();
            }
            break;
        }
        return EMPTY_BUFFER;
    }
}
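A minimal usage sketch of this entry point (the buffer contents are illustrative; imports from io.netty.buffer and io.netty.util are assumed):

ByteBuf header = Unpooled.buffer(4).writeInt(0xCAFEBABE);
ByteBuf body = Unpooled.copiedBuffer("hello", CharsetUtil.UTF_8);

// two readable ByteBufs -> a CompositeByteBuf is created; no bytes are copied
ByteBuf aggregated = Unpooled.wrappedBuffer(header, body);
System.out.println(aggregated.readableBytes());             // 9
System.out.println(aggregated instanceof CompositeByteBuf); // true

// releasing the composite also releases the wrapped ByteBufs
aggregated.release();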
After entering the creation process of CompositeByteBuf, the first step is to build its skeleton: an empty CompositeByteBuf whose initSize is buffers.length - offset.
Note that initSize is not the number of bytes initially contained in the CompositeByteBuf, but the number of initial Components. offset is the index in the buffers array from which the CompositeByteBuf starts to be built, i.e. the last parameter i passed to the CompositeByteBuf constructor above.
The addComponents0 method then creates and initializes a Component instance for each ByteBuf in the buffers array and adds them, in order, to the components array of the CompositeByteBuf.
At this point the number of Component instances may exceed the maxNumComponents limit, in which case the consolidateIfNeeded() method merges all the Components in the current CompositeByteBuf into one larger Component. The length of the components array must not exceed maxNumComponents; if it does, the merge happens here.
Finally, set the readerIndex and writerIndex of the current CompositeByteBuf. In the initial state, the readerIndex of the CompositeByteBuf will be set to 0, and the writerIndex will be set to the endOffset of the last Component.
CompositeByteBuf(ByteBufAllocator alloc, boolean direct, int maxNumComponents,
        ByteBuf[] buffers, int offset) {
    // first initialize an empty CompositeByteBuf;
    // initSize is buffers.length - offset
    this(alloc, direct, maxNumComponents, buffers.length - offset);
    // create a Component instance for each of the buffers and add them to the components array
    addComponents0(false, 0, buffers, offset);
    // if the number of Components now exceeds maxNumComponents, merge them all into one
    consolidateIfNeeded();
    // set the CompositeByteBuf's readerIndex = 0,
    // and writerIndex = endOffset of the last Component
    setIndex0(0, capacity());
}
2.8.3 shiftComps to make room for a new ByteBuf
In the entire CompositeByteBuf construction process, the core and most complex step is the addComponents0 method. Adding multiple ByteBufs to the components array of a CompositeByteBuf in order may seem simple, but several complications must be considered.
The first is where in the components array to insert the ByteBufs. The simpler, more intuitive case is inserting at the end, i.e. at the index cIndex equal to componentCount, which covers two situations:
-
cIndex = componentCount = 0
This means we are inserting ByteBufs into an empty CompositeByteBuf, which is straightforward.
-
cIndex = componentCount > 0
This means we are inserting ByteBufs into a non-empty CompositeByteBuf, as shown above. Again straightforward: just insert at position componentCount.
A slightly more complicated case is inserting in the middle of the components array rather than at the end, i.e. when cIndex < componentCount. As shown in the figure below, suppose we need to insert two ByteBufs at cIndex = 3, i.e. into components[3] and components[4], but those slots are already occupied. We must move the original Components at these two positions back by two to free up components[3] and components[4].
// i = 3 , count = 2 , size = 5
System.arraycopy(components, i, components, i + count, size - i);
The more complex case is when the components array itself must grow. When a CompositeByteBuf is first initialized, its components array length equals (roughly) maxNumComponents.
If the number of Components already in the array (componentCount) plus the number of ByteBufs being added (count) exceeds the array's current length, the components array must be expanded.

// number of Components currently in the CompositeByteBuf (0 initially)
final int size = componentCount,
// total number of Components after this addComponents0 operation
newSize = size + count;
// if newSize exceeds the current array length, expand the components array
if (newSize > components.length) {
    ....... expand ....
    // the new, expanded array
    components = newArr;
}
The length of the expanded components array is the maximum of newSize and 3/2 of the original size:

int newArrSize = Math.max(size + (size >> 1), newSize);
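A quick worked example of this growth formula (the numbers are illustrative):

int size = 16, count = 3;  // 16 existing Components, 3 more being added
int newSize = size + count;                             // 19
int newArrSize = Math.max(size + (size >> 1), newSize); // max(24, 19) = 24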
If we happened to be inserting at the end of the components array (the cIndex = componentCount case), we simply request an array of length newArrSize and copy over the contents of the original components array:

newArr = Arrays.copyOf(components, newArrSize, Component[].class);
This gives the new components array a place to hold the ByteBufs that need to be added this time.
If we wish to insert in the middle of the original components array (the cIndex < componentCount case), as shown in the figure below, we cannot simply copy the original components array as-is while expanding. Instead we first copy the contents of [0 , cIndex) into the new array, and then copy the contents of [cIndex , componentCount) into the new array starting at position cIndex + count.
This frees up count positions starting at cIndex in the new components array for the ByteBufs being added (two, in this example); finally componentCount is updated. All of this logic is encapsulated in the shiftComps method:
private void shiftComps(int i, int count) {
    // number of Components currently in the CompositeByteBuf (0 initially)
    final int size = componentCount,
    // total number of Components after this addComponents0 operation
    newSize = size + count;
    // if newSize exceeds the current array length, expand the components array
    if (newSize > components.length) {
        // grow the array: the new length is max(3/2 of the old size, newSize)
        int newArrSize = Math.max(size + (size >> 1), newSize);
        Component[] newArr;
        if (i == size) {
            // inserting at the end of the Component[] array
            // (in the initial state i = size = 0);
            // simply copy the contents of the original array into newArr
            newArr = Arrays.copyOf(components, newArrSize, Component[].class);
        } else {
            // inserting in the middle of the Component[] array
            newArr = new Component[newArrSize];
            if (i > 0) {
                // copy the contents of [0 , i) into newArr
                System.arraycopy(components, 0, newArr, 0, i);
            }
            if (i < size) {
                // copy the remaining [i , size) into newArr starting at i + count,
                // freeing up the original [i , i + count) positions for the new Components
                System.arraycopy(components, i, newArr, i + count, size - i);
            }
        }
        // switch to the expanded array
        components = newArr;
    } else if (i < size) {
        // i < size: the insertion would overwrite the original [i , i + count),
        // so move those Components backward by count positions in place
        System.arraycopy(components, i, components, i + count, size - i);
    }
    // update componentCount
    componentCount = newSize;
}
2.8.4 How a Component Encapsulates a ByteBuf
After all the moving around in the shiftComps method in the previous section, the components array in CompositeByteBuf finally has a place for the ByteBufs that need to be added. Next, we need to initialize a Component instance for each ByteBuf, and then place these Component instances in the components array.
private static final class Component {
    // the original ByteBuf added via addComponent
    final ByteBuf srcBuf;
    // add srcAdjustment to a CompositeByteBuf index to get the corresponding srcBuf index
    int srcAdjustment;
    // srcBuf may be a wrapped ByteBuf, e.g. SlicedByteBuf or DuplicatedByteBuf;
    // the lowest-level ByteBuf wrapped by srcBuf is stored in the buf field
    final ByteBuf buf;
    // add adjustment to a CompositeByteBuf index to get the corresponding buf index
    int adjustment;
    // the range of data this Component represents from the CompositeByteBuf's perspective: [offset , endOffset)
    int offset;
    int endOffset;
}
We first need to initialize the offset and endOffset properties of the Component instance. As already introduced, a Component represents the logical data range [offset , endOffset) from the CompositeByteBuf's perspective, and in the components array the endOffset of one Component is exactly the offset of the next.
If we expect the insertion to start at the first position in the components array (cIndex = 0), then the offset of the first Component is naturally zero.
If cIndex > 0 , then we need to find its previous Component -- components[cIndex - 1], whose endOffset is exactly the offset of the current Component.
The newComponent method then initializes a Component instance from the ByteBuf's properties and the offset. The created Component instance is placed at the corresponding position -- components[cIndex].
// get the offset of the Component currently being inserted
int nextOffset = cIndex > 0 ? components[cIndex - 1].endOffset : 0;
for (ci = cIndex; arrOffset < len; arrOffset++, ci++) {
    // the ByteBuf waiting to be inserted
    ByteBuf b = buffers[arrOffset];
    if (b == null) {
        break;
    }
    // wrap the ByteBuf in a Component
    Component c = newComponent(ensureAccessible(b), nextOffset);
    components[ci] = c;
    // the offset of the next Component is the endOffset of this one
    nextOffset = c.endOffset;
}
Assume we now have an empty CompositeByteBuf, and we want to insert a srcBuf whose readable data occupies the range [1 , 4], with readerIndex = 1, into its components array.
If the srcBuf is a view ByteBuf, such as SlicedByteBuf or DuplicatedByteBuf, or a wrapped ByteBuf, such as WrappedByteBuf or SwappedByteBuf, we must keep calling srcBuf's unwrap() to extract the lowest-level native ByteBuf. As shown in the figure above, the data range of the native buf is [4 , 7], the index offset between srcBuf and buf (the adjustment) equals 3, and the native buf's readerIndex = 4.
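The figure's numbers can be reproduced with a small sketch (the concrete offsets are the example's assumptions):

// buf is the original ByteBuf; pretend its data lives in [4 , 7]
ByteBuf buf = Unpooled.buffer(8);
// srcBuf is a view starting at index 3 of buf, so the slice's internal adjustment is 3
ByteBuf srcBuf = buf.slice(3, 5);
srcBuf.readerIndex(1); // srcBuf's data range [1 , 4] maps to buf's [4 , 7]

System.out.println(srcBuf.unwrap() == buf); // true -- the Component will store buf, not the slice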
Finally we initialize the Component instance from srcBuf, srcIndex (srcBuf's readerIndex), the native buf, unwrappedIndex (buf's readerIndex), offset, and len (the number of readable bytes in srcBuf).
private Component newComponent(final ByteBuf buf, final int offset) {
    // readerIndex of srcBuf (= 1 in our example)
    final int srcIndex = buf.readerIndex();
    // number of readable bytes in srcBuf (= 4 in our example)
    final int len = buf.readableBytes();
    // srcBuf may be a wrapped ByteBuf, e.g. SlicedByteBuf or DuplicatedByteBuf;
    // extract the original ByteBuf underlying srcBuf
    ByteBuf unwrapped = buf;
    // readerIndex of the original ByteBuf
    int unwrappedIndex = srcIndex;
    while (unwrapped instanceof WrappedByteBuf || unwrapped instanceof SwappedByteBuf) {
        unwrapped = unwrapped.unwrap();
    }
    // unwrap if already sliced
    if (unwrapped instanceof AbstractUnpooledSlicedByteBuf) {
        // the index offset between the view ByteBuf and the original ByteBuf
        // adjustment = 3
        // unwrappedIndex = srcIndex + adjustment = 4
        unwrappedIndex += ((AbstractUnpooledSlicedByteBuf) unwrapped).idx(0);
        // obtain the original ByteBuf
        unwrapped = unwrapped.unwrap();
    } else if (unwrapped instanceof PooledSlicedByteBuf) {
        unwrappedIndex += ((PooledSlicedByteBuf) unwrapped).adjustment;
        unwrapped = unwrapped.unwrap();
    } else if (unwrapped instanceof DuplicatedByteBuf || unwrapped instanceof PooledDuplicatedByteBuf) {
        unwrapped = unwrapped.unwrap();
    }

    // if srcBuf's readable range already spans its whole capacity, cache it as the slice view
    final ByteBuf slice = buf.capacity() == len ? buf : null;

    return new Component(buf.order(ByteOrder.BIG_ENDIAN), srcIndex,
            unwrapped.order(ByteOrder.BIG_ENDIAN), unwrappedIndex, offset, len, slice);
}
Since the current CompositeByteBuf is still empty and contains no logical data, adding this srcBuf of length 4 gives the CompositeByteBuf the logical data range [0 , 3]. The Component that srcBuf belongs to therefore has offset = 0, endOffset = 4, srcAdjustment = 1, adjustment = 4.
Component(ByteBuf srcBuf, int srcOffset, ByteBuf buf, int bufOffset,
        int offset, int len, ByteBuf slice) {
    this.srcBuf = srcBuf;
    // used to convert a CompositeByteBuf index into a srcBuf index
    // 1 - 0 = 1
    this.srcAdjustment = srcOffset - offset;
    this.buf = buf;
    // used to convert a CompositeByteBuf index into a buf index
    // 4 - 0 = 4
    this.adjustment = bufOffset - offset;
    // the bytes in the CompositeByteBuf range [offset , endOffset) are stored in this Component
    // 0
    this.offset = offset;
    // the offset of the next Component
    // 4
    this.endOffset = offset + len;
}
When we go on to initialize the next Component, its offset is exactly the endOffset of this one, and the process repeats from there.
2.8.5 addComponents0
With this background in mind, the logic of the addComponents0 method is clear:
private CompositeByteBuf addComponents0(boolean increaseWriterIndex,
        final int cIndex, ByteBuf[] buffers, int arrOffset) {
    // length of the buffers array
    final int len = buffers.length,
    // number of ByteBufs to add in this batch
            count = len - arrOffset;
    // ci is the components-array index at which to start adding;
    // this is an initial value, reset once shiftComps is done
    int ci = Integer.MAX_VALUE;
    try {
        // cIndex >= 0 && cIndex <= componentCount
        checkComponentIndex(cIndex);
        // make room for the newly added ByteBufs and increase componentCount
        shiftComps(cIndex, count); // will increase componentCount
        // get the offset of the Component currently being inserted
        int nextOffset = cIndex > 0 ? components[cIndex - 1].endOffset : 0;
        for (ci = cIndex; arrOffset < len; arrOffset++, ci++) {
            ByteBuf b = buffers[arrOffset];
            if (b == null) {
                break;
            }
            // wrap the ByteBuf in a Component
            Component c = newComponent(ensureAccessible(b), nextOffset);
            components[ci] = c;
            // the offset of the next Component is the endOffset of this one
            nextOffset = c.endOffset;
        }
        return this;
    } finally {
        // ci is now the index following the last successfully added Component;
        // ci = componentCount means the Components were appended at the end in order,
        // ci < componentCount means new Components were inserted in the middle of the components array
        if (ci < componentCount) {
            // if the for loop above ran to completion, ci = cIndex + count
            if (ci < cIndex + count) {
                // the for loop above hit a break or threw an exception;
                // shiftComps already moved Components to make room, so the slots
                // [ci, cIndex + count) were never filled and must be removed,
                // after which the components array is compacted again
                removeCompRange(ci, cIndex + count);
                for (; arrOffset < len; ++arrOffset) {
                    ReferenceCountUtil.safeRelease(buffers[arrOffset]);
                }
            }
            // (for a middle insertion) readjust the offsets of the Components from ci to size - 1
            updateComponentOffsets(ci); // only need to do this here for components after the added ones
        }
        if (increaseWriterIndex && ci > cIndex && ci <= componentCount) {
            // components[ci - 1] is the last Component added this time,
            // components[cIndex] is the first;
            // the last endOffset minus the first offset is the number of bytes added
            writerIndex += components[ci - 1].endOffset - components[cIndex].offset;
        }
    }
}
Let's focus on the logic in the finally {} block. The core of addComponents0 is to use shiftComps to make room for the newly created Components, since we may be inserting in the middle of the original components array.
The for loop then places each newly created Component into position components[ci].
When we leave the for loop and enter the finally block, ci is exactly the position after the last Component successfully added to the components array. As shown in the figure below, if components[0], components[1], components[2] are the Components we just inserted in the for loop, then ci is 3 when the loop ends.
If ci = componentCount, we were simply appending at the end of the components array; in that case the [offset , endOffset) ranges are contiguous across Component instances and need no adjustment.
If ci < componentCount, we started inserting in the middle of the original components array. In the figure below, components[3] and components[4] are the insertion positions, and ci is 5 once the insertion completes.
At this point the [offset , endOffset) ranges of components[5] and components[6] must be readjusted, because shiftComps only moves Components around; it does not recompute their [offset , endOffset) ranges, which become discontinuous once new Component instances are inserted.
For example, in the situation shown below, the original components array holds five Component instances at positions 0 - 4, whose [offset , endOffset) ranges were originally contiguous.
We now insert two new Component instances at positions 3 and 4, so the original components[3] and components[4] must be moved to positions components[5] and components[6]. shiftComps is only responsible for moving them; once the new Component instances are inserted, the [offset , endOffset) ranges become discontinuous and must be readjusted by the updateComponentOffsets method.
private void updateComponentOffsets(int cIndex) {
    int size = componentCount;
    if (size <= cIndex) {
        return;
    }
    // readjust the [offset , endOffset) ranges of components[cIndex] onward
    int nextIndex = cIndex > 0 ? components[cIndex - 1].endOffset : 0;
    for (; cIndex < size; cIndex++) {
        Component c = components[cIndex];
        // readjust the Component's offset and endOffset
        c.reposition(nextIndex);
        nextIndex = c.endOffset;
    }
}

void reposition(int newOffset) {
    int move = newOffset - offset;
    endOffset += move;
    srcAdjustment -= move;
    adjustment -= move;
    offset = newOffset;
}
That is the normal path. If the for loop hits a break or throws an exception during execution, ci will be less than cIndex + count. What does that mean?
Suppose we want to insert count = 5 Component instances starting at cIndex = 0 in the components array, but a break or exception occurs while inserting the fourth one, at components[3]; the for loop then exits into the finally block.
At that point ci is 3 while cIndex + count is 5, which tells us something went wrong.
Note that positions components[3] and components[4], which the shiftComps method vacated, hold no Component instances because of the exception, leaving a hole in the components array. So we next move the Component instances at components[5] and components[6] back to components[3] and components[4].
Finally, the ByteBufs in the array that were never added to the CompositeByteBuf because of the exception must be released.
2.8.6 consolidateIfNeeded
So far we have filled an empty CompositeByteBuf, but one problem remains: the number of Component instances a CompositeByteBuf can contain is limited by maxNumComponents.
Looking back over the whole addComponents process, nowhere is that limit enforced; the components array is even expanded in the shiftComps method.
The number of Components may therefore exceed the maxNumComponents limit. If the current CompositeByteBuf's Component count already exceeds maxNumComponents, the consolidate0 method is called to merge all the Components.
private void consolidateIfNeeded() {
    int size = componentCount;
    // if the number of Components exceeds maxNumComponents, merge them all into one
    if (size > maxNumComponents) {
        consolidate0(0, size);
    }
}
Here, Netty merges all the Components currently in the CompositeByteBuf into a single, larger Component, and after the merge, the CompositeByteBuf contains only one Component. The core logic of the merge is as follows:
-
Request a new, larger ByteBuf based on the current capacity of the CompositeByteBuf; it must hold all the bytes the CompositeByteBuf can represent.
-
Transfer all the contents stored in the underlying buf of each Component into the new ByteBuf, and free the memory of the original bufs.
-
Remove all Components from the components array.
-
Create a new Component instance based on the new ByteBuf and place it in the first position of the components array.
private void consolidate0(int cIndex, int numComponents) {
    if (numComponents <= 1) {
        return;
    }
    // merge the Components in [cIndex , endCIndex) into one
    final int endCIndex = cIndex + numComponents;
    final int startOffset = cIndex != 0 ? components[cIndex].offset : 0;
    // total number of bytes stored in the Components being merged
    final int capacity = components[endCIndex - 1].endOffset - startOffset;
    // request a new ByteBuf
    final ByteBuf consolidated = allocBuffer(capacity);
    // transfer all the data of the Components in the merge range into the new ByteBuf
    for (int i = cIndex; i < endCIndex; i ++) {
        components[i].transferTo(consolidated);
    }
    lastAccessed = null;
    // once the transfer is complete, remove the pre-merge Components
    removeCompRange(cIndex + 1, endCIndex);
    // store the merged Component at position cIndex
    components[cIndex] = newComponent(consolidated, 0);
    if (cIndex != 0 || numComponents != componentCount) {
        // if cIndex does not start from 0, update the offsets relative to the new Component
        updateComponentOffsets(cIndex);
    }
}
2.8.7 Application of CompositeByteBuf
When transferring data over TCP at the transport layer, we often run into half-packet and sticky-packet problems: the ByteBuf read from the socket may not be a complete packet, so the ByteBufs read from the socket each time must be accumulated in a userland cache.
When the accumulated ByteBuf reaches a complete packet, we read the bytes from the cached ByteBuf, decode them, and pass the decoded objects back along the pipeline.
public abstract class ByteToMessageDecoder extends ChannelInboundHandlerAdapter {
    // the cached ByteBuf
    ByteBuf cumulation;
    // the ByteBuf accumulator
    private Cumulator cumulator = MERGE_CUMULATOR;
    // is this the first packet received
    private boolean first;

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        if (msg instanceof ByteBuf) {
            // used to hold the decoded objects
            CodecOutputList out = CodecOutputList.newInstance();
            try {
                // first packet received
                first = cumulation == null;
                // accumulate the incoming (ByteBuf) msg with the previously cached cumulation
                cumulation = cumulator.cumulate(ctx.alloc(),
                        first ? Unpooled.EMPTY_BUFFER : cumulation, (ByteBuf) msg);
                // decode
                callDecode(ctx, cumulation, out);
            } catch (DecoderException e) {
                throw e;
            } catch (Exception e) {
                throw new DecoderException(e);
            } finally {
                ........ omitted ........
                // after decoding succeeds, propagate the decoded objects down the pipeline
                fireChannelRead(ctx, out, size);
            }
        } else {
            ctx.fireChannelRead(msg);
        }
    }
}
Netty defines a Cumulator interface to aggregate the ByteBufs read from the socket. The parameter alloc is a ByteBuf allocator that can be used to request memory during the aggregation process if it involves expanding, merging, or other operations.
The parameter cumulation is the previously cached ByteBuf, when the packet is received for the first time, the cumulation here is an empty ByteBuf -- Unpooled.EMPTY_BUFFER.
The in parameter is the ByteBuf just read from the socket, which may be a half-packet, and the role of the Cumulator is to accumulate and merge the newly read ByteBuf (in) into the previously cached ByteBuf (cumulation).
public interface Cumulator {
ByteBuf cumulate(ByteBufAllocator alloc, ByteBuf cumulation, ByteBuf in);
}
Netty provides two implementations of the Cumulator interface, one for MERGE_CUMULATOR and one for COMPOSITE_CUMULATOR.
public abstract class ByteToMessageDecoder extends ChannelInboundHandlerAdapter {
    public static final Cumulator MERGE_CUMULATOR;
    public static final Cumulator COMPOSITE_CUMULATOR;
}
MERGE_CUMULATOR is Netty's default Cumulator and, traditionally, the most common way of aggregating ByteBufs. The core idea: when aggregating multiple ByteBufs, first request a larger piece of memory, copy the contents of all the ByteBufs to be aggregated into the new ByteBuf, and then free the original ByteBufs.
The effect is to re-aggregate multiple ByteBufs into a larger ByteBuf, but this approach involves the overhead of memory requests and memory copies, with the advantage that the memory is all contiguous and fast to read.
Another implementation is COMPOSITE_CUMULATOR, which is also the subject of this subsection, and its core idea is to aggregate multiple ByteBufs into a single CompositeByteBuf, with no additional memory request, much less a copy of memory.
However, since CompositeByteBuf is only a logical view ByteBuf, its underlying memory still belongs to the original ByteBufs, so the memory in a CompositeByteBuf is not contiguous. In addition, the index design of CompositeByteBuf is more complex, so reads may be somewhat slower than with MERGE_CUMULATOR. We need to weigh these trade-offs against our own scenario and choose flexibly.
public static final Cumulator COMPOSITE_CUMULATOR = new Cumulator() {
    @Override
    public ByteBuf cumulate(ByteBufAllocator alloc, ByteBuf cumulation, ByteBuf in) {
        if (!cumulation.isReadable()) {
            // the previously cached data has already been decoded;
            // release it and start accumulating again from in
            cumulation.release();
            return in;
        }
        CompositeByteBuf composite = null;
        try {
            // cumulation being a CompositeByteBuf means it has been aggregated before
            if (cumulation instanceof CompositeByteBuf && cumulation.refCnt() == 1) {
                composite = (CompositeByteBuf) cumulation;
                // the CompositeByteBuf's writerIndex must equal its capacity,
                // because every cumulate appends the new ByteBuf at the end of the CompositeByteBuf
                if (composite.writerIndex() != composite.capacity()) {
                    composite.capacity(composite.writerIndex());
                }
            } else {
                // cumulation is just a regular ByteBuf, so it has not been aggregated before;
                // this is the first aggregation: create an empty CompositeByteBuf
                // and add cumulation to it
                composite = alloc.compositeBuffer(Integer.MAX_VALUE).addFlattenedComponents(true, cumulation);
            }
            // add the newly received ByteBuf (in) to the CompositeByteBuf
            composite.addFlattenedComponents(true, in);
            in = null;
            return composite;
        } finally {
            ........ handling of aggregation failures omitted ..........
        }
    }
};
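To opt into this zero-copy accumulation, a decoder can swap its cumulator explicitly. A minimal sketch, where MyDecoder stands in for any ByteToMessageDecoder subclass and channel is an initialized Channel:

// MyDecoder is a hypothetical ByteToMessageDecoder subclass
ByteToMessageDecoder decoder = new MyDecoder();
// setCumulator is a public method of ByteToMessageDecoder; the default is MERGE_CUMULATOR
decoder.setCumulator(ByteToMessageDecoder.COMPOSITE_CUMULATOR);
channel.pipeline().addLast(decoder);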
3. Heap or Direct
In the previous subsections, we discussed many of the details of ByteBuf's design, so let's step out of those details and take a fresh look at ByteBuf's overall design from a global perspective.
In the ByteBuf design system, Netty divides ByteBufs into two categories, HeapByteBuf and DirectByteBuf, from the memory-layout perspective, and provides the PlatformDependent.directBufferPreferred() method to indicate whether Direct Memory allocation is preferred by default.
public final class PlatformDependent {
// whether to prefer allocating Direct Memory
private static final boolean DIRECT_BUFFER_PREFERRED;
public static boolean directBufferPreferred() {
return DIRECT_BUFFER_PREFERRED;
}
}
For DIRECT_BUFFER_PREFERRED to be true, both of the following conditions must be met:
-
The -Dio.netty.noPreferDirect parameter must be false (the default).
-
CLEANER is not NOOP, meaning the JDK must contain a usable Cleaner mechanism.
static {
    DIRECT_BUFFER_PREFERRED = CLEANER != NOOP
                              && !SystemPropertyUtil.getBoolean("io.netty.noPreferDirect", false);
    if (logger.isDebugEnabled()) {
        logger.debug("-Dio.netty.noPreferDirect: {}", !DIRECT_BUFFER_PREFERRED);
    }
}
On the Android platform, CLEANER is simply NOOP and no further checks are made; Netty defaults to Heap Memory unless Direct Memory is explicitly requested.
if (!isAndroid()) {
    if (javaVersion() >= 9) {
        // check whether sun.misc.Unsafe contains a usable invokeCleaner method
        CLEANER = CleanerJava9.isSupported() ? new CleanerJava9() : NOOP;
    } else {
        // check whether DirectByteBuffer contains a cleaner field
        CLEANER = CleanerJava6.isSupported() ? new CleanerJava6() : NOOP;
    }
} else {
    CLEANER = NOOP;
}
On JDK 9 and above, Netty checks whether DirectBuffer's Cleaner can be executed correctly through Unsafe's invokeCleaner method. If an exception occurs during that check, CLEANER is NOOP and Netty defaults to Heap Memory.
public final class Unsafe {
    public void invokeCleaner(java.nio.ByteBuffer directBuffer) {
        if (!directBuffer.isDirect())
            throw new IllegalArgumentException("buffer is non-direct");
        theInternalUnsafe.invokeCleaner(directBuffer);
    }
}
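A minimal sketch of triggering this path by hand on JDK 9+, assuming reflective access to sun.misc.Unsafe is permitted in your runtime:

import java.lang.reflect.Field;
import java.nio.ByteBuffer;

public class InvokeCleanerDemo {
    public static void main(String[] args) throws Exception {
        // obtain the sun.misc.Unsafe singleton via reflection
        Field f = sun.misc.Unsafe.class.getDeclaredField("theUnsafe");
        f.setAccessible(true);
        sun.misc.Unsafe unsafe = (sun.misc.Unsafe) f.get(null);

        ByteBuffer direct = ByteBuffer.allocateDirect(64);
        // frees the Native Memory immediately; the buffer must never be touched afterwards
        unsafe.invokeCleaner(direct);
    }
}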
On versions below JDK 9, Netty instead obtains the cleaner field of DirectByteBuffer via reflection. If cleaner is null or an exception occurs while executing its clean method, CLEANER is NOOP and Netty defaults to Heap Memory.
class DirectByteBuffer extends MappedByteBuffer implements DirectBuffer
{
    private final Cleaner cleaner;

    DirectByteBuffer(int cap) { // package-private
        ...... omitted .....
        base = UNSAFE.allocateMemory(size);
        cleaner = Cleaner.create(this, new Deallocator(base, size, cap));
    }
}
If the directBufferPreferred() method returns true, the ByteBufAllocator will subsequently allocate directBuffers by default.
public final class UnpooledByteBufAllocator extends AbstractByteBufAllocator {
    // the default ByteBuf allocator
    public static final UnpooledByteBufAllocator DEFAULT =
            new UnpooledByteBufAllocator(PlatformDependent.directBufferPreferred());
}
public abstract class AbstractByteBufAllocator implements ByteBufAllocator {
    // whether to allocate a directBuffer by default
    private final boolean directByDefault;

    protected AbstractByteBufAllocator(boolean preferDirect) {
        directByDefault = preferDirect && PlatformDependent.hasUnsafe();
    }

    @Override
    public ByteBuf buffer() {
        if (directByDefault) {
            return directBuffer();
        }
        return heapBuffer();
    }
}
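A minimal sketch of the effect: the default unpooled allocator decides between direct and heap memory based on the checks above:

ByteBufAllocator alloc = UnpooledByteBufAllocator.DEFAULT;
// direct if Direct Memory is preferred (and Unsafe is available), heap otherwise
ByteBuf buf = alloc.buffer(256);
System.out.println(buf.isDirect());
buf.release();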
In general, the JDK contains a usable CLEANER mechanism, so whether Netty prefers Direct Memory can simply be controlled via the -Dio.netty.noPreferDirect parameter (default false).
On the Android platform, however, Netty defaults to Heap Memory no matter how -Dio.netty.noPreferDirect is set.
4. Cleaner or NoCleaner
From a memory reclamation point of view, Netty categorizes DirectByteBufs into those with a Cleaner and those without. In the third subsection of my previous article, How the JVM implements Reference semantics, using ZGC as an example, I described in detail how the JVM uses the Cleaner mechanism to reclaim the Native Memory behind a DirectByteBuffer.
The Cleaner's reclamation of a DirectByteBuffer's Native Memory depends on GC: when a DirectByteBuffer no longer has any strong or soft references and a GC occurs, the Cleaner reclaims its Native Memory. If no GC occurs for a long time, the Native Memory referenced by such DirectByteBuffers is never released.
So relying on the Cleaner alone to free Native Memory involves a certain delay, and in extreme cases, if no GC arrives in time, an OOM is likely.
Netty's ByteBuf design is a complete extension of the NIO ByteBuffer, and many of its ByteBufs depend on a JDK ByteBuffer underneath. For example, UnpooledDirectByteBuf and UnpooledUnsafeDirectByteBuf both rely on an underlying JDK DirectByteBuffer; these are ByteBufs with a Cleaner.
public class UnpooledDirectByteBuf extends AbstractReferenceCountedByteBuf {
    // the underlying JDK DirectByteBuffer
    ByteBuffer buffer;

    public UnpooledDirectByteBuf(ByteBufAllocator alloc, int initialCapacity, int maxCapacity) {
        // create the DirectByteBuffer
        setByteBuffer(allocateDirect(initialCapacity), false);
    }

    protected ByteBuffer allocateDirect(int initialCapacity) {
        return ByteBuffer.allocateDirect(initialCapacity);
    }
}

public class UnpooledUnsafeDirectByteBuf extends UnpooledDirectByteBuf {
    // memory address of the underlying JDK DirectByteBuffer
    long memoryAddress;

    public UnpooledUnsafeDirectByteBuf(ByteBufAllocator alloc, int initialCapacity, int maxCapacity) {
        // call the parent UnpooledDirectByteBuf constructor to create the underlying JDK DirectByteBuffer
        super(alloc, initialCapacity, maxCapacity);
    }

    @Override
    final void setByteBuffer(ByteBuffer buffer, boolean tryFree) {
        super.setByteBuffer(buffer, tryFree);
        // obtain the memory address of the JDK DirectByteBuffer
        memoryAddress = PlatformDependent.directBufferAddress(buffer);
    }
}
In JDK NIO, any DirectByteBuffer created via the ByteBuffer.allocateDirect method comes with a Cleaner.
public abstract class ByteBuffer {
    public static ByteBuffer allocateDirect(int capacity) {
        return new DirectByteBuffer(capacity);
    }
}

class DirectByteBuffer extends MappedByteBuffer implements DirectBuffer
{
    private final Cleaner cleaner;

    DirectByteBuffer(int cap) { // package-private
        ...... omitted .....
        // Direct Memory requested through this constructor is subject to the -XX:MaxDirectMemorySize JVM parameter
        Bits.reserveMemory(size, cap);
        // natively calls malloc to request the memory
        base = UNSAFE.allocateMemory(size);
        ...... omitted .....
        // create the Cleaner
        cleaner = Cleaner.create(this, new Deallocator(base, size, cap));
    }
}
The Native Memory referenced behind a DirectByteBuffer with a Cleaner is subject to the -XX:MaxDirectMemorySize JVM parameter. Since UnpooledDirectByteBuf and UnpooledUnsafeDirectByteBuf both come with a Cleaner, the Cleaner frees their Direct Memory whenever a GC occurs while they no longer have any strong or soft references in the system.
Since Cleaner execution depends on GC, which is often not timely, there is a certain delay. To release Direct Memory promptly, Netty often chooses not to rely on the JDK's Cleaner mechanism and frees the memory manually instead. Hence the NoCleaner type of DirectByteBuf -- UnpooledUnsafeNoCleanerDirectByteBuf.
class UnpooledUnsafeNoCleanerDirectByteBuf extends UnpooledUnsafeDirectByteBuf {
    @Override
    protected ByteBuffer allocateDirect(int initialCapacity) {
        // create a JDK DirectByteBuffer without a Cleaner
        return PlatformDependent.allocateDirectNoCleaner(initialCapacity);
    }

    @Override
    protected void freeDirect(ByteBuffer buffer) {
        // since there is no Cleaner, Netty must release the memory manually
        PlatformDependent.freeDirectNoCleaner(buffer);
    }
}
UnpooledUnsafeNoCleanerDirectByteBuf also relies on a JDK DirectByteBuffer underneath, but unlike the previous ones, this DirectByteBuffer has no cleaner.
A JDK DirectByteBuffer created via the DirectByteBuffer(long addr, int cap) constructor (normally invoked only via JNI) has no cleaner, and the Native Memory referenced behind a DirectByteBuffer created this way is not subject to the -XX:MaxDirectMemorySize JVM parameter.
class DirectByteBuffer {
// Invoked only by JNI: NewDirectByteBuffer(void*, long)
private DirectByteBuffer(long addr, int cap) {
super(-1, 0, cap, cap, null);
address = addr;
        // cleaner is null
cleaner = null;
}
}
Since there is no cleaner, Netty cannot rely on GC to free the Direct Memory; Netty must call the freeDirect method itself to release the Direct Memory promptly.
In fact, whether or not a DirectByteBuf in Netty has a cleaner, Netty chooses to release it manually, in order to avoid GC delays and free the Direct Memory promptly.
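This deterministic release shows up directly in user code: freeing is tied to reference counting rather than GC. A minimal sketch:

ByteBuf buf = UnpooledByteBufAllocator.DEFAULT.directBuffer(128);
try {
    buf.writeLong(42L);
    // ... use the buffer ...
} finally {
    // refCnt drops to 0 and the Native Memory is freed immediately, no GC involved
    buf.release();
}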
So when does a DirectByteBuf in Netty come with a Cleaner, and when does it come without one? We can judge by the return value of the PlatformDependent.useDirectBufferNoCleaner() method:
public final class PlatformDependent {
    // whether Netty's DirectByteBufs come without a Cleaner
private static final boolean USE_DIRECT_BUFFER_NO_CLEANER;
public static boolean useDirectBufferNoCleaner() {
return USE_DIRECT_BUFFER_NO_CLEANER;
}
}
-
USE_DIRECT_BUFFER_NO_CLEANER = TRUE means that the DirectByteBufs created by Netty have no cleaner, and their Direct Memory usage is not limited by the JVM parameter -XX:MaxDirectMemorySize.
-
USE_DIRECT_BUFFER_NO_CLEANER = FALSE means that the DirectByteBufs created by Netty have a cleaner, and their Direct Memory usage is limited by the JVM parameter -XX:MaxDirectMemorySize.
We can set the value of USE_DIRECT_BUFFER_NO_CLEANER via the -Dio.netty.maxDirectMemory parameter, which also specifies the maximum Direct Memory usage allowed at the Netty level.
-Dio.netty.maxDirectMemory = 0
In this case USE_DIRECT_BUFFER_NO_CLEANER is FALSE, meaning DirectByteBufs created at the Netty level come with a cleaner. Netty then places no limit of its own on maxDirectMemory, because doing so would be pointless: the usable maxDirectMemory is determined entirely by the JVM parameter -XX:MaxDirectMemorySize.
-Dio.netty.maxDirectMemory < 0
This is the default (-1). USE_DIRECT_BUFFER_NO_CLEANER is TRUE, and DirectByteBufs are created without a cleaner. Their Direct Memory usage is then not limited by the JVM parameter -XX:MaxDirectMemorySize, so it must be limited at the Netty level; the default limit is the value specified by -XX:MaxDirectMemorySize.
Pay special attention here: the maxDirectMemory limit at the Netty level and the maxDirectMemory limit at the JVM level are counted separately and do not affect each other. From the perspective of the whole JVM process, the overall usable maxDirectMemory is therefore twice -XX:MaxDirectMemorySize.
-Dio.netty.maxDirectMemory > 0
Same as the less-than-0 case, except that the Netty-level maxDirectMemory usage is capped at exactly the value given by -Dio.netty.maxDirectMemory, still counted independently of the JVM-level maxDirectMemory limit.
On that level, another reason Netty designed the NoCleaner type of DirectByteBuf was to break through the JVM's limit on maxDirectMemory usage.
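A minimal sketch for inspecting which mode is active and how the Netty-level counter behaves (these are real PlatformDependent accessors; the printed values depend on your platform and JVM flags):

System.out.println(PlatformDependent.useDirectBufferNoCleaner()); // commonly true off Android
System.out.println(PlatformDependent.maxDirectMemory());          // the Netty-level limit in bytes

ByteBuf buf = UnpooledByteBufAllocator.DEFAULT.directBuffer(1024);
// usedDirectMemory() reflects the Netty-level counter (-1 when the counter is disabled)
System.out.println(PlatformDependent.usedDirectMemory());
buf.release();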
public final class PlatformDependent {
    // Netty-level Direct Memory usage counter;
    // null means Netty places no separate limit and the JVM alone limits Direct Memory usage
    private static final AtomicLong DIRECT_MEMORY_COUNTER;
    // maximum Direct Memory usage at the Netty level
    private static final long DIRECT_MEMORY_LIMIT;
    // maximum off-heap memory specified by the JVM's -XX:MaxDirectMemorySize
    private static final long MAX_DIRECT_MEMORY = maxDirectMemory0();

    static {
        long maxDirectMemory = SystemPropertyUtil.getLong("io.netty.maxDirectMemory", -1);

        if (maxDirectMemory == 0 || !hasUnsafe() || !PlatformDependent0.hasDirectBufferNoCleanerConstructor()) {
            // maxDirectMemory = 0 means DirectBuffers created later come with a cleaner;
            // Netty itself does not limit maxDirectMemory but leaves that to the JDK,
            // because the DirectBuffers (with Cleaner) are created by the JDK, which enforces the limit.
            // Without Unsafe the Cleaner must be used, since without Cleaner and Unsafe
            // there is no way to free the Native Memory.
            // If the JDK does not expose the constructor for a NoCleaner DirectBuffer
            // -- DirectByteBuffer(long, int) -- the Cleaner must naturally be used as well.
            USE_DIRECT_BUFFER_NO_CLEANER = false;
            // Netty does not track Direct Memory usage; the JDK does it
            DIRECT_MEMORY_COUNTER = null;
        } else {
            USE_DIRECT_BUFFER_NO_CLEANER = true;
            if (maxDirectMemory < 0) {
                // maxDirectMemory < 0 (default -1): NoCleaner DirectBuffers are created later,
                // and the Netty level limits maxDirectMemory usage on its own,
                // using the same value as -XX:MaxDirectMemorySize,
                // because the JDK neither counts nor limits NoCleaner DirectBuffer usage.
                // Note that Netty's maxDirectMemory and the JDK's maxDirectMemory are counted separately;
                // for the whole JVM process, the overall usable Direct Memory is
                // twice -XX:MaxDirectMemorySize (the sum of the Netty and JDK parts)
                maxDirectMemory = MAX_DIRECT_MEMORY;
                if (maxDirectMemory <= 0) {
                    DIRECT_MEMORY_COUNTER = null;
                } else {
                    // count Netty-level Direct Memory usage
                    DIRECT_MEMORY_COUNTER = new AtomicLong();
                }
            } else {
                // maxDirectMemory > 0: NoCleaner DirectBuffers are created later,
                // and the Netty-level maxDirectMemory is the specified value
                DIRECT_MEMORY_COUNTER = new AtomicLong();
            }
        }
        logger.debug("-Dio.netty.maxDirectMemory: {} bytes", maxDirectMemory);
        DIRECT_MEMORY_LIMIT = maxDirectMemory >= 1 ? maxDirectMemory : MAX_DIRECT_MEMORY;
    }
}
When Direct Memory usage at the Netty level exceeds the value specified by -Dio.netty.maxDirectMemory, an OutOfDirectMemoryError is thrown and the allocation of the DirectByteBuf fails.
private static void incrementMemoryCounter(int capacity) {
if (DIRECT_MEMORY_COUNTER != null) {
long newUsedMemory = DIRECT_MEMORY_COUNTER.addAndGet(capacity);
if (newUsedMemory > DIRECT_MEMORY_LIMIT) {
DIRECT_MEMORY_COUNTER.addAndGet(-capacity);
throw new OutOfDirectMemoryError("failed to allocate " + capacity
+ " byte(s) of direct memory (used: " + (newUsedMemory - capacity)
+ ", max: " + DIRECT_MEMORY_LIMIT + ')');
}
}
}
5. Unsafe or NoUnsafe
In terms of memory access, Netty divides ByteBuf into two categories, Unsafe and NoUnsafe. NoUnsafe memory access depends on the underlying JDK ByteBuffer: any operation on the Netty ByteBuf is ultimately delegated to the underlying JDK ByteBuffer.
public class UnpooledDirectByteBuf extends AbstractReferenceCountedByteBuf {
    // the underlying JDK DirectByteBuffer
    ByteBuffer buffer;

    @Override
    protected byte _getByte(int index) {
        return buffer.get(index);
    }

    @Override
    protected void _setByte(int index, int value) {
        buffer.put(index, (byte) value);
    }
}
Unsafe memory access, on the other hand, relies on the many low-level memory access APIs provided by the sun.misc.Unsafe class to manipulate memory addresses directly. Because directly accessing memory addresses steps outside the JVM specification, calling Unsafe methods requires attention to the details of the JVM and the OS; one careless mistake leads to errors. It is therefore an unsafe access method, but flexible and efficient.
public class UnpooledUnsafeDirectByteBuf extends UnpooledDirectByteBuf {
    // memory address of the underlying JDK DirectByteBuffer
    long memoryAddress;

    @Override
    protected byte _getByte(int index) {
        return UnsafeByteBufUtil.getByte(addr(index));
    }

    final long addr(int index) {
        // access directly via the memory address
        return memoryAddress + index;
    }

    @Override
    protected void _setByte(int index, int value) {
        UnsafeByteBufUtil.setByte(addr(index), value);
    }
}
Netty provides the -Dio.netty.noUnsafe parameter to let us decide whether to use Unsafe memory access. Its default value is false, meaning Netty enables Unsafe access by default.
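A minimal sketch of disabling Unsafe for troubleshooting; the property must be set before any Netty class loads, so the command-line form is the reliable one:

// equivalent to launching with: -Dio.netty.noUnsafe=true
System.setProperty("io.netty.noUnsafe", "true"); // must run before Netty classes initialize

// later, check whether Unsafe access ended up enabled
System.out.println(PlatformDependent.hasUnsafe()); // false once noUnsafe has taken effect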
final class PlatformDependent0 {
    // whether Unsafe is explicitly disabled; null means Unsafe is enabled
    private static final Throwable EXPLICIT_NO_UNSAFE_CAUSE = explicitNoUnsafeCause0();

    private static Throwable explicitNoUnsafeCause0() {
        final boolean noUnsafe = SystemPropertyUtil.getBoolean("io.netty.noUnsafe", false);
        logger.debug("-Dio.netty.noUnsafe: {}", noUnsafe);
        if (noUnsafe) {
            logger.debug("sun.misc.Unsafe: unavailable (io.netty.noUnsafe)");
            return new UnsupportedOperationException("sun.misc.Unsafe: unavailable (io.netty.noUnsafe)");
        }
        return null;
    }
}
Once we have confirmed that Unsafe is enabled, the next step is to check whether the classpath contains the sun.misc.Unsafe class, and whether its singleton instance -- theUnsafe -- can be obtained via reflection.
public final class Unsafe {
    // the Unsafe singleton instance
    private static final Unsafe theUnsafe = new Unsafe();
}

final class PlatformDependent0 {
    // cause of Unsafe unavailability; null means Unsafe is available
    private static final Throwable UNSAFE_UNAVAILABILITY_CAUSE;

    static {
        // try to obtain the theUnsafe instance via reflection
        final Object maybeUnsafe = AccessController.doPrivileged(new PrivilegedAction<Object>() {
            @Override
            public Object run() {
                try {
                    final Field unsafeField = Unsafe.class.getDeclaredField("theUnsafe");
                    Throwable cause = ReflectionUtil.trySetAccessible(unsafeField, false);
                    if (cause != null) {
                        return cause;
                    }
                    // the unsafe instance
                    return unsafeField.get(null);
                } catch (NoSuchFieldException e) {
                    return e;
                } catch (SecurityException e) {
                    return e;
                } catch (IllegalAccessException e) {
                    return e;
                } catch (NoClassDefFoundError e) {
                    // Also catch NoClassDefFoundError in case someone uses for example OSGI and it made
                    // Unsafe unloadable.
                    return e;
                }
            }
        });
    }
}
After obtaining the Unsafe instance, we also need to check that Unsafe contains all the low-level memory access APIs Netty uses, to make sure they work properly and efficiently. For example, does it contain the copyMemory method?
public final class Unsafe {
    @ForceInline
    public void copyMemory(Object srcBase, long srcOffset,
                           Object destBase, long destOffset,
                           long bytes) {
        theInternalUnsafe.copyMemory(srcBase, srcOffset, destBase, destOffset, bytes);
    }
}
Netty also checks whether the address field of the NIO Buffer can be accessed via Unsafe, because later it needs to manipulate the memory address directly.
public abstract class Buffer {
// memory address
long address;
}
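The approach here boils down to resolving the field offset of Buffer.address once and then reading the long value for any direct buffer. Below is a minimal standalone sketch of that technique, not Netty's exact code; the class name is mine, and on recent JDKs sun.misc.Unsafe access may be restricted or print warnings:

import java.lang.reflect.Field;
import java.nio.Buffer;
import java.nio.ByteBuffer;
import sun.misc.Unsafe;

public class AddressSketch {
    public static void main(String[] args) throws Exception {
        // obtain the Unsafe singleton via reflection, as Netty does
        Field theUnsafe = Unsafe.class.getDeclaredField("theUnsafe");
        theUnsafe.setAccessible(true);
        Unsafe unsafe = (Unsafe) theUnsafe.get(null);

        // resolve the offset of Buffer.address once, then read it for any direct buffer
        long addressOffset = unsafe.objectFieldOffset(Buffer.class.getDeclaredField("address"));
        ByteBuffer direct = ByteBuffer.allocateDirect(16);
        long address = unsafe.getLong(direct, addressOffset);
        System.out.println("native address: 0x" + Long.toHexString(address));
    }
}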
If any exception occurs during this whole process, it means that the sun.misc.Unsafe class is missing, or that, due to design differences across JDK versions, Unsafe lacks some of the access APIs Netty needs. Either way, we cannot use Unsafe, and memory access falls back to NoUnsafe.
if (maybeUnsafe instanceof Throwable) {
    unsafe = null;
    unsafeUnavailabilityCause = (Throwable) maybeUnsafe;
    logger.debug("sun.misc.Unsafe.theUnsafe: unavailable", (Throwable) maybeUnsafe);
} else {
    unsafe = (Unsafe) maybeUnsafe;
    logger.debug("sun.misc.Unsafe.theUnsafe: available");
}
// null indicates that Unsafe is available
UNSAFE_UNAVAILABILITY_CAUSE = unsafeUnavailabilityCause;
UNSAFE = unsafe;
If no exception is thrown during the whole process and we obtain a valid UNSAFE instance, then Unsafe memory access is officially enabled from this point on.
final class PlatformDependent0 {
static boolean hasUnsafe() {
return UNSAFE != null;
}
}
The complete hasUnsafe() judgment logic is as follows:
-
Unsafe cannot be enabled if the current platform is Android or IKVM.NET.
-
The -Dio.netty.noUnsafe parameter needs to be set to false (the default, i.e. Unsafe is on by default).
-
The current classpath must contain a valid sun.misc.Unsafe class.
-
The Unsafe instance needs to include the necessary access APIs.
public final class PlatformDependent {
    private static final Throwable UNSAFE_UNAVAILABILITY_CAUSE = unsafeUnavailabilityCause0();

    public static boolean hasUnsafe() {
        return UNSAFE_UNAVAILABILITY_CAUSE == null;
    }

    private static Throwable unsafeUnavailabilityCause0() {
        if (isAndroid()) {
            logger.debug("sun.misc.Unsafe: unavailable (Android)");
            return new UnsupportedOperationException("sun.misc.Unsafe: unavailable (Android)");
        }
        if (isIkvmDotNet()) {
            logger.debug("sun.misc.Unsafe: unavailable (IKVM.NET)");
            return new UnsupportedOperationException("sun.misc.Unsafe: unavailable (IKVM.NET)");
        }
        Throwable cause = PlatformDependent0.getUnsafeUnavailabilityCause();
        if (cause != null) {
            return cause;
        }
        try {
            boolean hasUnsafe = PlatformDependent0.hasUnsafe();
            logger.debug("sun.misc.Unsafe: {}", hasUnsafe ? "available" : "unavailable");
            return hasUnsafe ? null : PlatformDependent0.getUnsafeUnavailabilityCause();
        } catch (Throwable t) {
            logger.trace("Could not determine if Unsafe is available", t);
            // Probably failed to initialize PlatformDependent0.
            return new UnsupportedOperationException("Could not determine if Unsafe is available", t);
        }
    }
}
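A quick way to see which access mode your runtime ends up with is to query PlatformDependent directly. A tiny sketch (the class name is mine; output varies by platform, JDK, and the flags above):

import io.netty.util.internal.PlatformDependent;

public class UnsafeCheck {
    public static void main(String[] args) {
        // true on most HotSpot JVMs unless -Dio.netty.noUnsafe=true is set
        System.out.println("hasUnsafe: " + PlatformDependent.hasUnsafe());
    }
}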
If the PlatformDependent.hasUnsafe() method returns true, Netty will subsequently create ByteBufs of the Unsafe type.
6. Pooled or Unpooled
In terms of memory management, Netty divides ByteBufs into two categories: Pooled and Unpooled. Unpooled ByteBufs are created temporarily when needed and released when no longer needed.
The overhead of allocating and releasing Direct Memory is much higher than that of Heap Memory, and Netty faces highly concurrent network communication scenarios in which allocating and releasing Direct Memory is a very frequent operation. Such frequent allocation and release has a huge impact on performance, so Netty introduces a memory pool to manage Direct Memory in a unified, pooled way.
Netty provides the -Dio.netty.allocator.type parameter, which lets us decide whether to use a memory pool to manage ByteBufs. The default value is pooled, which means that Netty manages ByteBufs with pooling by default; on Android, the default is unpooled.
-
When the parameter is pooled, Netty's default ByteBufAllocator is PooledByteBufAllocator.DEFAULT.
-
When the parameter is unpooled, Netty's default ByteBufAllocator is UnpooledByteBufAllocator.DEFAULT.
public final class ByteBufUtil {
    // the default allocator: PooledByteBufAllocator, managing ByteBufs with pooling
    static final ByteBufAllocator DEFAULT_ALLOCATOR;

    static {
        // defaults to pooled (unpooled on Android)
        String allocType = SystemPropertyUtil.get(
                "io.netty.allocator.type", PlatformDependent.isAndroid() ? "unpooled" : "pooled");
        allocType = allocType.toLowerCase(Locale.US).trim();

        ByteBufAllocator alloc;
        if ("unpooled".equals(allocType)) {
            alloc = UnpooledByteBufAllocator.DEFAULT;
            logger.debug("-Dio.netty.allocator.type: {}", allocType);
        } else if ("pooled".equals(allocType)) {
            alloc = PooledByteBufAllocator.DEFAULT;
            logger.debug("-Dio.netty.allocator.type: {}", allocType);
        } else {
            alloc = PooledByteBufAllocator.DEFAULT;
            logger.debug("-Dio.netty.allocator.type: pooled (unknown: {})", allocType);
        }

        DEFAULT_ALLOCATOR = alloc;
    }
}
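Besides switching globally via the system property, the two allocators can also be used side by side. A small sketch (the class name is mine, and the exact runtime ByteBuf class names are implementation details that vary by platform):

import io.netty.buffer.ByteBuf;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.buffer.UnpooledByteBufAllocator;

public class AllocatorDemo {
    public static void main(String[] args) {
        ByteBuf pooled = PooledByteBufAllocator.DEFAULT.directBuffer(1024);
        ByteBuf unpooled = UnpooledByteBufAllocator.DEFAULT.directBuffer(1024);
        // typically a Pooled...DirectByteBuf and an InstrumentedUnpooled...DirectByteBuf variant
        System.out.println(pooled.getClass().getSimpleName());
        System.out.println(unpooled.getClass().getSimpleName());
        pooled.release();
        unpooled.release();
    }
}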
When Netty creates a SocketChannel, the ByteBufAllocator specified in the SocketChannelConfig is ByteBufUtil.DEFAULT_ALLOCATOR, which defaults to PooledByteBufAllocator.
public interface ByteBufAllocator {
ByteBufAllocator DEFAULT = ByteBufUtil.DEFAULT_ALLOCATOR;
}
public class DefaultChannelConfig implements ChannelConfig {
    // defaults to PooledByteBufAllocator
    private volatile ByteBufAllocator allocator = ByteBufAllocator.DEFAULT;
}
When Netty reads network data from a socket, it first gets the ByteBufAllocator from the DefaultChannelConfig, then uses it to obtain a DirectByteBuf from the memory pool, and finally reads the data from the socket into that DirectByteBuf and propagates it along the pipeline for IO processing.
protected class NioByteUnsafe extends AbstractNioUnsafe {
    @Override
    public final void read() {
        // get the SocketChannelConfig
        final ChannelConfig config = config();
        // get the ByteBufAllocator, PooledByteBufAllocator by default
        final ByteBufAllocator allocator = config.getAllocator();
        // obtain a byteBuf from the memory pool
        byteBuf = allocHandle.allocate(allocator);
        // read the data from the socket into the byteBuf
        allocHandle.lastBytesRead(doReadBytes(byteBuf));
        // propagate the byteBuf along the pipeline
        pipeline.fireChannelRead(byteBuf);

        ...... omitted ......
    }
}
In addition, Netty provides the ChannelOption.ALLOCATOR option, which gives us the flexibility to specify a custom ByteBufAllocator for the SocketChannel when configuring the ServerBootstrap.
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
 // flexibly configure the ByteBufAllocator
 .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
Socket-related properties configured via ChannelOption have the highest priority here and override any default configuration.
7. Metric
In the fourth subsection, we introduced the Cleaner and NoCleaner DirectByteBufs. The overall Direct Memory usage of Cleaner DirectByteBufs is constrained by the JVM parameter -XX:MaxDirectMemorySize, while the Direct Memory behind NoCleaner DirectByteBufs can break through that limit, and the JVM does not track its usage.
Netty usually selects NoCleanerDirectByteBuf by default so that it can release this Direct Memory promptly, which requires Netty to impose its own limit on the amount of Direct Memory available. That limit can be specified with the -Dio.netty.maxDirectMemory parameter, and by default it equals the value set by -XX:MaxDirectMemorySize.
The DIRECT_MEMORY_COUNTER field of the PlatformDependent class is used to count the size of the Direct Memory used by all NoCleaner DirectByteBufs at the Netty level. Note that the Direct Memory usage of Cleaner DirectByteBufs is not counted here; that is the JVM's responsibility.
public final class PlatformDependent {
    // tracks the size of the Native Memory referenced by NoCleaner DirectByteBufs
    private static final AtomicLong DIRECT_MEMORY_COUNTER;

    public static ByteBuffer allocateDirectNoCleaner(int capacity) {
        // increase the Native Memory usage statistics
        incrementMemoryCounter(capacity);
        try {
            // allocate Native Memory and
            // initialize a NoCleaner DirectByteBuffer
            return PlatformDependent0.allocateDirectNoCleaner(capacity);
        } catch (Throwable e) {
            decrementMemoryCounter(capacity);
            throwException(e);
            return null;
        }
    }

    public static void freeDirectNoCleaner(ByteBuffer buffer) {
        int capacity = buffer.capacity();
        // free the Native Memory
        PlatformDependent0.freeMemory(PlatformDependent0.directBufferAddress(buffer));
        // decrease the Native Memory usage statistics
        decrementMemoryCounter(capacity);
    }
}
The PlatformDependent class sits at the lowest level of Netty; all memory allocation and release actions are ultimately performed in this class, so the DIRECT_MEMORY_COUNTER field counts the global (Netty-level) Direct Memory size.
Each memory allocation via allocateDirectNoCleaner increases the DIRECT_MEMORY_COUNTER count, and each release via freeDirectNoCleaner decreases it.
We can call the PlatformDependent.usedDirectMemory() method to get the size of the Direct Memory currently occupied by Netty. However, if we explicitly choose Cleaner DirectByteBufs, for example by setting the -Dio.netty.maxDirectMemory parameter to 0, the counter is disabled and -1 is returned here.
private static void incrementMemoryCounter(int capacity) {
// count the Native Memory referenced by NoCleaner DirectByteBufs
if (DIRECT_MEMORY_COUNTER != null) {
long newUsedMemory = DIRECT_MEMORY_COUNTER.addAndGet(capacity);
if (newUsedMemory > DIRECT_MEMORY_LIMIT) {
DIRECT_MEMORY_COUNTER.addAndGet(-capacity);
throw new OutOfDirectMemoryError("failed to allocate " + capacity
+ " byte(s) of direct memory (used: " + (newUsedMemory - capacity)
+ ", max: " + DIRECT_MEMORY_LIMIT + ')');
}
}
}
private static void decrementMemoryCounter(int capacity) {
if (DIRECT_MEMORY_COUNTER != null) {
long usedMemory = DIRECT_MEMORY_COUNTER.addAndGet(-capacity);
assert usedMemory >= 0;
}
}
public static long usedDirectMemory() {
return DIRECT_MEMORY_COUNTER != null ? DIRECT_MEMORY_COUNTER.get() : -1;
}
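Putting these together, here is a tiny sketch of how an application can read Netty's global NoCleaner Direct Memory statistics (the class name is mine):

import io.netty.util.internal.PlatformDependent;

public class DirectMemoryStats {
    public static void main(String[] args) {
        long used = PlatformDependent.usedDirectMemory(); // -1 when the counter is disabled
        long max = PlatformDependent.maxDirectMemory();
        System.out.println("netty direct memory: " + used + " / " + max);
    }
}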
In addition to the global statistics in PlatformDependent, Netty also provides memory usage statistics at the granularity of each ByteBufAllocator, with separate dimensions for Heap Memory and Direct Memory usage.
public final class UnpooledByteBufAllocator extends AbstractByteBufAllocator implements ByteBufAllocatorMetricProvider {
    // statistics for the memory allocated by this ByteBufAllocator
    private final UnpooledByteBufAllocatorMetric metric = new UnpooledByteBufAllocatorMetric();

    @Override
    public ByteBufAllocatorMetric metric() {
        return metric;
    }

    // track Direct Memory usage
    void incrementDirect(int amount) {
        metric.directCounter.add(amount);
    }

    void decrementDirect(int amount) {
        metric.directCounter.add(-amount);
    }

    // track Heap Memory usage
    void incrementHeap(int amount) {
        metric.heapCounter.add(amount);
    }

    void decrementHeap(int amount) {
        metric.heapCounter.add(-amount);
    }
}
Each ByteBufAllocator defined by Netty holds a field of type ByteBufAllocatorMetric, which defines two counters, directCounter and heapCounter, used to track Direct Memory and Heap Memory usage respectively.
private static final class UnpooledByteBufAllocatorMetric implements ByteBufAllocatorMetric {
    final LongCounter directCounter = PlatformDependent.newLongCounter();
    final LongCounter heapCounter = PlatformDependent.newLongCounter();

    @Override
    public long usedHeapMemory() {
        return heapCounter.value();
    }

    @Override
    public long usedDirectMemory() {
        return directCounter.value();
    }

    @Override
    public String toString() {
        return StringUtil.simpleClassName(this) +
                "(usedHeapMemory: " + usedHeapMemory() + "; usedDirectMemory: " + usedDirectMemory() + ')';
    }
}
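A short sketch of how these per-allocator metrics can be read from application code (the class name is mine; the exact reported values depend on the allocator implementation):

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocatorMetric;
import io.netty.buffer.UnpooledByteBufAllocator;

public class MetricDemo {
    public static void main(String[] args) {
        UnpooledByteBufAllocator alloc = UnpooledByteBufAllocator.DEFAULT;
        ByteBuf buf = alloc.directBuffer(1024);
        // usedDirectMemory should reflect the 1024-byte allocation while buf is alive
        ByteBufAllocatorMetric metric = alloc.metric();
        System.out.println(metric);
        buf.release();
    }
}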
Therefore, from the perspective of memory usage statistics, Netty also divides the ByteBuf system into two categories: Instrumented and NoInstrumented. For any ByteBuf carrying the Instrumented prefix, whether it is Heap or Direct, Cleaner or NoCleaner, Unsafe or NoUnsafe, Netty counts its memory usage.
private static final class InstrumentedUnpooledUnsafeNoCleanerDirectByteBuf
        extends UnpooledUnsafeNoCleanerDirectByteBuf {
    InstrumentedUnpooledUnsafeNoCleanerDirectByteBuf(
            UnpooledByteBufAllocator alloc, int initialCapacity, int maxCapacity) {
        // plain UnpooledUnsafeNoCleanerDirectByteBuf
        super(alloc, initialCapacity, maxCapacity);
    }

    // update the Direct Memory statistics on allocation and release
    @Override
    protected ByteBuffer allocateDirect(int initialCapacity) {
        ByteBuffer buffer = super.allocateDirect(initialCapacity);
        ((UnpooledByteBufAllocator) alloc()).incrementDirect(buffer.capacity());
        return buffer;
    }

    @Override
    protected void freeDirect(ByteBuffer buffer) {
        int capacity = buffer.capacity();
        super.freeDirect(buffer);
        ((UnpooledByteBufAllocator) alloc()).decrementDirect(capacity);
    }
}

private static final class InstrumentedUnpooledUnsafeDirectByteBuf extends UnpooledUnsafeDirectByteBuf {
    InstrumentedUnpooledUnsafeDirectByteBuf(
            UnpooledByteBufAllocator alloc, int initialCapacity, int maxCapacity) {
        // plain UnpooledUnsafeDirectByteBuf
        super(alloc, initialCapacity, maxCapacity);
    }

    // update the Direct Memory statistics on allocation and release
    @Override
    protected ByteBuffer allocateDirect(int initialCapacity) {
        ByteBuffer buffer = super.allocateDirect(initialCapacity);
        ((UnpooledByteBufAllocator) alloc()).incrementDirect(buffer.capacity());
        return buffer;
    }

    @Override
    protected void freeDirect(ByteBuffer buffer) {
        int capacity = buffer.capacity();
        super.freeDirect(buffer);
        ((UnpooledByteBufAllocator) alloc()).decrementDirect(capacity);
    }
}
8. ByteBufAllocator
In Netty, ByteBufs must be created through a ByteBufAllocator; they cannot be created by explicitly calling a ByteBuf constructor. Netty defines two types of ByteBufAllocator:
-
PooledByteBufAllocator is responsible for pooled ByteBufs and is the core of Netty's memory management; I will introduce it in detail in the next article.
-
UnpooledByteBufAllocator is responsible for allocating unpooled ByteBufs: it temporarily requests Native Memory from the OS when creating a ByteBuf, and after use we manually call release to return the Native Memory to the OS in a timely manner.
The -Dio.netty.allocator.type parameter allows us to choose the type of ByteBufAllocator; the default value is pooled, so Netty manages ByteBufs with pooling by default.
public interface ByteBufAllocator {
    // defaults to PooledByteBufAllocator
    ByteBufAllocator DEFAULT = ByteBufUtil.DEFAULT_ALLOCATOR;
}
In addition to the two official ByteBufAllocator implementations above, we can also customize a ByteBufAllocator for our own business scenarios and then use the ChannelOption.ALLOCATOR option to flexibly specify our custom implementation.
Netty also provides a utility class, Unpooled, for UnpooledByteBufs. It implements many useful ByteBuf operations, such as allocate, wrapped, and copied. Here I take the creation of a DirectByteBuf as an example:
public final class Unpooled {
    private static final ByteBufAllocator ALLOC = UnpooledByteBufAllocator.DEFAULT;

    public static ByteBuf directBuffer() {
        return ALLOC.directBuffer();
    }
}
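A few usage examples of the Unpooled operations mentioned above (the class name is mine):

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.util.CharsetUtil;

public class UnpooledDemo {
    public static void main(String[] args) {
        ByteBuf direct = Unpooled.directBuffer(256);                        // fresh direct buffer
        ByteBuf wrapped = Unpooled.wrappedBuffer(new byte[]{1, 2, 3});      // zero-copy wrap of an existing array
        ByteBuf copied = Unpooled.copiedBuffer("hello", CharsetUtil.UTF_8); // deep copy of the string bytes
        System.out.println(wrapped.readByte()); // 1
        direct.release();
        wrapped.release();
        copied.release();
    }
}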
Unpooled relies on the UnpooledByteBufAllocator underneath; all ByteBuf creation is ultimately delegated to this allocator. In the creation path of a DirectBuffer, we can see all the ByteBuf types introduced earlier.
public final class UnpooledByteBufAllocator {
    @Override
    protected ByteBuf newDirectBuffer(int initialCapacity, int maxCapacity) {
        final ByteBuf buf;
        if (PlatformDependent.hasUnsafe()) {
            buf = noCleaner ? new InstrumentedUnpooledUnsafeNoCleanerDirectByteBuf(this, initialCapacity, maxCapacity) :
                    new InstrumentedUnpooledUnsafeDirectByteBuf(this, initialCapacity, maxCapacity);
        } else {
            buf = new InstrumentedUnpooledDirectByteBuf(this, initialCapacity, maxCapacity);
        }
        // if memory leak detection is enabled, additionally wrap the buf in a LeakAwareByteBuf before returning
        return disableLeakDetector ? buf : toLeakAwareBuffer(buf);
    }
}
-
First of all, every ByteBuf created by Netty here carries Metric statistics, so the concrete ByteBuf types are prefixed with Instrumented.
-
If the current JRE environment supports Unsafe (the default), the ByteBuf is operated on via Unsafe, and the concrete ByteBuf type carries the Unsafe prefix.
-
If we explicitly choose the NoCleaner type of DirectByteBuf (the default), the created ByteBuf type carries the NoCleaner prefix; since there is no Cleaner, we must manually release the ByteBuf when we are done with it (see the release sketch after this list).
-
If memory leak detection is turned on, then at the end of the creation process Netty wraps the newly created ByteBuf in a LeakAwareByteBuf. When the ByteBuf is GC'd, Netty uses the associated reference count to determine whether a release was forgotten, i.e. whether a memory leak occurred.
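Here is the manual-release convention as a sketch (the class name is mine; the try/finally pattern is the usual idiom, not a Netty-mandated API):

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;

public class ReleaseDemo {
    public static void main(String[] args) {
        ByteBuf buf = ByteBufAllocator.DEFAULT.directBuffer(1024);
        try {
            buf.writeBytes(new byte[]{1, 2, 3});
            // ... use buf ...
        } finally {
            // decrement the reference count; the native memory is freed when it reaches 0
            buf.release();
        }
    }
}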
Summary
In this article, I analyzed the overall design of ByteBuf in detail from eight perspectives: JVM memory area layout, memory management, memory access, memory reclamation, memory usage statistics (Metric), zero-copy, reference counting, and capacity expansion.
So far, we have only cleared away some of the obstacles on the periphery of Netty's memory management. In the next article, I will take you deep into the core of memory management and thoroughly explain Netty's memory management mechanism. That's all for this article; see you in the next one!