preamble
ConcurrentLinkedQueue
It is an unbounded thread-safe queue based on linked nodes. This queue sorts the elements according to the FIFO (First In First Out) principle. The head of the queue is the element that has been in the queue for the longest time, while the tail of the queue is the most recently added element. New elements are always inserted into the tail of the queue, and queue fetch operations (such aspoll
maybepeek
), on the other hand, starts at the head of the queue.
As opposed to the traditionalLinkedList
Different.ConcurrentLinkedQueue
Uses an efficient non-blocking algorithm, called Lock-Free programming, which is thread-safe through atomic variables and CAS (Compare-And-Swap) operations, rather than through traditional locking mechanisms. This gives it excellent performance in highly concurrent scenarios.
can be viewed as a thread-safeLinkedList
, is a thread-safe unbounded queue, but theLinkedList
is a bi-directional linked table, andConcurrentLinkedQueue
is a unidirectional linked table.
ConcurrentLinkedQueue
Thread-safety lies in the cas operation used to set the head, tail, and next pointers, and the item and next variables in the node are all modified with volatile, which ensures the visibility of the variables in multiple threads. TheConcurrentLinkedQueue
of all read operations are lock-free, so it is possible that reads will be inconsistent.
application scenario
If the cost of locking the queue is high then it is appropriate to use a locklessConcurrentLinkedQueue
to replace. Ideal for scenarios with relatively high performance requirements and multiple threads reading and writing to the queue at the same time.
ConcurrentLinkedQueue does higher concurrency by being lockless and is a high performance queue, but theUsage scenarios are relatively less common than blocking queuesAfter all, the fetch data also have to keep going to the loop, not as good as the blocking design, but in the case of a particularly large amount of concurrency, it is a good choice, performance is much better, and the design of this queue is also particularly laborious, especially the use of improved algorithms and the handling of sentinels.
Primary methodology
ConcurrentLinkedQueue
A rich set of methods are provided to manipulate the queue, including:
-
offer(E e)
: Inserts the specified element into the end of this queue. -
add(E e)
: Inserts the specified element into the end of this queue (with theoffer
(the method has the same functionality, but throws an exception on failure). -
poll()
: Gets and removes the header of this queue, if this queue is empty, then returns thenull
。 -
peek()
: Gets but does not remove the head of this queue, if this queue is empty, returns thenull
。 -
size()
: Returns the number of elements in this queue. Note that the results returned by this method may not be accurate due to concurrency. If you need to get the exact number of elements in a concurrent environment, it is recommended that you use theThe atomic variables in the package are counted.
-
isEmpty()
: Check if this queue is empty. Compare this with thesize()
method is similar, and the results returned by this method can be inaccurate due to concurrency.
It is important to note that the use of thesize()
cap (a poem)isEmpty()
methods require special care, as their results may not be accurate. If an exact number of elements or empty queue detection is required, it is recommended to use additional synchronization mechanisms or atomic variables to achieve this.
underlying source code
Internal Classes of Classes
private static class Node<E> {
// elemental
volatile E item;
// nextdomain (taxonomy)
volatile Node<E> next;
/**
* Constructs a new node. Uses relaxed write because item can
* only be seen after publication via casNext.
*/
// constructor
Node(E item) {
// set upitemvalue of
(this, itemOffset, item);
}
// Compare and replaceitem(be) worth
boolean casItem(E cmp, E val) {
return (this, itemOffset, cmp, val);
}
void lazySetNext(Node<E> val) {
// set upnextdomain (taxonomy)value of,does not guarantee that the changes will be immediately visible to other threads
(this, nextOffset, val);
}
// Compare and replacenextdomain (taxonomy)value of
boolean casNext(Node<E> cmp, Node<E> val) {
return (this, nextOffset, cmp, val);
}
// Unsafe mechanics
// reflex mechanism
private static final UNSAFE;
// itemdomain (taxonomy)的偏移量
private static final long itemOffset;
// nextdomain (taxonomy)的偏移量
private static final long nextOffset;
static {
try {
UNSAFE = ();
Class<?> k = ;
itemOffset =
(("item"));
nextOffset =
(("next"));
} catch (Exception e) {
throw new Error(e);
}
}
}
Description: The Node class represents a chained table node for storing elements and contains an item field and a next field. The item field represents the element and the next field represents the next node, which utilizes the reflection mechanism and CAS mechanism to update the item field and the next field to ensure atomicity.
Class Attributes
public class ConcurrentLinkedQueue<E> extends AbstractQueue<E>
implements Queue<E>, {
// version number
private static final long serialVersionUID = 196745693267521676L;
// reflex mechanism
private static final UNSAFE;
// headOffset of the field
private static final long headOffset;
// tailOffset of the field
private static final long tailOffset;
static {
try {
UNSAFE = ();
Class<?> k = ;
headOffset =
(("head"));
tailOffset =
(("tail"));
} catch (Exception e) {
throw new Error(e);
}
}
// head node
private transient volatile Node<E> head;
// end node (math.)
private transient volatile Node<E> tail;
}
Description: The attribute contains head field and tail field which represent the head node and tail node of the linked table, also ConcurrentLinkedQueue uses reflection mechanism and CAS mechanism to update the head node and tail node to ensure atomicity.
Class constructor
- ConcurrentLinkedQueue()
public ConcurrentLinkedQueue() {
// Initialize the head and tail nodes
head = tail = new Node<E>(null);
}
Description: This constructor creates an initially empty ConcurrentLinkedQueue with the head node and the tail node pointing to the same node with a null item field and a null next field.
- ConcurrentLinkedQueue(Collection<? extends E>)
public ConcurrentLinkedQueue(Collection<? extends E> c) {
Node<E> h = null, t = null;
for (E e : c) { // Iterate through the c collection
// Ensure that the element is not null
checkNotNull(e);
// Create a new node
Node<E> newNode = new Node<E>(e);
if (h == null) // head node is null
// Assign head and tail nodes
h = t = newNode;
else {
// Direct the next field of the head node
(newNode).
// Reassign the head node
t = newNode; }
}
}
if (h == null) // head node is null
// New head and tail nodes
h = t = new Node<E>(null);
// Assign the head node
head = h; // assign head node
// Assign the tail node
tail = t; }
}
Description: This constructor is used to create a ConcurrentLinkedQueue initially containing the elements of a given collection, adding elements in the order in which this collection iterator is traversed.
Core Function Analysis
offer function
public boolean offer(E e) {
// Check if e is null, if so throw a NullPointerException.
checkNotNull(e);
// Create a new node
final Node<E> newNode = new Node<E>(e);
// Add the "new node" to the end of the list.
for (Node<E> t = tail, p = t;;) { // This for loop is a dead loop, adding two pointers p, t.
Node<E> q = ;)
// Case 1: q is null, p is the tail node, and the new node is inserted
if (q == null) {
// CAS operation: if "p's next node is null" (i.e., p is the tail node), set p's next node to be newNode.
// If this CAS operation succeeds, compare "p and t" (if p is not equal to t, set newNode to be the new tail node) and return true.
// If the CAS operation fails, meaning "some other thread made changes to the tail node", loop again.
if ((null, newNode)) {
if (p ! = t) // hop two nodes at a time
casTail(t, newNode); // Failure is OK.
return true; // Failure is OK.
return true; }
}
// Case 2: p and q are equal
else if (p == q)
p = (t ! = (t = tail)) ? p = (t ! = (t = tail)) ?
// Case 3: else
// Here we move the p pointer, which means we point to tail if p is not the last element, otherwise we point to q, which is the element.
else
p = (p ! = t && t ! = (t = tail)) ? t : q;
}
}
Description: offer function is used to insert the specified elements into the end of this queue. The following simulation of the operation of the offer function, the queue state changes (assuming a single thread to add the element, the continuous addition of 10, 20 two elements).
If the initial state of ConcurrentLinkedQueue is as shown above, i.e., the queue is empty. A single thread adds an element, at this point, add element 10, then the state is as follows
As shown in the above figure, after adding element 10, there is no change in tail, it still points to the previous node, continue to add element 20, then the state is as follows
As shown above, after adding element 20, tail points to the most recently added node.
poll function
public E poll() {
restartFromHead:
for (;;) { // infinite loop
for (Node<E> h = head, p = h, q;;) { // Save header nodes
// itemclassifier for principles, items, clauses, tasks, research projects etc
E item = ;
if (item != null && (item, null)) { // itemlet sb. do sth.nulland compare and replaceitemsuccesses
// Successful CAS is the linearization point
// for item to be removed from this queue.
if (p != h) // pnot equal ≠h // hop two nodes at a time
// Update header nodes
updateHead(h, ((q = ) != null) ? q : p);
// come (or go) backitem
return item;
}
else if ((q = ) == null) { // qThe nodes arenull
// Update header nodes
updateHead(h, p);
return null;
}
else if (p == q) // pbe tantamount toq
// Keep cycling.
continue restartFromHead;
else
// passignq
p = q;
}
}
}
Description: This function is used to obtain and remove the head of this queue, if the queue is empty, then return null. the following simulation of the operation of the poll function, the state of the queue changes (assuming a single-threaded operation, the state of the state of the previous offer10, 20 after the state of the poll twice).
The initial state of the queue is shown above, and after the poll operation, the state of the queue is shown below
As can be seen in the above figure, after the poll operation, the head changes and the item of the node pointed to by the head becomes null. another poll operation is performed and the state of the queue is shown in the following figure.
As can be seen in the above figure, after the poll operation, the HEAD node is unchanged, only the ITEM field of the indicated node becomes NULL.
remove function
public boolean remove(Object o) {
// If the element is null, return
if (o == null) return false;
Node<E> pred = null;
for (Node<E> p = first(); p ! = null; p = succ(p)) { // get the first surviving node
// Get the item value of the first surviving node.
E item = ;
if (item ! = null &&
(item) &&
(item, null)) { // find the node with equal item and set that node's item to null
// Successor node to p
Node<E> next = succ(p);
if (pred ! = null && next ! = null) // pred is not null and next is not null
// Compare and replace the next field
(p, next); return true; // compare and replace the next field.
return true; }
}
// assign pred to p
pred = p; }
}
return false; }
}
Description: This function is used to remove a single instance of a specified element from a queue, if it exists. The first and succ functions are called. The source code for the first function is as follows
Node<E> first() {
restartFromHead.
for (;;) { // infinite loop to ensure success
for (Node<E> h = head, p = h, q;;) {
// Whether the item field of the p node is null or not
boolean hasItem = ( ! = null);
if (hasItem || (q = ) == null) { // item is not null or next domain is null
// Update the head node
updateHead(h, p);
// Return the node
return hasItem ? p : null;
}
else if (p == q) // p equals q
// continue from the head node
continue restartFromHead;
restartFromHead; } else
// p is assigned to q
p = q; }
}
}
}
Description: The first function is used to find the first surviving node in a linked table. succ function source code is as follows
final Node<E> succ(Node<E> p) {
// The next field of the p node
Node<E> next = ;
// Returns the head node if the next field is itself, otherwise, returns next
return (p == next) ? head : next;
}
Description: succ is used to get the next node of a node. If the next domain of the node points to itself, then return the head head node, otherwise, return the next node. The following simulation of the operation of the remove function, the queue state changes (assuming single-threaded operation, the state of the previous offer 10, 20 after the state, the implementation of remove (10), remove (20) operation).
As shown in the above figure, it is the initial state of ConcurrentLinkedQueue, and the state after remove(10) is shown as follows
As shown above, when remove(10) is executed, head points to the next node of the node previously pointed to by the head node, and the item field of the head node is set to null. continue to execute remove(20), and the state is shown below
As shown above, after executing remove(20), head and tail point to the same node and the item field is null.
size function
public int size() {
// count
int count = 0; for (Node<E> p = first(); p !
for (Node<E> p = first(); p ! = null; p = succ(p)) // Traverse backward from the first surviving node
if ( ! = null) // The node's item field is not null.
// () spec says to max out
if (++count == Integer.MAX_VALUE) // Increase count, if it reaches the maximum value, break out of the loop.
break;
// return size
return count.
}
Description: This function returns the size of the ConcurrenLinkedQueue, starting with the first surviving node (first) and traversing backward through the linked table, increasing the count when the node's item field is not null, and returning the size afterwards.
Design of HOPS (delayed update policy)
By analyzing the offer and poll methods above, we find that tail and head are delayed updates, and the update trigger timing for both is:
-
Tail update trigger timing: when the next node of the node pointed to by tail is not null, the operation of locating the real tail node of the queue will be performed, and the tail update will be performed through casTail only after the insertion of the node is completed after finding the tail node; when the next node of the node pointed to by tail is null, the node will only be inserted without updating the tail.
-
head update trigger timing: when the item field of the node pointed to by the head is null, the operation of locating the real head node of the queue will be executed, and the head node will be deleted after finding the head node before updating the head; when the item field of the node pointed to by the head is not null, the node will be deleted without updating the head.
As you can see from the above update state diagram, the head and tail updates are "jumping", i.e. there is always a gap between them. So what is the intention of this design?
If you let tail always act as the tail node of the queue, the amount of code to implement would be less and the logic would be more understandable. However, this has a disadvantage, if a large number of operations into the queue, each time to perform CAS for tail update, the aggregate performance will also be a great loss. If you can reduce the CAS update operation, you can undoubtedly greatly improve the efficiency of the operation into the queue, so doug lea master every interval of 1 time (tail and tail node distance of 1) to carry out the use of CAS to update the tail. the head of the update is also the same reason, although, so the design will be more than in the loop to locate the tail node, but the overall efficiency of the read operation is much higher than the write performance. The same applies to the head. Although this design will result in more operations to locate the tail node in the loop, the overall efficiency of the read operation is much higher than that of the write operation, so the performance loss of the extra operation of locating the tail node in the loop is relatively small.
About the Author.
From the first-line programmer Seven's exploration and practice, continuous learning iteration in the~
This article is included in my personal blog:https://
Public number: seven97, welcome to follow~